Jroland / kafka-net

Native C# client for Kafka queue servers.
Apache License 2.0

Hotfix/set offset bug #72

Closed dayenda closed 9 years ago

dayenda commented 9 years ago

When the consumer class can fetch messages from a Kafka partition faster than they can be processed, an offset set through SetOffsetPosition is ignored.

In the Consumer.ConsumeTopicPartitionAsync method, the offset is retrieved at the beginning of the while loop and used afterwards. When handling the response and adding the messages to the _fetchResponseQueue, the task may block if the queue is full.

If the client of the consumer class sets the offset while the consumer is blocked waiting for free space in the _fetchResponseQueue, the consumer class will overwrite that offset once it has added all current messages to the queue.

foreach (var message in response.Messages)
{
    // While this call blocks, an offset set with SetOffsetPosition will be overwritten later.
    _fetchResponseQueue.Add(message, _disposeToken.Token);

    if (_disposeToken.IsCancellationRequested) return;
}

var nextOffset = response.Messages.Max(x => x.Meta.Offset) + 1;
// The offset is overwritten here.
_partitionOffsetIndex.AddOrUpdate(partitionId, i => nextOffset, (i, l) => nextOffset);

dayenda commented 9 years ago

Sorry, I think I may have introduced another bug. The last line in the code snippet above should be changed to:

_partitionOffsetIndex.AddOrUpdate(partitionId, i => new Tuple<long, bool>(nextOffset, false), (i, l) => l.Item2 ? new Tuple<long, bool>(l.Item1, false) : new Tuple<long, bool>(nextOffset, false));

So the code from my pull request is not quite good enough.
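
For illustration, here is a minimal standalone sketch of the flag-based update described above. It assumes the tuple's bool marks an offset that was set explicitly by the caller. The OffsetTracker class and its SetOffset, AdvanceAfterFetch and GetOffset methods are hypothetical names used only for this example; they are not kafka-net's actual API, and the real Consumer class may differ.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical stand-in for the consumer's offset bookkeeping (not kafka-net code).
public sealed class OffsetTracker
{
    // Item1 = next offset to fetch; Item2 = true when the caller set the offset explicitly.
    private readonly ConcurrentDictionary<int, Tuple<long, bool>> _partitionOffsetIndex =
        new ConcurrentDictionary<int, Tuple<long, bool>>();

    // Plays the role of SetOffsetPosition: store the offset and flag it as caller-set.
    public void SetOffset(int partitionId, long offset)
    {
        var entry = new Tuple<long, bool>(offset, true);
        _partitionOffsetIndex.AddOrUpdate(partitionId, i => entry, (i, l) => entry);
    }

    // Plays the role of the fetch loop's update after a batch has been queued.
    public void AdvanceAfterFetch(int partitionId, long nextOffset)
    {
        _partitionOffsetIndex.AddOrUpdate(
            partitionId,
            i => new Tuple<long, bool>(nextOffset, false),
            (i, l) => l.Item2
                ? new Tuple<long, bool>(l.Item1, false)      // keep the caller's offset, clear the flag
                : new Tuple<long, bool>(nextOffset, false)); // no pending caller offset, advance normally
    }

    // Returns the stored offset, or 0 when the partition has no entry yet.
    public long GetOffset(int partitionId)
    {
        Tuple<long, bool> entry;
        return _partitionOffsetIndex.TryGetValue(partitionId, out entry) ? entry.Item1 : 0;
    }
}
```

With this sketch, calling AdvanceAfterFetch(0, 100), then SetOffset(0, 5), then AdvanceAfterFetch(0, 200) leaves the stored offset at 5 because the flag protects it once. A further AdvanceAfterFetch(0, 201) then advances the offset to 201, since the flag has been cleared.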