alexreidy / parallafka-for-dotnet

Parallelized Kafka consumer for maximizing throughput
Apache License 2.0
1 stars 1 forks source link

Rebalancing causes infinite loop in CommitAsync #10

Open Gonyoda opened 2 years ago

Gonyoda commented 2 years ago

Using the Confluent kafka adapter, an error during commit causes an infinite loop for some errors.

These errors seem to occur when the consumers are rebalanced and the Parallafka code attempts to commit a message offset. The MessageCommitter.CommitNow method loops endlessly (with a 99ms delay) since the error will never be resolved.

Gonyoda commented 2 years ago

Possible solutions:

When a commit failure occurs: . purge the queue of messages for the partition that failed the commit . purge all the messages

It is not yet known if, after a rebalance, that the specific consumer can keep messages for a partition that it did not lose, or if it needs to start over. Say a consumer is assigned 3 partitions. A new consumer joins the group and a rebalance occurs and this consumer now is assigned 2 of the 3 original partitions. Should the consumer remove all queued messages? Or just remove the messages from the 3rd partition for which it is no longer assigned?

Gonyoda commented 2 years ago

I believe the correct solution is: when a commit failure occurs due to a repartitioning event, discard all the messages for the partition. This can mean that messages from the partition are re-processed by the new consumer.

alexreidy commented 2 years ago

SetPartitionsRevokedHandler should make the graceful-rebalance happy path straightforward enough, but we'll still need to support edge cases where we detect partition loss at commit time.

It's not good if it ends up having to detect and purge at commit time - it means there's potential out-of-order handling, so it's kind of a last resort. Picture a bunch of messages "in the pipes" with a shared key. For some reason, maybe a network blip, the consumer no longer owns the partition, and commits fail. Maybe half the messages are handled, and then we purge the queues of that partition's records, and/or it simply stops receiving them. Well, the new partition owner across the cloud might have already consumed all the messages for that key, but the original slower consumer had no idea and "won," handling a middle message last - maybe an Upsert message that should have been followed by a Delete were it not for the rebalance and purge. I think there is a similar theoretical risk in the normal serial poll->handle->commit consumer flow in the edge case that it loses connection during a slow handle step, its partitions are reassigned, and then it finishes handling anyway. Hopefully, with the PartitionsRevokedHandler, we're at no more risk for that kind of race condition than the default Kafka consumer flow.

Need to verify that this callback is called as part of the Poll/Consume call as in Java and that subsequent ConsumeResults are never stale / from a still-revoked partition.