segmentio / kafka-go

Kafka library in Go
MIT License

Need a way to detect rebalancing event #1023

Open abonec opened 1 year ago

abonec commented 1 year ago

I need some way to detect that a rebalance is happening. It could be provided in any of several ways:

One way you suggest solving this is to use the ConsumerGroup API instead of the Reader API. But the point of the Reader API is simplicity for the programmer, and moving to the ConsumerGroup API just to observe rebalancing events is too high a price in ordinary cases, since it leads to unnecessary code and errors.

If you're interested in any of these solutions, I can start a PR.

dominicbarnes commented 1 year ago

I presume you want more than logging, as I believe we do log a message on each rebalance. https://github.com/segmentio/kafka-go/blob/v0.4.38/reader.go#L140-L142

What's the use case you're trying to solve? Knowing what you're doing would inform a lot of how we proceed.

abonec commented 1 year ago

@dominicbarnes I'm not looking for logging. I want to be able to detect a rebalance when it happens and clean up the workers that were assigned to particular partitions. I know I can do this myself by implementing a custom consumer group (this is precisely what we have done), but that requires a lot of boilerplate code that is already implemented in your Reader. If there were simply a way to detect rebalancing, we could throw that code out and use the standard Reader.

dominicbarnes commented 1 year ago

I'm afraid I don't fully understand. As you mentioned, the Reader already does rebalancing, so the Reader instances for your application (within the same process or across other processes) should be managed by the brokers of the Kafka cluster and you shouldn't need to implement any cleanup logic yourself.

Is there something more broadly useful about the consumer group assignments that is needed outside of your kafka consumers themselves? Or is there something that Reader is not handling correctly that you need this extra hook for?

dominicbarnes commented 1 year ago

I think the big concern I have is that exposing details about the inner workings of ConsumerGroup is unnecessary if Reader is performing its job correctly. Thus, to me this sounds like an attempt to work around some sort of problem, which I'd prefer to solve at the root rather than expose hacks and workarounds. That being said, I may just be misunderstanding what the ask is.

louisboilard commented 1 year ago

@dominicbarnes I believe @abonec wants a way to know when a consumer rebalance is about to happen. It's an essential feature, considering there is no support for static membership of consumer groups. Is there currently any way to make sure a consumer X gets reassigned the same partitions after a rebalance? Or at least a way to know when a rebalance is about to happen, so we can manage things ourselves?

abonec commented 1 year ago

@dominicbarnes we partition our workload so that all events from one business client go to a specific partition. While processing a partition, the reader side caches some data to optimize processing. Everything works fine except when a partition is rebalanced away and then back to the same worker:

  1. Worker 1 processes client 1 with an up-to-date cache ... rebalance to Worker 2
  2. Worker 2 processes client 1 with a fresh, up-to-date cache loaded from the database. Meanwhile, Worker 1 holds an outdated cache, but that is not a problem yet because it is not being used ... rebalance back to Worker 1
  3. Worker 1 processes client 1 with the outdated cache from step 1

If we had an event between these rebalances, we would have an opportunity to clear the caches.
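Given such an event, the application-side fix could be as small as dropping the cache. A minimal sketch, assuming some rebalance signal is available to call the hypothetical OnRebalance method; partitionCache and its methods are illustrative names, not kafka-go API, and main walks through the three steps above.

```go
package main

import (
	"fmt"
	"sync"
)

// partitionCache is a hypothetical per-partition cache: state for a business
// client is stored under the partition that client's events go to.
type partitionCache struct {
	mu   sync.Mutex
	data map[int]map[string]string // partition -> key -> value
}

func newPartitionCache() *partitionCache {
	return &partitionCache{data: make(map[int]map[string]string)}
}

// Get returns the cached value and whether it was present.
func (c *partitionCache) Get(partition int, key string) (string, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	v, ok := c.data[partition][key] // reading a nil inner map is safe
	return v, ok
}

// Put stores a value, e.g. one just loaded from the database.
func (c *partitionCache) Put(partition int, key, value string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.data[partition] == nil {
		c.data[partition] = make(map[string]string)
	}
	c.data[partition][key] = value
}

// OnRebalance drops everything: this worker may have lost partitions and
// later regained them, so nothing it cached before can be trusted.
func (c *partitionCache) OnRebalance() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.data = make(map[int]map[string]string)
}

func main() {
	cache := newPartitionCache()

	// Step 1: worker 1 owns partition 3 and caches client 1's state.
	cache.Put(3, "client-1", "state loaded at step 1")

	// Rebalance signal: partition 3 moves to worker 2, so drop the cache.
	cache.OnRebalance()

	// Step 3: partition 3 comes back; the stale entry is gone, forcing a reload.
	_, ok := cache.Get(3, "client-1")
	fmt.Println("stale entry present:", ok) // prints "stale entry present: false"
}
```

Clearing everything rather than only the revoked partitions is deliberate: as discussed here, the Reader does not expose which partitions were revoked, so the only safe assumption after a rebalance is that any partition may have been away and come back.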

abonec commented 1 year ago

@louisboilard is half right: I want to know when a rebalance is about to happen so we can clear our caches, but I don't need static membership. Instead, we want dynamic membership so we can spread the workload across different pods.

cardinalby commented 1 year ago

@abonec I have the same need (I track commit intents so that a lower offset is committed only once the higher ones are ready).

As far as I can tell, with the library in its current state you can try the following options:

  1. periodically observe Rebalances from Stats()
  2. create a logger wrapper that reacts to the "subscribed to topics and partitions" message
  3. create a wrapper for GroupBalancer that is called on rebalances (you can pass it to ReaderConfig.GroupBalancers)