confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0

Consumer refetching/Rewinding the offset when new consumer joins #2117

Open ksdvishnukumar opened 10 months ago

ksdvishnukumar commented 10 months ago

Description

The consumer is refetching the same messages, which makes the Outgoing Messages count in the Event Hubs metrics higher than expected. I have a C# consumer that pulls messages and stores them into a list. Since confluent-kafka-dotnet does not support batch message processing, I handle batching in my application: I read the messages from the client's local queue into an in-memory list and then process them. To control how frequently the client fetches, as suggested by @mhowlett, I consume the first message with Consume(1000) and consume the subsequent messages with Consume(TimeSpan.Zero).

Before starting the consumer, I accumulated 9861 messages in the topic I am trying to consume from. The topic has 3 partitions.

How to reproduce

Sample Code to reproduce

using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace IMS.Kafka.Package.Testbed
{
    public class ConsumeFetchControl : IHostedService
    {
        private readonly ILogger _logger;
        IConsumer<Null, string> consumer;
        CancellationTokenSource cts = new();

        public ConsumeFetchControl(ILogger logger) => _logger = logger;

        public async Task Consumer(CancellationToken cancellationToken)
        {
            string bootstrapServers = "vishnu-1tu-test.servicebus.windows.net:9093";
            string password = "<REPLACE_PASSWORD_HERE>";
            string topic = "TestTopic";
            string consumerGroup = "mygrp"; // Replace with your consumer group name

            List<ConsumeResult<Null, string>> batchConsumeMessages = new();

            var config = new ConsumerConfig
            {
                BootstrapServers = bootstrapServers,
                GroupId = consumerGroup,
                ClientId = consumerGroup + DateTime.UtcNow.Ticks,
                AutoOffsetReset = AutoOffsetReset.Earliest, // Start consuming from the beginning of the topic
                EnableAutoOffsetStore = true,
                EnableAutoCommit = false, // Disable auto-commit to have more control over the consumer's progress
                AutoCommitIntervalMs = 10000,
                EnablePartitionEof = true,
                ConnectionsMaxIdleMs = 180000,
                MaxPartitionFetchBytes = 1048576,
                QueuedMaxMessagesKbytes = 10240,
                PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky,
                IsolationLevel = IsolationLevel.ReadUncommitted,
                SocketNagleDisable = true,
                SocketKeepaliveEnable = true,
                MetadataMaxAgeMs = 180000,
                SessionTimeoutMs = 30000,
                MaxPollIntervalMs = 300000,
                CancellationDelayMaxMs = 200,
                SaslUsername = "$ConnectionString",
                SaslPassword = password,
                SaslMechanism = SaslMechanism.Plain,
                SecurityProtocol = SecurityProtocol.SaslSsl,
                Debug = "consumer,cgrp,topic,fetch"
            };

            consumer = new ConsumerBuilder<Null, string>(config)
                .SetLogHandler((c, logHandler) =>
                {
                    _logger!.LogInformation($"Consumer Log Handler : {logHandler.Level.ToString().ToUpper()}|{DateTime.UtcNow.ToString("dd-MM-yyyy HH:mm:ss.fff")}|{logHandler.Facility}|{logHandler.Name}|{logHandler.Message}");
                })
                .SetPartitionsAssignedHandler((c, partitions) =>
                {
                    string topicName = partitions?.FirstOrDefault()?.Topic ?? string.Empty;
                    if (!string.IsNullOrWhiteSpace(topicName))
                    {
                        string formattedAssigned = string.Join(',', partitions!.Select(p => p.Partition.Value).OrderBy(p => p));
                        var finalAssignment = c.Assignment.Concat(partitions!)?.Select(p => p.Partition.Value).OrderBy(p => p);
                        string formattedRemaining = string.Join(',', finalAssignment!);
                        _logger!.LogInformation($"{topicName} Newly assigned partition(s) : {formattedAssigned}, Current state: {formattedRemaining} @ {DateTime.UtcNow.ToString("dd-MM-yyyy HH:mm:ss.fff")}");
                    }
                })
                .SetPartitionsRevokedHandler((c, partitions) =>
                {
                    string topicName = partitions?.FirstOrDefault()?.Topic ?? string.Empty;
                    if (!string.IsNullOrWhiteSpace(topicName))
                    {
                        string formattedRevoked = string.Join(',', partitions!.Select(p => p.Partition.Value).OrderBy(p => p));
                        var finalAssignment = c.Assignment.Where(atp => partitions!.Where(rtp => rtp.TopicPartition == atp).Count() == 0).Select(p => p.Partition.Value).OrderBy(p => p);
                        string formattedRemaining = string.Join(',', finalAssignment);
                        _logger!.LogInformation($"{topicName} Revoked partition(s) : {formattedRevoked}, Current state: {formattedRemaining} @ {DateTime.UtcNow.ToString("dd-MM-yyyy HH:mm:ss.fff")}");
                    }
                })
                .SetPartitionsLostHandler((c, partitions) =>
                {
                    string topicName = partitions?.FirstOrDefault()?.Topic ?? string.Empty;
                    if (!string.IsNullOrWhiteSpace(topicName))
                    {
                        string formattedLost = string.Join(',', partitions?.Select(p => p.Partition.Value).OrderBy(p => p));
                        var finalAssignment = c.Assignment.Where(atp => partitions!.Where(rtp => rtp.TopicPartition == atp).Count() == 0).Select(p => p.Partition.Value).OrderBy(p => p);
                        string formattedRemaining = string.Join(',', finalAssignment);
                        _logger!.LogInformation($"{topicName} Lost partition(s) : {formattedLost}, Current state: {formattedRemaining} @ {DateTime.UtcNow.ToString("dd-MM-yyyy HH:mm:ss.fff")}");
                    }
                })
               .Build();

            consumer.Subscribe(topic);

            bool isFirstFetchDone = false;
            int batchMessageCount = 0;
            int maxMessagePerBatchToCommit = 500;

            while (!cts.IsCancellationRequested)
            {
                try
                {
                    var consumeResult = isFirstFetchDone ? consumer.Consume(TimeSpan.Zero) : consumer.Consume(1000);

                    if (consumeResult == null) { continue; }

                    if (consumeResult.IsPartitionEOF)
                    {
                        _logger.LogInformation("Reached End of Partition.");

                        if (batchConsumeMessages.Count > 0)
                        {
                            //DO THE MESSAGE PROCESSING LOGIC HERE

                            consumer.Commit(batchConsumeMessages[^1]); //Assuming all processed successfully and committing the last accumulated message
                            batchConsumeMessages.Clear();
                        }

                        isFirstFetchDone = false;
                        batchMessageCount = 0;
                        continue; //EOF results carry no Message, so skip the checks below
                    }

                    if (consumeResult.Message.Value is string)
                    {
                        isFirstFetchDone = true;
                        batchConsumeMessages.Add(consumeResult);
                        batchMessageCount++;
                    }

                    if (maxMessagePerBatchToCommit == batchMessageCount)
                    {
                        //DO THE MESSAGE PROCESSING LOGIC HERE
                        bool allProcessedSuccessfully = true; //Placeholder for the outcome of the batch processing
                        if (allProcessedSuccessfully)
                        {
                            consumer.Commit(consumeResult); //Assuming all processed successfully and committing
                            isFirstFetchDone = false;
                        }
                        else //In case of any failure in the batch. Not using the transactional producer since Event Hubs does not support this feature.
                        {
                            consumer.Seek(batchConsumeMessages.First().TopicPartitionOffset);
                        }

                        batchConsumeMessages.Clear(); //Reset the in-memory batch either way
                        batchMessageCount = 0;
                    }
                }
                catch (KafkaException ex) when (ex.Message == "Broker: Specified group generation id is not valid" || ex.Message == "Local: Broker handle destroyed")
                {
                    if (ex.Error.IsFatal)
                    {
                        // https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#fatal-consumer-errors
                        break;
                    }
                    //NOT TO LOG ANYTHING
                }
                catch (ConsumeException e) when (e.Error.Reason.Contains("Application maximum poll interval"))
                {
                    //DO NOT REMOVE THIS BLOCK and DO NOT THROW AN EXCEPTION.
                    //THROWING HERE WOULD STOP THE MESSAGE CONSUMPTION.
                    _logger.LogWarning($"Received ConsumeException with Application maximum poll interval exceeded reason for the topic {topic}. Error Code : {e.Error.Code}, Reason : {e.Error.Reason}");
                }
                catch (KafkaException e) when (e.Error.Reason.Contains("Application maximum poll interval"))
                {
                    //DO NOT REMOVE THIS BLOCK and DO NOT THROW AN EXCEPTION.
                    //THROWING HERE WOULD STOP THE MESSAGE CONSUMPTION.
                    _logger.LogWarning($"Received KafkaException with Application maximum poll interval exceeded reason for the topic {topic}. Error Code : {e.Error.Code}, Reason : {e.Error.Reason}");
                }
                catch (OperationCanceledException ex)
                {
                    _logger.LogWarning($"OperationCanceledException caught in the topic {topic}: Message={ex.Message}, Source={ex.Source}, InnerException={ex.InnerException}, StackTrace:{ex.StackTrace}");
                }
            }
        }

        public async Task StartAsync(CancellationToken cancellationToken)
        {
            await Consumer(cancellationToken);
        }

        public async Task StopAsync(CancellationToken cancellationToken)
        {
            try
            {
                cts.Cancel();
                if (consumer != null && (consumer?.Subscription?.Any() ?? false))
                {
                    consumer?.Close();
                    consumer?.Dispose();
                }
            }
            catch (KafkaException ex)
            {
                _logger!.LogWarning($"Handled Kafka Exception in {nameof(StopAsync)}. Message={ex.Message}");
            }
            catch (Exception ex)
            {
                _logger!.LogWarning($"Exception caught in {nameof(StopAsync)} and handled. Message={ex.Message}");
            }
        }
    }
}

@mhowlett @edenhill Could you please tell me what is happening here.

The Event Hubs metrics show that the number of outgoing messages is higher than the 9861 messages I have to consume. From Microsoft Support I got the information that the client is re-reading the same messages. I am not sure how to interpret the actual debug log messages.

[Screenshot: Event Hubs metrics showing the Outgoing Messages count]

Edited: Each message in the consumed topic is around 4.24 KB.

I see the lines below in the log. Is this causing the issue? If yes, how can I control it?

9/23/2023, 6:11:59.174 AM kafka-eventhub-test-0.(none) Consumer Log Handler : DEBUG|23-09-2023 06:11:59.1747870|FETCH|derotest-bug-23092023-4638310463137519149#consumer-10|[thrd:sasl_ssl://vishnu-1tu-test.servicebus.windows.net:9093/boot]: sasl_ssl://vishnu-1tu-test.servicebus.windows.net:9093/0: Topic TestTopic [0] in state active at offset 0 (leader epoch 0) (2506/100000 msgs, 10643/10240 kb queued, opv 4) is not fetchable: queued.max.messages.kbytes exceeded

9/23/2023, 6:12:00.078 AM kafka-eventhub-test-0.(none) Consumer Log Handler : DEBUG|23-09-2023 06:12:00.0781550|FETCH|derotest-bug-23092023-4638310463137519262#consumer-13|[thrd:sasl_ssl://vishnu-1tu-test.servicebus.windows.net:9093/boot]: sasl_ssl://vishnu-1tu-test.servicebus.windows.net:9093/0: Topic TestTopic [1] in state active at offset 0 (leader epoch 0) (2522/100000 msgs, 10711/10240 kb queued, opv 4) is not fetchable: queued.max.messages.kbytes exceeded

9/23/2023, 6:12:00.223 AM kafka-eventhub-test-0.(none) Consumer Log Handler : DEBUG|23-09-2023 06:12:00.2238018|FETCH|derotest-bug-23092023-4638310463137519410#consumer-12|[thrd:sasl_ssl://vishnu-1tu-test.servicebus.windows.net:9093/boot]: sasl_ssl://vishnu-1tu-test.servicebus.windows.net:9093/0: Topic TestTopic [2] in state active at offset 0 (leader epoch 0) (2513/100000 msgs, 10672/10240 kb queued, opv 4) is not fetchable: queued.max.messages.kbytes exceeded

9/23/2023, 6:12:08.062 AM kafka-eventhub-test-0.(none) Consumer Log Handler : DEBUG|23-09-2023 06:12:08.0628556|FETCH|derotest-bug-23092023-4638310463137519149#consumer-10|[thrd:sasl_ssl://vishnu-1tu-test.servicebus.windows.net:9093/boot]: sasl_ssl://vishnu-1tu-test.servicebus.windows.net:9093/0: Topic TestTopic [0] in state active at offset 500 (leader epoch 0) (2366/100000 msgs, 10048/10240 kb queued, opv 6) is not fetchable: queued.max.messages.kbytes exceeded

Even though I have set QueuedMaxMessagesKbytes to 10240, how did the client queue more than that threshold? For example, the log for partition 0 shows 2506 messages and 10643/10240 kb queued, which at roughly 4.24 KB per message works out to about 10625 KB, slightly above the limit. In that case, does the excess get purged?
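
For what it is worth, the only mitigation I can think of is to tighten the prefetch-related settings. Below is a sketch of what I mean; these are my own assumptions about which knobs matter, not a confirmed fix, and the 100000-message default quoted in the comments is simply what is visible in the log above.

using Confluent.Kafka;

// Sketch: tighter prefetch limits for ~4.24 KB messages.
// Assumption on my side: smaller fetch sizes should reduce how far a single
// in-flight fetch can overshoot the queued.max.messages.kbytes threshold.
var prefetchLimitedConfig = new ConsumerConfig
{
    BootstrapServers = bootstrapServers,   // same connection settings as in the repro above
    GroupId = consumerGroup,
    EnableAutoCommit = false,

    // Local queue thresholds; the log shows partitions become "not fetchable"
    // once these are exceeded (e.g. 2506/100000 msgs, 10643/10240 kb queued).
    QueuedMinMessages = 1000,              // librdkafka default is 100000
    QueuedMaxMessagesKbytes = 10240,

    // Per-partition and per-request fetch sizes.
    MaxPartitionFetchBytes = 262144,       // fetch.message.max.bytes
    FetchMaxBytes = 1048576                // fetch.max.bytes
};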

ksdvishnukumar commented 10 months ago

Hi @mhowlett & @edenhill ,

I have a couple of questions.

  1. Once the Consume() method is called with either a cancellation token or a timeout > 0, will a background thread start polling messages from the broker and buffering them into the local queue? If yes, do we have any option to control it?

  2. Within an infinite while loop, if the first call to Consume is made either with a timeout > 0 or with a CancellationToken, and afterwards I use Consume(TimeSpan.Zero), will that stop the background thread from polling messages?

  3. If I pause the consumer by calling the Pause() method, will that stop the background thread from polling messages from the broker? I am aware that pausing the consumer will purge the local queue. (See the sketch below for the pattern I have in mind.)
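
For question 3, this is the kind of pause/resume pattern I have in mind, just as a sketch. It assumes that calling Pause()/Resume() with the current assignment is the right way to stop fetching while a batch is being processed; processBatch is a placeholder for my application's processing logic.

using System;
using System.Collections.Generic;
using Confluent.Kafka;

public static class PauseResumeSketch
{
    // Sketch: consume up to one batch, pause fetching while it is processed,
    // commit the last offset, then resume fetching.
    public static void ConsumeOneBatch(
        IConsumer<Null, string> consumer,
        Action<List<ConsumeResult<Null, string>>> processBatch, // placeholder for app logic
        int maxBatchSize = 500)
    {
        var batch = new List<ConsumeResult<Null, string>>();

        while (batch.Count < maxBatchSize)
        {
            var cr = consumer.Consume(TimeSpan.FromSeconds(1));
            if (cr == null || cr.IsPartitionEOF) break;
            batch.Add(cr);
        }

        if (batch.Count == 0) return;

        // Ask the client to stop fetching the currently assigned partitions.
        consumer.Pause(consumer.Assignment);
        try
        {
            processBatch(batch);
            consumer.Commit(batch[^1]); // commit the last processed message's offset
        }
        finally
        {
            // Resume fetching; anything that was prefetched and then dropped from
            // the local queue while paused would presumably have to be fetched again.
            consumer.Resume(consumer.Assignment);
        }
    }
}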