silviucpp / erlkaf

Erlang kafka driver based on librdkafka
MIT License

[WIP] pause and resume consumer #35

Open JackMF opened 3 years ago

JackMF commented 3 years ago

Turning the consumer into a gen_statem with polling and paused states. TODO: update consumer_group and manager to handle calls.
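For readers unfamiliar with the pattern, a two-state gen_statem of this kind might look roughly like the sketch below. It is not the code from this PR: the module name `pausable_consumer_sketch`, the `polls` counter, and the 100 ms poll interval are all made up for illustration, and the actual message fetch is left as a placeholder.

```erlang
-module(pausable_consumer_sketch).
-behaviour(gen_statem).

%% Hypothetical API, for illustration only (not part of erlkaf).
-export([start_link/0, pause/1, resume/1, current_state/1]).
-export([init/1, callback_mode/0, polling/3, paused/3]).

-define(POLL_INTERVAL_MS, 100).

start_link() ->
    gen_statem:start_link(?MODULE, [], []).

pause(Pid) -> gen_statem:call(Pid, pause).
resume(Pid) -> gen_statem:call(Pid, resume).
current_state(Pid) -> gen_statem:call(Pid, current_state).

callback_mode() -> state_functions.

init([]) ->
    %% Start in the polling state and trigger the first poll immediately.
    {ok, polling, #{polls => 0}, [{state_timeout, 0, poll}]}.

%% polling: a state timeout drives the poll loop.
polling(state_timeout, poll, Data = #{polls := N}) ->
    %% Placeholder: a real consumer would fetch and process messages here.
    {keep_state, Data#{polls := N + 1},
     [{state_timeout, ?POLL_INTERVAL_MS, poll}]};
polling({call, From}, pause, Data) ->
    %% Leaving the state cancels the pending state timeout, so polling stops.
    {next_state, paused, Data, [{reply, From, ok}]};
polling({call, From}, resume, _Data) ->
    %% Already polling: nothing to do.
    {keep_state_and_data, [{reply, From, ok}]};
polling({call, From}, current_state, _Data) ->
    {keep_state_and_data, [{reply, From, polling}]}.

%% paused: no timers are running; only resume restarts the poll loop.
paused({call, From}, resume, Data) ->
    {next_state, polling, Data,
     [{reply, From, ok}, {state_timeout, 0, poll}]};
paused({call, From}, pause, _Data) ->
    {keep_state_and_data, [{reply, From, ok}]};
paused({call, From}, current_state, _Data) ->
    {keep_state_and_data, [{reply, From, paused}]}.
```

Note that the paused flag lives only in this process: if the process is stopped and a new one is started, the new one begins in `polling` again.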

silviucpp commented 3 years ago

I honestly believe this approach is wrong. It is better to use the librdkafka API for pausing/resuming consumers. To give you an example with your implementation: when a rebalance takes place, the existing instance of erlkaf_consumer is destroyed and another one is created once re-assignment is completed. Your state will be lost in this case.

If you use the librdkafka API, this use case is covered. See https://github.com/edenhill/librdkafka/issues/1849

silviucpp commented 3 years ago

I'm talking about these APIs: https://github.com/edenhill/librdkafka/blob/58aa3ab832172770b1b690a06900df9c297b135c/src/rdkafka.h#L3029

JackMF commented 3 years ago

I understand that the partition would not stay paused after a rebalance. However, in the use case I was imagining, it should not remain paused after a rebalance anyway. Perhaps I should implement this at the application level, as you say. I just wanted the neatness of being able to call erlkaf:pause_consumer(Topic, Partition).

A quick question: if I called erlkaf_consumer:stop(Pid) for just one partition of a topic, would this trigger a rebalance? Or would it have a similar effect to the "pause" I am imagining above? If so, could I create and expose an erlkaf:stop_consumer(Topic, Partition) which would call erlkaf_consumer:stop(Pid) behind the scenes?