confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0
61 stars 861 forks

Consumer Hangs while Dispose in K8s #2013

Open ksdvishnukumar opened 1 year ago

ksdvishnukumar commented 1 year ago

Description

I have a case where the consumer hangs indefinitely almost 10 Minutes, and I cannot figure out why. I use Confluent Kafka to connect to Eventhub.

I have 3 instances of the application running. Two of them are disposed quickly, but one hangs. This does not happen every time, only in about 20% of cases (1 in 5).

How to reproduce

It can be reproduced with a very simple consumer application.


paul-cheung commented 1 year ago

We have also encountered some issues in K8s. I am using Confluent.Kafka 2.0.2 in our project. A few minutes after the application started, the following error message appeared:

4|1677860967.721|MAXPOLL|rdkafka#consumer-1| [thrd:main]: Application maximum poll interval (300000ms) exceeded by 151ms (adjust max.poll.interval.ms for long-running message processing): leaving group

I'm not sure whether there is some configuration that would fix this.

bartelink commented 1 year ago

@paul-cheung This message/situation is unrelated to this issue (though to be fair, this issue is of the "stuff is going wrong sometimes in our environment; we have not narrowed it down" variety, which is hard for the maintainers to reproduce). The config value the message cites is the maximum amount of time your code is allowed between polls for a message from the client. In other words, it's saying "you said I was to assume you're dead if you took more than 5 minutes to come back and ask for the next message. This is your notice that you took 151ms longer and we automatically removed you from the healthy clients list in accordance with that". Your choices are to either guarantee to check more frequently by adjusting your code, or extend that poll interval.
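As a rough illustration of the second option (extending the poll interval), the setting can be raised through `ConsumerConfig`. This is only a sketch: the broker address, group id, and the 10-minute value are placeholders chosen for the example, not values taken from this thread.

```csharp
using Confluent.Kafka;

// Sketch only: all values below are illustrative placeholders.
var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092", // hypothetical broker address
    GroupId = "example-group",           // hypothetical consumer group
    // Maps to max.poll.interval.ms. The log above shows 300000 ms (5 minutes);
    // 600000 ms allows up to 10 minutes between calls to Consume.
    MaxPollIntervalMs = 600000,
};
```

Raising the interval only delays the eviction. If message processing can take arbitrarily long, restructuring the code so that `Consume` is called more often is likely the more robust of the two choices.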

plachor commented 9 months ago

@ksdvishnukumar I do not get which one is true :P

I have a case where the consumer hangs indefinitely almost 10 Minutes

Is it indefinitely or 10 minutes? I spotted an issue where one of my consumer instances was not disposed in my process for days, and I'm trying to reproduce it or find a similar issue.

In my case I call Close and then Dispose on my consumer instance. It seems to correlate with a temporary network partition to the broker, as I have seen logs with connection refused.

ksdvishnukumar commented 9 months ago

@plachor, I have encountered this issue sometimes for close to 10 minutes and sometimes even for 1 hour (the pipeline's maximum timeout). I am using Azure Pipelines to deploy the pod to AKS, and the pipeline has a 1-hour maximum timeout, so the deployment times out after that.

I also use the same approach you mentioned:

try
{
    tokenSource?.Cancel();
    // Wait 3 seconds to give the consume method some breathing time to stop consuming.
    // Otherwise we get an Access Violation Exception, which can't be caught in the exception block.
    await Task.Delay(3000);
    if (_consumerBuilder != null && (_consumerBuilder?.Subscription?.Any() ?? false))
    {
        _consumerBuilder?.Close();
        _consumerBuilder?.Dispose();
    }
}
catch (KafkaException ex)
{
    _logger!.Log($"Handled Kafka Exception in {nameof(CloseConsumeBrokerMessageClient)}", LogLevel.Warning);
}
catch (Exception ex)
{
    _logger!.Log($"Exception caught in {nameof(CloseConsumeBrokerMessageClient)} and handled. Message={ex.Message}", LogLevel.Warning);
}
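For comparison, here is a minimal sketch of a shutdown that waits for the consume loop itself to finish instead of sleeping for a fixed 3 seconds. It assumes the loop runs on its own task and that `Close` and `Dispose` are called from that same thread. The `ConsumerWorker` class and its method names are hypothetical, and this does not claim to fix the hang discussed in this issue; it only removes the race between `Consume` and `Dispose`.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;

// Hypothetical wrapper, for illustration only.
public sealed class ConsumerWorker
{
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private Task _loop = Task.CompletedTask;

    public void Start(ConsumerConfig config, string topic)
    {
        // Run the blocking consume loop on a dedicated long-running task.
        _loop = Task.Factory.StartNew(
            () => Run(config, topic, _cts.Token),
            CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }

    private static void Run(ConsumerConfig config, string topic, CancellationToken token)
    {
        // Dispose runs when this method exits, on the same thread that consumed.
        using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
        consumer.Subscribe(topic);
        try
        {
            while (true)
            {
                // Throws OperationCanceledException once the token is cancelled.
                var result = consumer.Consume(token);
                Console.WriteLine($"Received: {result.Message.Value}");
            }
        }
        catch (OperationCanceledException)
        {
            // Expected during shutdown.
        }
        finally
        {
            // Leave the group cleanly before the consumer is disposed.
            consumer.Close();
        }
    }

    // Returns true if the loop ended within the timeout, false if it is still running.
    public async Task<bool> StopAsync(TimeSpan timeout)
    {
        _cts.Cancel();
        var finished = await Task.WhenAny(_loop, Task.Delay(timeout));
        return finished == _loop;
    }
}
```

A caller could use `await worker.StopAsync(TimeSpan.FromSeconds(30))` and log or force-exit when it returns `false`, so a hung `Close` shows up as an explicit timeout rather than a stalled deployment.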

plachor commented 9 months ago

I found these on librdkafka; I believe the first one on the list applies in my case: