wvanbergen / kafka

Load-balancing, resuming Kafka consumer for go, backed by Zookeeper.
MIT License

Leader change during re-balance may cause partition not being consumed #125

Open waterlx opened 6 years ago

waterlx commented 6 years ago

I simplified the problem into the following scenario: 3 brokers, 1 topic with 4 partitions, and 2 consumer instances consuming that topic. Brokers, partitions and consumers are all numbered from 0 (b0, p0, c0, and so on).

When c0 (consumer instance 0) calls utils.go#dividePartitionsBetweenConsumers(), the partition leaders are p0: b0, p1: b1, p2: b2, p3: b0.

After sorting (by leader, then by partition id), partitions looks like this:

[
    {p0 b0 xxxx}  // {partition leader address}
    {p3 b0 xxxx}
    {p1 b1 xxxx}
    {p2 b2 xxxx}
] 

So c0 computes its myPartitions (the partitions to claim) as p0 and p3.

Then p0 somehow changes its leader to b2, so the leaders become p0: b2, p1: b1, p2: b2, p3: b0.

Now the other consumer instance, c1, calls utils.go#dividePartitionsBetweenConsumers(). After sorting (by leader, then by partition id), partitions looks like this:

[
    {p3 b0 xxxx}
    {p1 b1 xxxx}
    {p0 b2 xxxx}
    {p2 b2 xxxx}
]

c1 computes its myPartitions (the partitions to claim) as p0 and p2.

As a result, c0 tries to claim p0 and p3 while c1 tries to claim p0 and p2. Both consumers compete for p0, and nobody claims p1.

In utils.go, partitionLeaders are sorted by leader first, then by partition id:

func (pls partitionLeaders) Less(i, j int) bool {
    return pls[i].leader < pls[j].leader || (pls[i].leader == pls[j].leader && pls[i].id < pls[j].id)
}

When a leader changes between two calls of dividePartitionsBetweenConsumers(), the result of the sort changes. I believe this is the root cause.
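The scenario above can be reproduced with a small sketch. Only the Less() method is taken from utils.go; the struct fields, the two leader snapshots and the claimsFor helper are hypothetical stand-ins that assume partitions are handed out in contiguous chunks of the sorted list, which may differ in detail from what dividePartitionsBetweenConsumers() actually does.

```go
package main

import "sort"

// partitionLeader is an illustrative stand-in: partition id plus the
// broker id of its current leader (not the library's exact struct).
type partitionLeader struct {
	id     int
	leader int
}

type partitionLeaders []partitionLeader

func (pls partitionLeaders) Len() int      { return len(pls) }
func (pls partitionLeaders) Swap(i, j int) { pls[i], pls[j] = pls[j], pls[i] }

// Less is the comparison quoted from utils.go: leader first, then id.
func (pls partitionLeaders) Less(i, j int) bool {
	return pls[i].leader < pls[j].leader || (pls[i].leader == pls[j].leader && pls[i].id < pls[j].id)
}

// Leader snapshots as seen by c0 (before) and by c1 (after p0 moved to b2).
var (
	leadersBefore = partitionLeaders{{0, 0}, {1, 1}, {2, 2}, {3, 0}}
	leadersAfter  = partitionLeaders{{0, 2}, {1, 1}, {2, 2}, {3, 0}}
)

// claimsFor is a hypothetical helper: it sorts a copy of the snapshot and
// returns the partition ids in the contiguous chunk that consumer number
// `consumer` (0-based, out of `consumers`) would try to claim.
func claimsFor(snapshot partitionLeaders, consumer, consumers int) []int {
	pls := append(partitionLeaders(nil), snapshot...)
	sort.Sort(pls)

	n, m := len(pls)/consumers, len(pls)%consumers
	first := consumer * n
	if consumer < m {
		first += consumer
	} else {
		first += m
	}
	last := first + n
	if consumer < m {
		last++ // the first m consumers take one extra partition
	}

	ids := make([]int, 0, last-first)
	for _, pl := range pls[first:last] {
		ids = append(ids, pl.id)
	}
	return ids
}
```

With these snapshots, claimsFor(leadersBefore, 0, 2) returns [0 3] and claimsFor(leadersAfter, 1, 2) returns [0 2], so p0 is claimed twice and p1 is not claimed at all.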

waterlx commented 6 years ago

I am not sure whether Less() could be changed to drop the leader comparison and compare partition.id only. Sorting on leader has the following benefit, according to https://kafka.apache.org/documentation/#impl_consumerrebalance

we try to assign partitions to consumers in such a way that reduces the number of broker nodes each consumer has to connect to.

Dropping the leader comparison would break that. But as long as we keep comparing leaders, we cannot avoid the condition described in this issue when a leader changes.
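For comparison, here is a minimal sketch of the id-only ordering being discussed. It is not a patch for utils.go: the type byPartitionID and the helper claimsByID are hypothetical names, and the chunking logic is an assumption. It only illustrates that the result no longer depends on the leaders, at the cost of the broker locality mentioned in the Kafka documentation.

```go
package main

import "sort"

// partitionLeader is an illustrative stand-in for the library's type.
type partitionLeader struct {
	id     int // partition id
	leader int // broker id of the current leader (ignored by the sort)
}

// byPartitionID orders partitions by id only.
type byPartitionID []partitionLeader

func (p byPartitionID) Len() int           { return len(p) }
func (p byPartitionID) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
func (p byPartitionID) Less(i, j int) bool { return p[i].id < p[j].id }

// claimsByID is a hypothetical helper: it sorts a copy of the snapshot by
// partition id and returns the contiguous chunk of ids for one consumer.
func claimsByID(snapshot byPartitionID, consumer, consumers int) []int {
	pls := append(byPartitionID(nil), snapshot...)
	sort.Sort(pls)

	n, m := len(pls)/consumers, len(pls)%consumers
	first := consumer * n
	if consumer < m {
		first += consumer
	} else {
		first += m
	}
	last := first + n
	if consumer < m {
		last++
	}

	ids := make([]int, 0, last-first)
	for _, pl := range pls[first:last] {
		ids = append(ids, pl.id)
	}
	return ids
}
```

Whether p0 is led by b0 or by b2, consumer 0 of 2 gets [0 1] and consumer 1 of 2 gets [2 3], so both instances agree even if they read the leaders at different times.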

waterlx commented 6 years ago

On second thought, should we trigger a re-balance when there is a leader change? Could that be a solution for this issue?
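Detecting a leader change could look roughly like the sketch below. It only compares two snapshots; changedLeaders is a hypothetical helper, and how the snapshots are obtained and how a re-balance would be triggered in this library are left open.

```go
package main

import "sort"

// changedLeaders compares two leader snapshots (partition id -> broker id)
// and returns, in ascending order, the ids of partitions that are new or
// whose leader differs in curr. Partitions missing from curr are ignored.
func changedLeaders(prev, curr map[int]int) []int {
	var changed []int
	for id, leader := range curr {
		if old, ok := prev[id]; !ok || old != leader {
			changed = append(changed, id)
		}
	}
	sort.Ints(changed)
	return changed
}
```

A non-empty result would be the signal to re-run the partition division on every consumer instance, so that all of them sort the same set of leaders.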

waterlx commented 6 years ago

If I understand correctly, the Kafka Java client does not consider the leader and sorts partitions in numeric order. See RangeAssignor in https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/consumer/PartitionAssignor.scala

secfree commented 5 years ago

@waterlx Thank you very much for your detailed analysis. One of my legacy projects is using this library and encountered the same problem.