fd4s / fs2-kafka

Functional Kafka Streams for Scala
https://fd4s.github.io/fs2-kafka
Apache License 2.0
295 stars · 100 forks

Multiple consumers read from the same partition after rebalance #1200

Open: squadgazzz opened this issue 1 year ago

squadgazzz commented 1 year ago

With fs2-kafka 3.0.1, and also on 2.x.x, we have run into an issue where multiple consumers sharing the same group id start reading from the same partition after a rebalance. The flow looks as follows:

We have already checked the broker and found no issues; in particular, the broker logs confirm that it emits the correct events. The problem may lie in the library's ConsumerRebalanceListener implementation.

squadgazzz commented 1 year ago

Might be related to https://github.com/fd4s/fs2-kafka/issues/127.

wookievx commented 2 months ago

We had a very similar issue at my workplace: an old consumer kept consuming after its partitions had been revoked. From what I gathered from the linked issue and the rebalance listener implementation, all streams consuming revoked partitions are signalled to terminate immediately. Maybe the issue is due to how termination is handled in fs2 Streams / Cats Effect, or to certain stream combinators that interfere with it. I have not been able to confirm whether blocking in the onRevoked callback actually prevents other consumers from processing the partition from an older offset. I will probably run an experiment to check whether that is the case.
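The difference between a partition stream being signalled to terminate and it having actually stopped can be modelled without Kafka. The sketch below is plain Scala, not fs2-kafka code: `Worker`, `SignalOnlyRevoke` and the latches are hypothetical stand-ins for a partition stream and its onRevoked handling. The old owner is parked in the middle of a record when revocation is signalled, and because the callback only sets a flag and returns, the new owner starts while the old one is still processing.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Hypothetical model of one partition stream; not fs2-kafka's implementation.
final class Worker(active: AtomicInteger, maxActive: AtomicInteger) {
  val stopRequested = new AtomicBoolean(false)
  val inProcess     = new CountDownLatch(1) // counted down once a record is being processed
  val release       = new CountDownLatch(1) // lets the in-flight record finish

  // Simulates handling one record and tracks how many owners are "inside" at once.
  def process(): Unit = {
    val now = active.incrementAndGet()
    maxActive.accumulateAndGet(now, (a, b) => math.max(a, b))
    inProcess.countDown()
    release.await(5, TimeUnit.SECONDS) // in-flight work that the stop flag cannot interrupt
    active.decrementAndGet()
  }

  def start(): Thread = {
    val t = new Thread(() => if (!stopRequested.get()) process())
    t.start()
    t
  }
}

object SignalOnlyRevoke {
  // Returns the maximum number of owners processing the partition at the same time.
  def run(): Int = {
    val active    = new AtomicInteger(0)
    val maxActive = new AtomicInteger(0)

    val oldOwner  = new Worker(active, maxActive)
    val oldThread = oldOwner.start()
    oldOwner.inProcess.await() // the old owner is now mid-record

    // Hypothetical "onRevoked": only signal termination, then return immediately.
    oldOwner.stopRequested.set(true)

    // The new assignment proceeds right away, while the old record is still in flight.
    val newOwner  = new Worker(active, maxActive)
    val newThread = newOwner.start()
    newOwner.inProcess.await()

    oldOwner.release.countDown()
    newOwner.release.countDown()
    oldThread.join(); newThread.join()
    maxActive.get()
  }
}
```

`SignalOnlyRevoke.run()` returns 2: for a moment, two owners are inside `process` for the same partition, which is the kind of overlap described in this issue.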

The guarantee that only one consumer processes a partition at any given time is very important to my use case: it lets me keep state in memory and avoid conflicts, which is very useful for event sourcing. I do not really care if a message is processed multiple times, as long as only one consumer is processing it at a time.
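One way to check this invariant in the experiment mentioned above is to record, for each partition, which consumers are currently processing a record and to flag any overlap. The sketch below is a hypothetical helper in plain Scala (`OwnershipTracker` is not part of fs2-kafka); the idea is to call `enter` before and `exit` after handling each record, using a distinct id per consumer instance.

```scala
import scala.collection.mutable

// Hypothetical experiment helper: tracks which consumers are processing each partition
// and records every moment at which more than one consumer is active on the same partition.
final class OwnershipTracker {
  private val owners     = mutable.Map.empty[Int, Set[String]]
  private val violations = mutable.ListBuffer.empty[(Int, Set[String])]

  def enter(partition: Int, consumer: String): Unit = synchronized {
    val now = owners.getOrElse(partition, Set.empty[String]) + consumer
    owners.update(partition, now)
    if (now.size > 1) violations += ((partition, now)) // two owners at once: invariant broken
  }

  def exit(partition: Int, consumer: String): Unit = synchronized {
    val now = owners.getOrElse(partition, Set.empty[String]) - consumer
    if (now.isEmpty) owners -= partition
    else owners.update(partition, now)
  }

  def violationsSoFar: List[(Int, Set[String])] = synchronized(violations.toList)
}
```

Repeated processing by the same consumer is not reported, which matches the requirement that reprocessing is acceptable as long as only one consumer works on a partition at a time.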

One thing ZIO Kafka does differently, I think, is that it stores a promise that notifies the "actor" once a partition stream has finished. A similar change might be needed here to provide the semantics described above (though it all hinges on the assumption that a new assignment is blocked until the revoke callback returns).
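A sketch of the promise-based idea, again in plain Scala and under the assumption stated above that the next assignment only starts once the revoke callback returns. `PartitionStream` and `AwaitingRevoke` are hypothetical names; this is not how ZIO Kafka or fs2-kafka is actually implemented. Each stream completes a `Promise` in a `finally` block once its loop has exited, and `onRevoked` signals termination and then blocks on that promise.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Hypothetical model: each partition stream completes `finished` once it has really stopped.
final class PartitionStream(active: AtomicInteger, maxActive: AtomicInteger) {
  private val stopRequested = new AtomicBoolean(false)
  val finished = Promise[Unit]()

  def requestStop(): Unit = stopRequested.set(true)

  def start(records: Int): Thread = {
    val t = new Thread(() => {
      try {
        var i = 0
        while (i < records && !stopRequested.get()) {
          val now = active.incrementAndGet()
          maxActive.accumulateAndGet(now, (a, b) => math.max(a, b))
          Thread.sleep(1) // simulated per-record work
          active.decrementAndGet()
          i += 1
        }
      } finally finished.trySuccess(()) // signal "actually stopped", even if processing failed
    })
    t.start()
    t
  }
}

object AwaitingRevoke {
  // Hypothetical onRevoked: signal termination, then block until the stream reports it stopped.
  def onRevoked(stream: PartitionStream, timeout: FiniteDuration): Unit = {
    stream.requestStop()
    Await.result(stream.finished.future, timeout) // throws TimeoutException if it never stops
  }

  // Returns the maximum number of owners processing the partition at the same time.
  def run(): Int = {
    val active    = new AtomicInteger(0)
    val maxActive = new AtomicInteger(0)

    val oldOwner = new PartitionStream(active, maxActive)
    val t1 = oldOwner.start(records = 1000)
    Thread.sleep(20)
    onRevoked(oldOwner, 5.seconds)

    // Assumption: the new assignment only happens after onRevoked has returned.
    val newOwner = new PartitionStream(active, maxActive)
    val t2 = newOwner.start(records = 10)
    t1.join(); t2.join()
    maxActive.get()
  }
}
```

`AwaitingRevoke.run()` returns 1, because the new owner is only started after the old stream has completed its promise. The bounded wait in `onRevoked` is a deliberate choice: a callback that blocks indefinitely on a stuck stream would hold up the rebalance instead of failing visibly.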