confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0

Auto committed offset value one ahead of its consumed offset value #586

Closed: robty123 closed this issue 6 years ago

robty123 commented 6 years ago

Description

When consuming from a topic, the offset value seen in the OnMessage event handler is always one behind the offset value received in the OffsetCommitted event handler. It appears the OnMessage event handler receives offsets starting from a zero-based index, whereas the OffsetCommitted event handler starts from a one-based index. This gives the illusion in the logs that messages are consumed after they have already been committed.

The scenario below describes consuming two messages from a brand new topic:

Message 1 comes in and its message.Offset.Value is logged as 0 in the OnMessage event handler. It is auto committed roughly 5 seconds later, and the offset value logged in the OffsetCommitted event handler is 1. Message 2 then comes in and its message.Offset.Value is logged as 1 in the OnMessage event handler. It is auto committed roughly 5 seconds later, and the offset value logged in the OffsetCommitted event handler is 2.

I have checked in Control Center and can verify via the "consumer lag" section that there are no more than 2 messages: lag is 0, current-offset is 2, and end-offset is 2.

How to reproduce

Start a consumer with OnOffsetsCommitted and OnMessage event handlers, then produce two or three messages onto a topic and compare the offset values logged in each handler. See the consumer code below:

using System;
using System.Collections.Generic;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

// Note: ILogger, ITopicRetriever and ExtendedInfo are application-specific abstractions.
public class Consumer
    {
        private readonly ILogger _logger;
        private readonly ITopicRetriever _topicRetriever;
        private bool _consumeMessages;

        public Consumer(ILogger logger,
            ITopicRetriever topicRetriever)
        {
            _logger = logger;
            _topicRetriever = topicRetriever;
            _consumeMessages = true;
        }

        public void ConsumeMessages(string brokerList)
        {
            using (
                var consumer = new Consumer<byte[], byte[]>(ConstructConfig(brokerList, true),
                    new ByteArrayDeserializer(),
                    new ByteArrayDeserializer()))
            {
                SetupEventListeners(consumer);

                var topics = _topicRetriever.GetAll();

                consumer.Subscribe(topics);
                foreach (var topic in topics)
                {
                    _logger.ExtendedInfo("Subscribed to topic", new {Topic = topic});
                }

                _logger.ExtendedInfo("Started consumer instance, Ctrl-C to stop consuming", new { Consumer = consumer.Name, GroupMemberId = consumer.MemberId });

                Console.CancelKeyPress += (_, e) =>
                {
                    e.Cancel = true; // prevent the process from terminating.
                    _consumeMessages = false;
                };

                while (_consumeMessages)
                {
                    consumer.Poll(TimeSpan.FromMilliseconds(100));
                }
            }
        }

        private void SetupEventListeners(Consumer<byte[], byte[]> consumer)
        {
            consumer.OnMessage += (_, message) => MessageReceived(message);

            consumer.OnOffsetsCommitted += (_, offsets) => OffsetCommitted(offsets);
        }

        private void OffsetCommitted(CommittedOffsets offsets)
        {
            foreach (var offset in offsets.Offsets)
            {
                _logger.ExtendedInfo("Offsets Committed",
                    new
                    {
                        offset.Topic,
                        offset.Partition,
                        Offset = offset.Offset.Value,
                        RealOffset = offset.Offset.Value - 1
                    });
            }      
        }

        private static Dictionary<string, object> ConstructConfig(string brokerList, bool enableAutoCommit)
        {
            return new Dictionary<string, object>
            {
                {"group.id", "scv-batchProcessor-consumer"},
                {"enable.auto.commit", enableAutoCommit},
                {"auto.commit.interval.ms", 5000},  
                {"statistics.interval.ms", 60000},
                {"bootstrap.servers", brokerList},
                {"auto.offset.reset", "earliest"}
            };
        }

        private void MessageReceived(Message<byte[], byte[]> message)
        {
            _logger.ExtendedInfo("Consuming message",
                new
                {
                    message.Topic,
                    MessageReceived = message.Timestamp.UtcDateTime,
                    message.Partition,
                    Offset = message.Offset.Value
                });
        }

    }
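
For reference, the same off-by-one can be observed without auto commit. Below is a minimal sketch of a manual-commit variant of MessageReceived, assuming the CommitAsync(Message) overload of the same client version, enable.auto.commit set to false, and an illustrative method name; it blocks on the commit result only for brevity:

        // Hypothetical manual-commit variant of MessageReceived (enable.auto.commit = false).
        // The committed position is expected to be the consumed offset + 1.
        private void ConsumeAndCommit(Consumer<byte[], byte[]> consumer, Message<byte[], byte[]> message)
        {
            _logger.ExtendedInfo("Consuming message",
                new { message.Topic, message.Partition, Offset = message.Offset.Value });

            var committed = consumer.CommitAsync(message).Result;

            foreach (var offset in committed.Offsets)
            {
                // offset.Offset.Value should equal message.Offset.Value + 1
                _logger.ExtendedInfo("Offsets Committed",
                    new { offset.Topic, offset.Partition, Offset = offset.Offset.Value });
            }
        }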

mhowlett commented 6 years ago

This is by design: the convention used by Kafka is that the committed offset is the offset of the next message to be read. I also think it would cause less confusion if, instead, the committed offset were defined to be the last message that was read, but given the existing convention, I think the API as it is is the correct choice.
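
In other words, the committed value is a position (the offset of the next message to read), not the index of the last message read. A minimal sketch of what that convention means in code, assuming the CommitAsync(IEnumerable<TopicPartitionOffset>) overload of the same client version, where message stands for the last message handled in OnMessage:

    // For a message consumed at offset N:
    //   OnMessage logs           N
    //   OnOffsetsCommitted logs  N + 1  (the next position to read)
    //   last message actually read = committed offset - 1
    // Committing explicitly follows the same convention:
    var next = new TopicPartitionOffset(message.Topic, message.Partition, message.Offset.Value + 1);
    consumer.CommitAsync(new List<TopicPartitionOffset> { next }).Wait();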

robty123 commented 6 years ago

Thanks very much for your quick response. Appreciate you clearing that up for me.