microsoft / CSharpClient-for-Kafka

.NET implementation of the Apache Kafka protocol that provides basic functionality through Producer/Consumer classes. The project also offers a balanced consumer implementation.

Multiple fetcher threads created when partition leader changes #56

Open ducas opened 8 years ago

ducas commented 8 years ago

We have found ourselves receiving duplicate messages within the same consumer instance. When looking through the logs I noticed the following series of events:

  1. A broker stops being the leader for a partition, which triggers a rebalance
  2. The consumer rebalances and receives n partitions (2 in this case)
  3. The consumer creates n+1 fetcher threads
  4. Two fetcher threads then consistently retrieve messages from Kafka for the same topic and partition

I've attached a scrubbed log excerpt (consumer_log.txt). Any ideas would be helpful.

Digging into the source, it looks like the issue might be a race condition in Fetcher.cs. On line 189 a topicInfo dictionary is built, but how does it know which broker to use? On line 201 a new PartitionLeaderFinder is spun up, which ideally finds the leaders for the added partitions and starts fetchers. Could this finish before the loop through partitions on line 209 starts, so that multiple fetchers are created for the same partition? Shouldn't the leader lookup be completed synchronously before the new fetchers are created?

I'm a bit lost in this part of the code so please excuse the ignorance... 😄
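
To make the suspected race concrete, here is a minimal sketch in Python rather than the project's C#. It does not reproduce Fetcher.cs. It only models two code paths (a background leader finder and the rebalance loop) that both try to start a fetcher for the same partitions. `FetcherRegistry`, `try_claim`, `start_fetchers`, `simulate`, and the topic name `orders` are all hypothetical names invented for the illustration. The guard shown is one possible way to ensure at most one fetcher per (topic, partition), whichever path gets there first.

```python
import threading


class FetcherRegistry:
    """Hypothetical guard: at most one fetcher per (topic, partition)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._owners = {}  # (topic, partition) -> name of the path that claimed it

    def try_claim(self, topic, partition, owner):
        """Return True only for the first caller that claims this partition."""
        key = (topic, partition)
        with self._lock:
            if key in self._owners:
                return False  # a fetcher already exists; do not start another
            self._owners[key] = owner
            return True

    def claimed(self):
        """Return a snapshot of which path owns each partition."""
        with self._lock:
            return dict(self._owners)


def start_fetchers(registry, assignments, owner):
    """Stand-in for 'start a fetcher thread' on either code path."""
    for topic, partition in assignments:
        if registry.try_claim(topic, partition, owner):
            pass  # a real client would start its fetcher thread here


def simulate(assignments):
    """Run the background finder and the rebalance loop concurrently."""
    registry = FetcherRegistry()
    finder = threading.Thread(
        target=start_fetchers,
        args=(registry, assignments, "leader-finder"),
    )
    finder.start()
    # The rebalance loop runs without waiting for the finder, as suspected above.
    start_fetchers(registry, assignments, "rebalance-loop")
    finder.join()
    return registry.claimed()


if __name__ == "__main__":
    owners = simulate([("orders", 0), ("orders", 1)])
    print(len(owners), "fetchers for 2 partitions")
```

Which path wins each partition can vary between runs, but each partition ends up with exactly one owner. The alternative raised in the question, finishing the leader lookup synchronously before the loop starts, would correspond to calling `finder.join()` before the second `start_fetchers` call. Whether either change matches what Fetcher.cs actually needs is an assumption that has not been checked against the source.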