akka / alpakka

Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/libraries/alpakka/current/

Redis connector #1111

Open SandishKumarHN opened 6 years ago

SandishKumarHN commented 6 years ago

I would like to add a Redis connector to alpakka

2m commented 6 years ago

Great! Do you have a specific use case in mind that would greatly benefit from the streaming approach? Also, do you have a particular Redis client library in mind? AFAIK there are a couple of different async Scala and Java ones.

SandishKumarHN commented 6 years ago

The use case would be something like this: "Taking advantage of Redis in memory storage engine to do list and set operations makes it an amazing platform to use for a message queue." I'm thinking of using Lettuce, an async Redis client (a Java library): https://github.com/lettuce-io/lettuce-core

riyadparvez commented 6 years ago

@SandishKumarHN Any update on this issue?

SandishKumarHN commented 6 years ago

@riyadparvez Working on it; I will create a PR by the first week of December.

aisven commented 5 years ago

Very interesting issue! @SandishKumarHN Do you have something? I see some work is also underway and pending review in https://github.com/akka/alpakka/pull/1350

SandishKumarHN commented 5 years ago

I have similar source code, so I think we should go with #1350 itself.

hveiga commented 5 years ago

Is there any progress on this? If not, I am willing to put some time into it to get it ready for a PR.

ennru commented 5 years ago

No, we haven't seen any activity since the review of https://github.com/akka/alpakka/pull/1350

choffmeister commented 4 years ago

Since none of the existing Redis client implementations did what I wanted (I wanted it to be based on Akka and to support TLS), I started my own implementation, purely based on Akka Streams (no custom actors and stuff).

https://github.com/choffmeister/microservice-utils/blob/b0bfb15fa965952dc7c6bbc7633f2b424e8cd199/microservice-utils-rate-limiter/src/main/scala/de/choffmeister/microserviceutils/ratelimiter/RedisClient.scala

The parsing is still incomplete (no support for parsing complex string replies), but does what I need for now (needed it for rate limiting).

ennru commented 4 years ago

@choffmeister Thanks for sharing!

choffmeister commented 4 years ago

@ennru I finished implementing the Redis reply parsing and vastly improved the test coverage. In this state this might be a candidate for Alpakka. All other implementations I saw heavily used custom-written actors for state management. This implementation is way simpler.
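For readers who have not looked at the Redis wire protocol (RESP), here is a rough idea of what reply parsing involves. This is a minimal sketch in plain Scala, not the code from the linked repository: `RespParser` and this `Reply` model are hypothetical names chosen for illustration, and the sketch assumes the whole reply is already in memory, whereas a streaming implementation would also have to handle replies split across TCP chunks.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical reply model for illustration; one case per RESP2 reply type.
sealed trait Reply
object Reply {
  final case class SimpleString(value: String) extends Reply
  final case class Error(message: String) extends Reply
  final case class Integer(value: Long) extends Reply
  final case class BulkString(value: Option[String]) extends Reply // None = null bulk string ($-1)
  final case class Array(items: List[Reply]) extends Reply
}

object RespParser {
  // Parses one reply starting at `pos`; returns the reply and the index just past it.
  // Assumes `in` holds a complete, well-formed reply (no incremental framing).
  def parse(in: Array[Byte], pos: Int = 0): (Reply, Int) = {
    val (line, next) = readLine(in, pos + 1)
    in(pos).toChar match {
      case '+' => (Reply.SimpleString(line), next)
      case '-' => (Reply.Error(line), next)
      case ':' => (Reply.Integer(line.toLong), next)
      case '$' =>
        val len = line.toInt
        if (len < 0) (Reply.BulkString(None), next)
        else {
          val value = new String(in, next, len, UTF_8)
          (Reply.BulkString(Some(value)), next + len + 2) // skip payload and trailing CRLF
        }
      case '*' =>
        // Simplification: a null array (*-1) is treated as an empty array here.
        var cursor = next
        val items = List.fill(math.max(line.toInt, 0)) {
          val (item, after) = parse(in, cursor)
          cursor = after
          item
        }
        (Reply.Array(items), cursor)
      case other =>
        throw new IllegalArgumentException(s"Unknown RESP type marker: $other")
    }
  }

  // Reads the text up to the next CRLF; returns it and the index after the CRLF.
  private def readLine(in: Array[Byte], from: Int): (String, Int) = {
    var end = from
    while (!(in(end) == '\r' && in(end + 1) == '\n')) end += 1
    (new String(in, from, end - from, UTF_8), end + 2)
  }
}
```

Arrays are the recursive case: each element is itself a full reply, which is why nested replies such as the result of EXEC are harder to parse than simple strings and integers.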

choffmeister commented 4 years ago

Now I am very happy with my implementation. I also added support for redis pub/sub. Latest version here: https://github.com/choffmeister/microservice-utils/tree/master/microservice-utils-redis/src/main/scala/de/choffmeister/microserviceutils/redis

I made even more improvements to my implementation. Here are some key facts:

  1. Fundamental implementation with streams that handles command/reply roundtrips and subscriptions
  2. Redis.commandReplyFlow(...) returns a Flow[Command, Replies, Future[Tcp.OutgoingConnection]]
    val commands: Source[Command, NotUsed] =
      Source(List(Command.Ping(), Command.Ping()))
    val replies: Future[Seq[Replies]] =
      commands.via(Redis.commandReplyFlow(...)).runWith(Sink.seq)
  3. Redis.subscriptionSource(...) returns a Source[(String, ByteString), Future[Tcp.OutgoingConnection]]
    val messages: Source[(String, ByteString), _] =
      Redis.subscriptionSource(..., "first-channel", "second-channel")
    messages.runWith(Sink.foreach { case (channel, message) =>
      println(s"[$channel] ${message.utf8String}")
    })
  4. An additional stateful client built as an actor
    val client: RedisClient = new RedisClient(...)
    for {
      _ <- client.command(Command.Ping())
      // Replies(Reply.SimpleString("PONG"))
      _ <- client.command(Command.Pipeline(
        Command.Multi,
        Command.Incr(key),
        Command.Expire(key, 10),
        Command.Exec
      ))
      // Replies(
      //   Reply.SimpleString("OK"),
      //   Reply.SimpleString("QUEUED"),
      //   Reply.SimpleString("QUEUED"),
      //   Reply.Array(Reply.Integer(1L), Reply.Integer(1L))
      // )
      done <- client.disconnect()
      // Done
    } yield done
  5. Support for TLS (most managed cloud redis instances use TLS)
  6. Already pretty good test coverage, including integration tests against a running Redis instance
  7. Not every command has a corresponding case class yet, but one can always fall back to sending Command.Raw("CUSTOM", ByteString("arg1"))
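As background for item 7: every Redis command goes over the wire as a RESP array of bulk strings, which is why a raw fallback can carry any command name and arguments. Below is a minimal sketch of that encoding in plain Scala. `RespEncoder` and `encodeCommand` are hypothetical names, not part of the implementation linked above, and the sketch takes UTF-8 strings instead of ByteString so that it has no dependencies.

```scala
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical helper for illustration only.
object RespEncoder {
  // Encodes a command name and its arguments as a RESP array of bulk strings:
  //   *<element count>\r\n, then per element $<byte length>\r\n<bytes>\r\n
  def encodeCommand(name: String, args: String*): Array[Byte] = {
    val parts = (name +: args).map(_.getBytes(UTF_8))
    val out = new ByteArrayOutputStream()
    out.write(s"*${parts.length}\r\n".getBytes(UTF_8))
    parts.foreach { bytes =>
      // The length prefix counts bytes, not characters.
      out.write(s"$$${bytes.length}\r\n".getBytes(UTF_8))
      out.write(bytes)
      out.write("\r\n".getBytes(UTF_8))
    }
    out.toByteArray
  }
}
```

Under these assumptions, `RespEncoder.encodeCommand("CUSTOM", "arg1")` produces the bytes of `*2\r\n$6\r\nCUSTOM\r\n$4\r\narg1\r\n`.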

Let me know if, API-wise, this is something you would be willing to accept as a PR to Alpakka.

ennru commented 4 years ago

Sorry for not replying earlier. Thank you for suggesting this for Alpakka! For the time being, we have very limited capacity to review larger PRs.