microsoft / CSharpClient-for-Kafka

.NET implementation of the Apache Kafka protocol that provides basic functionality through Producer/Consumer classes. The project also offers a balanced consumer implementation.

Balanced consumer in new group doesn't retrieve messages from topics that don't start at offset 0 #39

Closed ducas closed 8 years ago

ducas commented 8 years ago

Use Case

As a new balanced consumer I want to consume messages from an old topic So that I can start to perform operations based on these messages

Description

I have a topic with 2 partitions that has been published to for weeks now. The logs have been cleaned due to retention, so it now has a starting offset greater than 0 on both partitions. When I build an application that uses a balanced consumer via the ZookeeperConsumerConnector and try to consume messages from this topic, I expect to (by default) start at the first message on each partition. Instead I find that I receive no messages.

Repro Steps

  1. Find a topic that has had its logs cleaned up, so it no longer starts at offset 0
  2. Create a new balanced consumer with a new group id using ZookeeperConsumerConnector and open a stream to this topic
  3. Wait for messages...

Notes

  • Wireshark shows that some communication with Kafka happens initially, but it quickly stops and only ZooKeeper is talked to afterwards.
  • If I manually create the path structure and an offset in ZooKeeper for the consumer (e.g. create /consumers/my.consumer/offsets/my.topic/0 100) and restart the application, it starts to receive messages.
  • Here is a log file that shows pretty much nothing happening - KafkaNet.Lib._[15696]Detail.log2016_03_31.txt
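The manual workaround described in the notes above can be sketched with the standard zkCli.sh shipped with ZooKeeper. The group/topic names and the offset value are just the examples from the note; the intermediate `create` calls are an assumption based on ZooKeeper requiring parent nodes to exist:

```shell
# Sketch of the manual workaround: pre-create the consumer group's offset
# node so the balanced consumer has a committed offset to resume from.
# Adjust group name, topic, partition and offset to match your setup.
zkCli.sh -server localhost:2181 <<'EOF'
create /consumers/my.consumer ""
create /consumers/my.consumer/offsets ""
create /consumers/my.consumer/offsets/my.topic ""
create /consumers/my.consumer/offsets/my.topic/0 100
EOF
```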
ducas commented 8 years ago

Another workaround seems to be setting the ConsumerConfiguration.MaxFetchBufferLength very high.

What appears to be happening is that the fetcher tries to get [MaxFetchBufferLength] messages from the last consumed offset, but if the last consumed offset < [EarliestOffset] - [MaxFetchBufferLength] then no messages are returned and the next offset to consume is never set.

When a rebalance happens we get the committed offset and set up the PartitionTopicInfo with the consumed offset set to the committed offset. I believe this is what causes the behaviour we are seeing, because the last consumed offset is too low for anything to be returned.

ducas commented 8 years ago

This line seems to be causing this problem - https://github.com/Microsoft/Kafkanet/blob/master/src/KafkaNET.Library/Consumers/FetcherRunnable.cs#L83

```csharp
IEnumerable<PartitionTopicInfo> fetchablePartitionTopicInfos =
    _partitionTopicInfos.Where(pti => pti.NextRequestOffset - pti.ConsumeOffset < _fetchBufferLength);
```

If the consume offset is too low then the FetcherRunnable will consider it not fetchable and not issue any fetches...
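To make the failure mode concrete, here is a small self-contained rework of that predicate with made-up numbers (the variable names mirror the PartitionTopicInfo properties; the specific offsets are illustrative, not taken from the logs):

```csharp
// Illustrative numbers only: a topic whose earliest retained offset is 5000,
// a fetch buffer of 1000, and a brand-new group whose committed offset is 0.
long consumeOffset = 0;        // seeded from the (missing) committed offset
long nextRequestOffset = 5000; // first offset the fetcher wants to request
long fetchBufferLength = 1000; // ConsumerConfiguration.MaxFetchBufferLength

// The predicate from FetcherRunnable.cs#L83:
bool fetchable = nextRequestOffset - consumeOffset < fetchBufferLength;
// 5000 - 0 = 5000, which is not < 1000, so the partition is never fetched.

// The MaxFetchBufferLength workaround mentioned earlier simply makes the
// right-hand side large enough that the gap no longer matters:
bool fetchableWithWorkaround = nextRequestOffset - consumeOffset < 1000000;
```

This also shows why setting MaxFetchBufferLength very high "fixes" it: it widens the window rather than correcting the stale consumed offset.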

ducas commented 8 years ago

The aforementioned line of code raises an interesting question - should the fact that we're losing messages just be swallowed...? I don't think so! This should probably be surfaced as an offset-out-of-range error.

So... I'm going to solve this as 2 separate issues:

  1. New consumer group trying to start at 0 instead of the earliest/latest offset
  2. Existing consumer group freezing when next read offset - consumed offset >= fetch buffer length

I'll continue this issue as the first point and open a new one to deal with the existing consumer.

In the PR for this issue I will try to fix this behaviour by initialising the PartitionTopicInfo for a new consumer with consumed offset = next fetch offset - 1.
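A hedged sketch of what that initialisation could look like (this is not the actual PR code; the method and parameter names are assumptions for illustration):

```csharp
static class OffsetSeeding
{
    // Sketch of the proposed fix: when a new consumer group has no committed
    // offset, seed the consumed offset to one less than the next fetch offset,
    // so that nextRequestOffset - consumeOffset == 1, which is always below
    // any sane fetch buffer length, keeping the partition fetchable.
    public static long ResolveStartOffset(long? committedOffset, long nextFetchOffset)
    {
        // Existing group: resume from the committed offset as before.
        if (committedOffset.HasValue)
            return committedOffset.Value;

        // New group: pretend we have already consumed up to (nextFetchOffset - 1).
        return nextFetchOffset - 1;
    }
}
```

With this seeding, the FetcherRunnable predicate quoted earlier holds on the first pass, so a new group on a log-cleaned topic issues fetches immediately instead of waiting forever.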