confluentinc / confluent-kafka-go

Confluent's Apache Kafka Golang client
Apache License 2.0

cooperative-sticky doesn't work as expected #622

Open levin81 opened 3 years ago

levin81 commented 3 years ago

Description

The cooperative-sticky assignor doesn't work as expected. I have a topic named "stam" with 10 partitions.

After starting the 1st consumer: consumer 1 -> 0,1,2,3,4,5,6,7,8,9

After starting the 2nd consumer: consumer 1 -> 1,3,5,7,9; consumer 2 -> 0,2,4,6,8

After starting the 3rd consumer: consumer 1 -> 1,4,7; consumer 2 -> 0,3,6,9; consumer 3 -> 2,5,8

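I would expect the 1st and 2nd consumers to keep most of their partitions and hand over only what the 3rd consumer needs, rather than shuffling partitions between themselves like this (for example, partition 4 moved from consumer 2 to consumer 1, and partitions 3 and 9 moved from consumer 1 to consumer 2).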

Consumer 1 output:

% COOPERATIVE rebalance: 10 new partition(s) assigned: [stam[0]@unset stam[1]@unset stam[2]@unset stam[3]@unset stam[4]@unset stam[5]@unset stam[6]@unset stam[7]@unset stam[8]@unset stam[9]@unset]
% COOPERATIVE rebalance: 5 partition(s) revoked: [stam[0]@unset stam[2]@unset stam[4]@unset stam[6]@unset stam[8]@unset]
% COOPERATIVE rebalance: 0 new partition(s) assigned: []
% COOPERATIVE rebalance: 0 new partition(s) assigned: []
% COOPERATIVE rebalance: 3 partition(s) revoked: [stam[3]@unset stam[5]@unset stam[9]@unset]
% COOPERATIVE rebalance: 0 new partition(s) assigned: []
% COOPERATIVE rebalance: 1 new partition(s) assigned: [stam[4]@unset]

Consumer 2 output:

% COOPERATIVE rebalance: 0 new partition(s) assigned: []
% COOPERATIVE rebalance: 5 new partition(s) assigned: [stam[0]@unset stam[2]@unset stam[4]@unset stam[6]@unset stam[8]@unset]
% COOPERATIVE rebalance: 3 partition(s) revoked: [stam[2]@unset stam[4]@unset stam[8]@unset]
% COOPERATIVE rebalance: 0 new partition(s) assigned: []
% COOPERATIVE rebalance: 2 new partition(s) assigned: [stam[3]@unset stam[9]@unset]

Consumer 3 output:

% COOPERATIVE rebalance: 0 new partition(s) assigned: []
% COOPERATIVE rebalance: 3 new partition(s) assigned: [stam[2]@unset stam[5]@unset stam[8]@unset]

Did I miss something?

How to reproduce

I used the code in https://github.com/confluentinc/confluent-kafka-go/tree/master/examples/cooperative_consumer_example


edenhill commented 3 years ago

This sounds similar to an issue with distinguishing the first rebalance from subsequent ones, which is fixed on librdkafka master and scheduled for the upcoming 1.7.0 release, due at the end of April.

mhowlett commented 3 years ago

See also: https://github.com/edenhill/librdkafka/issues/3306

I don't think this is fixed, or did I miss it? I looked into this a bit, and the cause is that the assignor isn't taking owned partitions into account. Initially it seemed like it would be easy to resolve, but I concluded it won't be trivial, because the sticky assignor is complex and I couldn't follow it without some effort.

levin81 commented 3 years ago

@mhowlett I saw that issue, but unlike that reporter, I don't get any errors, just incorrect rebalancing. I tried Shopify's Sarama (which implements the sticky assignor, without the cooperative protocol) and it works fine (according to their PR, they ported the Java version almost line for line). I'd love to use the official Confluent Go client with cooperative rebalancing! :) Stop-the-world rebalances are not ideal for my use case.

mhowlett commented 3 years ago

We also copied it line by line, but there's a bug to sort out in how it's hooked up. That error is a separate problem, and not a critical one: auto-commits are just being fired off at times when they might fail. Both should be resolved in 1.7.0.