confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0

[CooperativeSticky] Best practice to use manual commits & avoid commits during rebalance #2206

Open RonAmihai opened 2 months ago

RonAmihai commented 2 months ago

Description

As described in librdkafka issue https://github.com/confluentinc/librdkafka/issues/4059, performing manual commits during a cooperative rebalance is currently prohibited and may trigger additional follow-up rebalances.

Some Kafka libraries (example) solve this by preventing manual commits during a rebalance. In the mentioned case the library raises RebalanceInProgressException, but only if the rebalance state was fetched before the commit was performed; otherwise, some commit-during-rebalance scenarios may raise general commit errors.

However, confluent-kafka-dotnet doesn't seem to prevent that.

Whenever a manual commit is attempted during a rebalance, the following exception is thrown:

Confluent.Kafka.KafkaException: Broker: Specified group generation ID is not valid
   at Confluent.Kafka.Impl.SafeKafkaHandle.Commit(IEnumerable`1 offsets)
   at Confluent.Kafka.Consumer`2.Commit(IEnumerable`1 offsets)
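
For reference, this failure can at least be recognized in code by catching the exception and inspecting its error code. The following is a minimal sketch, not a fix. `CommitHelper`, `TryCommit` and `IsRebalanceRelated` are hypothetical names. It assumes the message above corresponds to `ErrorCode.IllegalGeneration`; treating `ErrorCode.RebalanceInProgress` the same way is also an assumption.

```csharp
using System;
using System.Collections.Generic;
using Confluent.Kafka;

// Hypothetical helper, not part of Confluent.Kafka.
public static class CommitHelper
{
    // Assumption: these two codes are the ones worth treating as
    // "commit rejected because of a rebalance".
    public static bool IsRebalanceRelated(ErrorCode code) =>
        code == ErrorCode.IllegalGeneration || code == ErrorCode.RebalanceInProgress;

    // Returns true if the commit succeeded, false if it was rejected with a
    // rebalance-related error. Any other KafkaException still propagates.
    public static bool TryCommit<TKey, TValue>(
        IConsumer<TKey, TValue> consumer,
        IEnumerable<TopicPartitionOffset> offsets)
    {
        try
        {
            consumer.Commit(offsets);
            return true;
        }
        catch (KafkaException ex) when (IsRebalanceRelated(ex.Error.Code))
        {
            return false;
        }
    }
}
```

A `false` return means the offsets were not committed, so the caller would have to retry them after a later consume.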

The only way I've come up with so far to handle that situation (with the current implementation) is by:

  1. Querying the group state (either after each consume or in the background)
  2. Checking the state before each commit
  3. Skipping the manual commit if the state is rebalance-related (either PreparingRebalance or CompletingRebalance)
  4. Performing the manual commit after a subsequent consume, once the state allows it

    var groupDescriptions = await _adminClient.DescribeConsumerGroupsAsync(["group_id"]);
    var groupState = groupDescriptions.ConsumerGroupDescriptions.FirstOrDefault(g => g.GroupId == "group_id")?.State;
    var isDuringRebalance = groupState is ConsumerGroupState.PreparingRebalance or ConsumerGroupState.CompletingRebalance;
    // Check isDuringRebalance before each manual commit
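
Steps 3 and 4 could be sketched roughly as follows. This is only an illustration under assumptions: `DeferredCommitLoop`, `_pending` and `IsRebalanceState` are hypothetical names, the consumer is assumed to be already subscribed with enable.auto.commit=false, and the group state is queried after every message for simplicity.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;

// Hypothetical wrapper around an already-subscribed consumer.
public sealed class DeferredCommitLoop
{
    private readonly IConsumer<string, string> _consumer;
    private readonly IAdminClient _adminClient;
    private readonly string _groupId;

    // Next offset to commit per partition, kept until a commit is allowed.
    private readonly Dictionary<TopicPartition, Offset> _pending = new();

    public DeferredCommitLoop(IConsumer<string, string> consumer, IAdminClient adminClient, string groupId)
    {
        _consumer = consumer;
        _adminClient = adminClient;
        _groupId = groupId;
    }

    public static bool IsRebalanceState(ConsumerGroupState? state) =>
        state is ConsumerGroupState.PreparingRebalance or ConsumerGroupState.CompletingRebalance;

    public async Task RunAsync(Action<ConsumeResult<string, string>> handle, CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            var result = _consumer.Consume(TimeSpan.FromSeconds(1));
            if (result is null || result.IsPartitionEOF) continue;

            handle(result);
            // The committed offset is the next offset to read.
            _pending[result.TopicPartition] = result.Offset + 1;

            var groups = await _adminClient.DescribeConsumerGroupsAsync(new[] { _groupId });
            var state = groups.ConsumerGroupDescriptions
                .FirstOrDefault(g => g.GroupId == _groupId)?.State;

            // Step 3: skip the commit while the group is rebalancing.
            if (IsRebalanceState(state)) continue;

            // Step 4: commit everything deferred so far.
            _consumer.Commit(_pending.Select(p => new TopicPartitionOffset(p.Key, p.Value)));
            _pending.Clear();
        }
    }
}
```

The sketch does not drop pending offsets for partitions revoked in the meantime, which a real implementation would probably need to do.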

That strategy reduces how often this happens but cannot prevent it entirely, since a rebalance can still start between the state check and the commit.

Can we have a better strategy to handle those situations? Are there other considerations one should take into account regarding commits during cooperative rebalances?

How to reproduce

  1. Create a consumer with partition.assignment.strategy=cooperative-sticky
  2. Trigger a rebalance that affects some of the partitions
  3. Consume messages from unaffected partitions
  4. Perform a manual commit of those messages' offsets
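
A consumer along these lines could be used for the reproduction. It is a minimal sketch with assumed values: the `Repro` class is a hypothetical name, and the broker address and the topic passed to `Run` are placeholders.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

// Hypothetical repro harness.
public static class Repro
{
    public static ConsumerConfig BuildConfig() => new ConsumerConfig
    {
        BootstrapServers = "localhost:9092", // assumption: local broker
        GroupId = "group_id",
        EnableAutoCommit = false,            // commits are done manually below
        PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky,
        AutoOffsetReset = AutoOffsetReset.Earliest,
    };

    public static void Run(string topic, CancellationToken ct)
    {
        using var consumer = new ConsumerBuilder<Ignore, string>(BuildConfig())
            .SetPartitionsAssignedHandler((c, partitions) =>
                Console.WriteLine($"Assigned: {string.Join(", ", partitions)}"))
            .SetPartitionsRevokedHandler((c, partitions) =>
                Console.WriteLine($"Revoked: {string.Join(", ", partitions)}"))
            .Build();

        consumer.Subscribe(topic);
        try
        {
            while (true)
            {
                // Throws OperationCanceledException once ct is cancelled.
                var result = consumer.Consume(ct);
                try
                {
                    consumer.Commit(result); // manual commit after each message
                }
                catch (KafkaException ex)
                {
                    Console.WriteLine($"Commit failed: {ex.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Normal shutdown path.
        }
        finally
        {
            consumer.Close();
        }
    }
}
```

Starting or stopping a second process that calls `Repro.Run` on the same topic while the first one is consuming should trigger the rebalance from step 2.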

MaorDavidzon commented 3 weeks ago

Do you have any updates on this issue?