akka / alpakka-kafka

Alpakka Kafka connector - Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/docs/alpakka-kafka/current/home.html

Expose ConsumerRebalanceListener or something similar so one can handle partition assignment changes #93

Closed vishnusaran closed 8 years ago

vishnusaran commented 8 years ago

Hi,

Kafka exposes ConsumerRebalanceListener for applications that need to know when the partition assignment changes. In my scenario, I am aggregating data from Kafka, and every time the partition assignment changes, I have to reset the aggregation. Is there a way to expose ConsumerRebalanceListener through the ReactiveKafka.consumer API?

Thanks!

kciesielski commented 8 years ago

Hi, can you point me to an example of how a ConsumerRebalanceListener is used with the plain Kafka client API? I'm not sure how this concept fits into the Reactive Streams API.
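For reference, with the plain Kafka client the listener is passed as the second argument of `KafkaConsumer.subscribe(topics, listener)`, and the client calls its `onPartitionsRevoked` and `onPartitionsAssigned` methods on the thread that invokes `poll()`. The stdlib-only sketch below models that pattern for the cache use case in this issue so it runs without a broker. `RebalanceCallbacks` is a hypothetical stand-in for `ConsumerRebalanceListener` (plain `int` partitions instead of `TopicPartition`), and `AggregatingCache` is an illustrative name, not part of any library.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Kafka's ConsumerRebalanceListener: same two
// callback names, but partitions are plain ints instead of TopicPartition.
interface RebalanceCallbacks {
    void onPartitionsRevoked(Collection<Integer> partitions);

    void onPartitionsAssigned(Collection<Integer> partitions);
}

// In-memory aggregate (key -> running sum). Assumes no offsets are committed
// and auto.offset.reset = smallest, as in this issue, so after a rebalance the
// assigned partitions are re-read from the start and old totals must be dropped.
class AggregatingCache implements RebalanceCallbacks {
    private final Map<String, Long> totals = new HashMap<>();
    private int resets = 0;

    @Override
    public void onPartitionsRevoked(Collection<Integer> partitions) {
        // Called before the assignment changes: invalidate the whole aggregate.
        totals.clear();
        resets++;
    }

    @Override
    public void onPartitionsAssigned(Collection<Integer> partitions) {
        // Nothing to do here: records re-read from the new assignment
        // rebuild the totals through add().
    }

    // Called by the poll loop for every record.
    void add(String key, long value) {
        totals.merge(key, value, Long::sum);
    }

    Long total(String key) {
        return totals.get(key);
    }

    int resets() {
        return resets;
    }
}
```

With the real client, an instance of such a class would be registered through `subscribe`, and the poll loop would call `add` for each record it receives.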

13h3r commented 8 years ago

In my view, we could add an additional Out to the consumer that emits information about rebalancing. The consumer would then have the following Outs:

vishnusaran commented 8 years ago

In my scenario, I am building an in-memory cache that aggregates all the data in Kafka. I am not committing any offsets, so when a rebalance is triggered, the consumer starts consuming from the smallest offset (auto.offset.reset = smallest) and the cache is rebuilt. If I don't know that a rebalance was triggered, I can't reset the cache, and it will contain invalid information.

13h3r commented 8 years ago

@vishnusaran do you want to reread all data from every assigned partition, or only from newly assigned ones? And what should happen when a partition is removed from the client?

vishnusaran commented 8 years ago

Since I do not commit offsets, every time a rebalance is triggered, Kafka starts sending data for all assigned partitions from the smallest offset (auto.offset.reset = smallest). So in this scenario I have to reset the cache, meaning I have to reread all the data into the cache for every partition currently assigned to me. ConsumerRebalanceListener has hooks for partitions being removed and added; in my scenario, I just need to know that a reassignment happened so I can reset the cache.

If we add an additional Out, it could emit the previous and current partition assignments whenever a rebalance is triggered.
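A minimal sketch of what an element on such an Out might carry, assuming a hypothetical `RebalanceEvent` class with partitions as plain `int`s. Deriving the revoked and newly assigned partitions from the two sets also covers the earlier question about rereading everything versus only new partitions: a consumer that reloads everything only needs to react to the event arriving, while one that cares about individual partitions can inspect the difference.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical element for the proposed rebalance Out: the assignment before
// and after the rebalance (partitions as plain ints).
final class RebalanceEvent {
    final Set<Integer> previous;
    final Set<Integer> current;

    RebalanceEvent(Set<Integer> previous, Set<Integer> current) {
        // Defensive, read-only copies so downstream stages cannot mutate them.
        this.previous = Collections.unmodifiableSet(new HashSet<>(previous));
        this.current = Collections.unmodifiableSet(new HashSet<>(current));
    }

    // Partitions owned before the rebalance but not after it.
    Set<Integer> revoked() {
        Set<Integer> result = new HashSet<>(previous);
        result.removeAll(current);
        return result;
    }

    // Partitions owned after the rebalance but not before it.
    Set<Integer> newlyAssigned() {
        Set<Integer> result = new HashSet<>(current);
        result.removeAll(previous);
        return result;
    }

    // False when the group rebalanced but this consumer kept the same partitions.
    boolean assignmentChanged() {
        return !previous.equals(current);
    }
}
```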

13h3r commented 8 years ago

Right now there is no way to reset the offset of a running graph; we only have the commit In. The correct solution, IMO, would be to restart the graph instance. But the graph instance owns the Kafka client instance, and we want to keep it. At the moment I have no idea how to implement that in a streaming manner.

Alternatively, we could expose the materialized Kafka consumer to the user so you can set up your own handler. But you would also need to push the rebalance information into your graph to invalidate the current cache. Correct?
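One way to push the rebalance information into the graph is to have the handler and the poll loop write into a single queue that the stream side drains; in Akka Streams, `Source.queue` could play a similar role. The sketch below uses a plain `LinkedBlockingQueue`, and `RebalanceBridge` and its methods are hypothetical names, not part of any library.

```java
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical bridge between a rebalance handler registered on an exposed
// consumer and the stream that maintains the cache. Both the handler and the
// poll loop write into the same queue; the stream side drains it in order.
final class RebalanceBridge {
    // Marker enqueued on every assignment change.
    static final Object REBALANCED = new Object();

    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();

    // Invoke from the rebalance handler. Kafka runs rebalance callbacks on the
    // thread that calls poll(), so the marker lands between the records polled
    // before and after the rebalance.
    void onPartitionsAssigned(Collection<Integer> partitions) {
        queue.add(REBALANCED);
    }

    // Invoke from the poll loop for each record value.
    void onRecord(String value) {
        queue.add(value);
    }

    // Stream side: returns the next element, or null if nothing is pending.
    Object poll() {
        return queue.poll();
    }
}
```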

I don't have all the details of your problem. Do you have an idea of an API that would let you solve it? We can discuss it and adapt it.

vishnusaran commented 8 years ago

Initially, my idea was that if the ReactiveKafka.consume API allowed one to plug in their own ConsumerRebalanceListener, I would shut down the original graph and materialize a new graph on every partition rebalance (using Akka FSM to manage the lifecycle of the running graph).

Then @13h3r pointed out that we could have multiple Outs, in which case I can wire the rebalance event into my custom cache shape and reset the cache whenever an event arrives on that port.
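A stdlib-only sketch of the cache logic for this design, assuming the record and rebalance Outs are merged into one ordered input. `Element`, `RecordElement`, `RebalanceElement`, and `CacheStage` are hypothetical names; in a real graph this logic would live inside the custom cache shape.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical element type for one merged, ordered input.
interface Element {}

// A data record: key and value to aggregate.
final class RecordElement implements Element {
    final String key;
    final long value;

    RecordElement(String key, long value) {
        this.key = key;
        this.value = value;
    }
}

// Notice that the partition assignment changed.
final class RebalanceElement implements Element {}

// Hypothetical cache-stage logic: sums values per key and clears the
// aggregate whenever a rebalance notice arrives.
final class CacheStage {
    private final Map<String, Long> totals = new HashMap<>();

    void onElement(Element element) {
        if (element instanceof RebalanceElement) {
            totals.clear();
        } else if (element instanceof RecordElement) {
            RecordElement data = (RecordElement) element;
            totals.merge(data.key, data.value, Long::sum);
        }
    }

    void onElements(List<? extends Element> elements) {
        for (Element element : elements) {
            onElement(element);
        }
    }

    Map<String, Long> snapshot() {
        return new HashMap<>(totals);
    }
}
```

Keeping both kinds of element on one input means the reset falls exactly between the records before and after it. With two independent ports instead, the stage would have to decide how a rebalance event is ordered relative to records still in flight on the other port.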

If the materialized Kafka consumer is exposed, I can call subscribe with my ConsumerRebalanceListener and manage the graph with Akka FSM as described above.

At this moment, I don't know which is the right solution, or whether any other solution exists.

13h3r commented 8 years ago

The API is proposed here - #133

13h3r commented 8 years ago

What we need to have:

Automatic partition assignment:

Manual partition assignment: we need to be able to manually assign/revoke partitions on an existing source
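For the manual case, note that the plain client's `KafkaConsumer.assign(...)` replaces the previous assignment rather than adding to it, so assigning or revoking a single partition on a running source means passing the full updated set. A minimal sketch of that bookkeeping, assuming a hypothetical `ManualAssignment` helper whose `applyAssignment` callback would forward the set to `assign` on the real consumer (partitions as plain `int`s).

```java
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Consumer;

// Hypothetical helper that turns incremental assign/revoke requests into the
// full assignment expected by a replace-style assign() call.
final class ManualAssignment {
    private final Set<Integer> assigned = new TreeSet<>();
    private final Consumer<Set<Integer>> applyAssignment;

    ManualAssignment(Consumer<Set<Integer>> applyAssignment) {
        this.applyAssignment = applyAssignment;
    }

    // Adds one partition; does nothing if it is already assigned.
    void assign(int partition) {
        if (assigned.add(partition)) push();
    }

    // Removes one partition; does nothing if it was not assigned.
    void revoke(int partition) {
        if (assigned.remove(partition)) push();
    }

    private void push() {
        // Hand over an immutable copy of the complete new assignment.
        applyAssignment.accept(Collections.unmodifiableSet(new TreeSet<>(assigned)));
    }
}
```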

13h3r commented 8 years ago

done