zio / zio-redis

A ZIO-based redis client
https://zio.github.io/zio-redis
Apache License 2.0

better async/pipelining support? #916

Status: closed (mberndt123 closed this issue 9 months ago)

mberndt123 commented 10 months ago

Hey @mijicd,

I think I have run into a limitation of the zio-redis architecture that makes it hard for me to solve a fairly straightforward problem.

When you run a command, zio-redis will create a Promise for the response and place it into the requests Queue along with the command. Then there are some fibers that take care of sending that command to Redis and completing the Promise as soon as the response comes back from Redis. I think this is a great design because it enables pipelining without the user having to worry about it explicitly.
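To make the design above concrete, here is a toy model of it, assuming ZIO 2. `Request`, `connection` and `execute` are hypothetical names, not zio-redis internals, and the fake connection fiber answers immediately instead of talking to a server. It shows the key point: each command method creates a Promise, enqueues it with the command and awaits it, so concurrent callers can have several requests queued at once.

```scala
import zio._

object RequestQueueSketch {
  // Hypothetical request type: a command plus the Promise its caller waits on.
  final case class Request(command: String, response: Promise[Nothing, String])

  // Stand-in for the fibers that write to the socket and read replies:
  // takes requests in FIFO order and completes each caller's Promise.
  def connection(queue: Queue[Request]): UIO[Nothing] =
    queue.take.flatMap(req => req.response.succeed(s"OK ${req.command}")).forever

  // Roughly what a command method does today: create a Promise, enqueue it
  // with the command, then wait for the reply, all inside one effect.
  def execute(queue: Queue[Request], command: String): UIO[String] =
    for {
      promise <- Promise.make[Nothing, String]
      _       <- queue.offer(Request(command, promise))
      reply   <- promise.await
    } yield reply

  // Concurrent callers may have several requests queued at the same time.
  val demo: UIO[List[String]] =
    for {
      queue   <- Queue.unbounded[Request]
      _       <- connection(queue).fork // interrupted when demo completes
      replies <- ZIO.foreachPar(List("GET a", "GET b", "GET c"))(execute(queue, _))
    } yield replies
}
```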

However, it has one drawback: when you want to send several commands to the server that need to run in a specific order, you have to wait for each command to finish before sending the next one. This is because the return type of all the command methods is something like IO[RedisError, A]. What would be needed is something like UIO[IO[RedisError, A]], where the outer UIO finishes as soon as the command has been placed in the requests Queue, and the inner IO then waits for the response from the server.
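A minimal sketch of the proposed split, again assuming ZIO 2 and a hypothetical toy queue rather than zio-redis's real internals. `enqueue` returns `UIO[UIO[String]]`: sequencing the outer effects fixes the order in which commands reach the connection, while the replies are awaited afterwards. The current signature is recovered with `flatten`.

```scala
import zio._

object AsyncSplitSketch {
  // Hypothetical request type; not zio-redis's real internals.
  final case class Request(command: String, response: Promise[Nothing, String])

  // Outer UIO: finishes once the command is queued. Inner UIO: awaits the reply.
  def enqueue(queue: Queue[Request], command: String): UIO[UIO[String]] =
    for {
      promise <- Promise.make[Nothing, String]
      _       <- queue.offer(Request(command, promise))
    } yield promise.await

  // The existing "synchronous" shape is just the flattened form.
  def execute(queue: Queue[Request], command: String): UIO[String] =
    enqueue(queue, command).flatten

  // Fake connection that records the order in which commands arrive.
  def connection(queue: Queue[Request], seen: Ref[Chunk[String]]): UIO[Nothing] =
    queue.take.flatMap { req =>
      seen.update(_ :+ req.command) *> req.response.succeed(s"OK ${req.command}")
    }.forever

  // Both writes are queued back to back, without waiting for the first reply.
  val demo: UIO[(List[String], Chunk[String])] =
    for {
      queue   <- Queue.unbounded[Request]
      seen    <- Ref.make(Chunk.empty[String])
      _       <- connection(queue, seen).fork
      waits   <- ZIO.foreach(List("SET k 1", "SET k 2"))(enqueue(queue, _))
      replies <- ZIO.collectAll(waits)
      order   <- seen.get
    } yield (replies, order)
}
```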

I'm not proposing to change all the method return types because that would be a very invasive change and everybody would have to call flatten all the time. Instead, my suggestion would be to add a G[_] type parameter to the Redis trait so that it's possible to have both a "synchronous" (return type IO[RedisError, A]) and an "asynchronous" (return type UIO[IO[RedisError, A]]) variant of the Redis trait without having to duplicate all the methods. I'd be glad to work on the implementation of such a design.
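One way the G[_] idea could look, sketched under assumptions: ZIO 2, a hypothetical two-command trait instead of the real Redis trait, a stand-in `RedisError`, and a hypothetical `send` function representing "enqueue and return an effect awaiting the reply". Each command is written once in terms of a single abstract `run`, and the two variants differ only in how they build `G[A]`.

```scala
import zio._

object TypeParamSketch {
  // Hypothetical stand-in for zio-redis's own error type.
  final case class RedisError(message: String)

  type Sync[A]  = IO[RedisError, A]      // today's return type
  type Async[A] = UIO[IO[RedisError, A]] // outer: enqueued, inner: reply

  // Commands are written once, generically in G; only `run` knows what G is.
  trait RedisCommands[G[_]] {
    protected def run[A](command: Chunk[String], decode: String => A): G[A]

    final def set(key: String, value: String): G[Unit] = run(Chunk("SET", key, value), _ => ())
    final def get(key: String): G[Option[String]]      = run(Chunk("GET", key), reply => Option(reply))
  }

  // `send` is an assumption, not a zio-redis API.
  final class AsyncRedis(send: Chunk[String] => Async[String]) extends RedisCommands[Async] {
    protected def run[A](command: Chunk[String], decode: String => A): Async[A] =
      send(command).map(_.map(decode))
  }

  // The synchronous variant flattens the two layers.
  final class SyncRedis(send: Chunk[String] => Async[String]) extends RedisCommands[Sync] {
    protected def run[A](command: Chunk[String], decode: String => A): Sync[A] =
      send(command).flatMap(_.map(decode))
  }

  // Fake transport for trying the sketch out: replies with the command text.
  val echo: Chunk[String] => Async[String] =
    command => ZIO.succeed(ZIO.succeed(command.mkString(" ")))
}
```

Since `set` and `get` live only in the trait, adding a command means adding one method, and both variants pick it up.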

For motivation, I'd like to explain how I ran into this problem.

I'm reading messages (key-value pairs) from a Kafka topic, and I want to store these messages in Redis so that I can always retrieve the latest value for any given key. A naïve solution would be something like this:

Consumer.plainStream(…stuff…)
  // each hSet must complete before the next record is processed
  .mapZIO(record => ZIO.serviceWithZIO[Redis](_.hSet(record.key, "foo" -> record.value.foo)).as(record))
  .mapZIO(_.offset.commit)

This works, but performance is bad when you do it this way because it will wait for each hSet command to complete before sending the next one. But the question is, what to do instead? If I replace the first mapZIO with mapZIOPar, then the program no longer works correctly. When two messages with the same key arrive within a short time window, there is no guarantee which one will end up being sent to Redis first, and so the value of an older message might overwrite the value from a newer one. I also considered mapZIOParByKey, but this is also not a good solution (poor performance if the number of distinct keys is small, and it reorders messages, making it hard to commit the Kafka offsets).
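With an API of the UIO[IO[RedisError, A]] shape, the stream could enqueue writes sequentially with `mapZIO` (so send order equals stream order) and then await the replies with `mapZIOPar`, which runs effects concurrently but emits results in input order, so offsets would still be committed in order. The sketch below assumes ZIO 2 with zio-streams; `Record`, `Write`, `enqueueWrite` and the fake server are hypothetical, and collecting offsets stands in for `offset.commit`.

```scala
import zio._
import zio.stream._

object OrderedPipelineSketch {
  // Hypothetical stand-ins for a Kafka record and a queued Redis write.
  final case class Record(key: String, value: String, offset: Long)
  final case class Write(key: String, value: String, done: Promise[Nothing, Unit])

  // Assumed async-style API: returns once queued; the inner effect awaits the reply.
  def enqueueWrite(queue: Queue[Write], r: Record): UIO[UIO[Unit]] =
    for {
      done <- Promise.make[Nothing, Unit]
      _    <- queue.offer(Write(r.key, r.value, done))
    } yield done.await

  // Fake server: applies writes strictly in the order they were queued.
  def server(queue: Queue[Write], store: Ref[Map[String, String]]): UIO[Nothing] =
    queue.take.flatMap(w => store.update(_ + (w.key -> w.value)) *> w.done.succeed(())).forever

  val demo: UIO[(Chunk[Long], Map[String, String])] =
    for {
      queue <- Queue.unbounded[Write]
      store <- Ref.make(Map.empty[String, String])
      _     <- server(queue, store).fork
      records = Chunk(Record("a", "1", 0L), Record("b", "x", 1L), Record("a", "2", 2L))
      committed <- ZStream
                     .fromChunk(records)
                     .mapZIO(r => enqueueWrite(queue, r).map(_.as(r))) // sequential: preserves send order
                     .mapZIOPar(16)(await => await)                    // awaits overlap, output stays ordered
                     .map(_.offset)                                    // stand-in for offset.commit
                     .runCollect
      finalState <- store.get
    } yield (committed, finalState)
}
```

Here the later value for key `a` wins, and the offsets come out as 0, 1, 2.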