haskell-works / hw-kafka-client

Kafka client for Haskell, including auto-rebalancing consumers
MIT License
140 stars 50 forks source link

Do not allow batch poll and rebalance to happen in parallel #176

Closed michaelglass closed 2 years ago

michaelglass commented 3 years ago

Our understanding

The rebalance callback happens in two threads:

is that right? if not this PR makes no sense!

This ensures that the async callback waits for a poll to complete before calling.

Problem

We were seeing poll responses with messages from partitions that had been recently revoked. This PR helped (minimize but not completely resolve) those cases.

Some context

If we allow a batch poll and rebalance event to run in parallel then there is no way to tell if a poll that starts before a rebalance and returns after the rebalance completes returns messages from the old assigned partition set or the new one. The hw-kafka-client user will then not know whether they should disregard the returned message set or process it.

The solution

This makes it so the batch message polling runs in the same lock the consumer event callback does. That way we can be sure a poll for messages and a rebalance event will never be running in parallel.

michaelglass commented 3 years ago

should this PR be against master or main?

AlexeyRaga commented 3 years ago

@michaelglass main, everything is main now. Master has retired and no more masters in this brave new world...

AlexeyRaga commented 3 years ago

I will brush my memory on how rebalance is happening actually happening and will look into this hopefully soon.

But, just for clarification, why would it matter for the client whether it sees messages from already unassigned partitions? They'd be processed, perhaps even twice (by this instance and by the new one) so we don't seem to be losing messages?

ghost commented 3 years ago

Thank you for taking a look!

They'd be processed, perhaps even twice (by this instance and by the new one) so we don't seem to be losing messages?

Yep, agreed, we're not too worried about losing messages.

For our particular use case processing a message twice would be fine, but processing a series of messages for a particular partition twice would be a problem.

I.e., we'd be fine with this sequence of messages: A B B C C D But not with this one: A B C B C D

We believe that by committing after every offset we should be able to get this guarantee from Kafka, but only if we can be sure we will stop processing messages for a partition when we loose it as an assignment.

robinp commented 3 years ago

Kafka client only triggers callbacks when one of the poll-like methods are invoked.

These callbacks will [in addition to rd_kafka_poll] also be triggered by rd_kafka_flush(), rd_kafka_consumer_poll(), and any other functions that serve queues. (from the excellent https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md)

For example, whe using the sync api, rd_kafka_consumer_poll will be called, and the callback trigger at that time. The async api also schedules backgroud rd_kafka_poll-s, which will can also trigger the rebalance callback - probably this is where you see a race.

Heartbeats are an entirely separate mechanism to polls, they are happening in the backgrounds constantly. The defaults based on https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md is that client sends heartbeat every 3 seconds, while broker treats client as gone after 45 seconds. But this is unrelated to rebalance callback.

(While here, a timeout setting does related to poll/rebalance is max.poll.interval.ms, default is 5 minutes. Client should poll messages at least this frequently, otherwise it might be kicked out of the group without receiving the rebalance callback - because if it doesn't poll, no way to get the callback).

I believe a good practice is to just rely on rd_kafka_consumer_poll, to get events together with messages, and don't do any rd_kafka_poll in the background. If you have long-running message processing that exceeds the 5 minutes, adjust the batch sizes or cut down message processing time rather. The background polls for events should be a last resort.

(In hw-kafka-client speak - I think the sync API should be preferred for explicit control. Note that message fetching still happens in the background by rdkafka, so the sync API is not slower)