confluentinc / confluent-kafka-go

Confluent's Apache Kafka Golang client
Apache License 2.0
4.52k stars 649 forks source link

cooperative-sticky rebalancing #680

Open Boklazhenko opened 2 years ago

Boklazhenko commented 2 years ago

Hello

Maybe it is only question, but I expected another behavior

suppose I have 2 topics with 3 partitions each and 3 consumers in one group

when I call

consumer.SubsribeTopics([]string{"1", "2"}, nil)

with "round robin" strategy, it works good ) I have fisrt consumer - 1.1, 2.1. second - 1.2, 2.2 third - 1.3, 2.3.

BUT if i change strategy to "cooperative-sticky" I can have first - 1.1, 1.2 second - 1.3, 2.1 thrid - 2.2., 2.3

is it right? If no, I can provide more detail info

Thank you!

edenhill commented 2 years ago

I believe that looks right, it behaves similarily to the range assignor.

jgeiger-trane commented 2 years ago

Based on his description it looks backwards to me? His "round-robin" looks like how I see "range" working.

I've got 2 topics I need to co-partition, events and commands.

When I use the range assignor, it properly co-partitions the topics in the consumers.

So for 4 partitions in each topic and 2 consumers, it would look like this:

C1: [events0, events1, commands0, commands1]
C2: [events2, events3, commands2, commands3]

When I use the "cooperative-sticky" it starts out fine with a single consumer:

C1: [events0, events1, events2, events3, commands0, commands1, commands2, commands3]

But when I add the second consumer, it just assigns all the commands to the second consumer and leaves the events on the first consumer.

C1: [events0, events1, events2, events3]
C2: [commands0, commands1, commands2, commands3]

My configuration is set like this, where the change for C2, is "instance01"

"auto.offset.reset":               earliest,
"bootstrap.servers":               "localhost:9092",
"broker.address.family":           "v4",
"enable.auto.commit":              false,
"go.application.rebalance.enable": true,
"group.id":                        "cgroup",
"group.instance.id":               "instance00",
"partition.assignment.strategy":   "cooperative-sticky",
"session.timeout.ms":              10000,