confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0

Poll timeout #1515

Open tech7857 opened 3 years ago

tech7857 commented 3 years ago

Hi @mhowlett @edenhill

We have a .NET Core Kafka consumer reading messages from a specific topic. The message flow in that topic is very high: millions of messages per minute. We are repeatedly getting a max.poll.interval.ms timeout error. It seems like librdkafka provides messages in batches, but our application processes them one by one, and when the batch is huge, processing takes longer than the configured poll interval and eventually times out. Can you please suggest how to handle this?

The current poll interval is 300000 ms, which is already a lot; we don't want to just keep increasing it.

Any suggestions will be appreciated.

thanks

mhowlett commented 3 years ago

librdkafka provides messages to the application one by one (not in batches), and you must call Consume at least as frequently as max.poll.interval.ms, else you will get this error. If an individual message takes longer than this to process, you could process it on a different thread and pause the consumer, but you must still call Consume at least as frequently as max.poll.interval.ms, even while paused. It sounds like slow individual messages are not your problem, though, if you are doing millions of messages per minute. It sounds more like a usage issue that can be fixed, but I can't comment without looking at the code.
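A minimal sketch of the pattern described above (offload a slow message to a worker task, pause, and keep polling so max.poll.interval.ms is not exceeded). The broker address, topic, and group id are placeholders, it requires the Confluent.Kafka NuGet package and a running broker, and error handling is omitted; it is an illustration, not the library's prescribed usage:

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka; // Confluent.Kafka NuGet package

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092", // placeholder
    GroupId = "example-group",           // placeholder
    EnableAutoCommit = false,
};

using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
consumer.Subscribe("example-topic");     // placeholder

while (true)
{
    var result = consumer.Consume(TimeSpan.FromSeconds(1));
    if (result is null)
        continue;

    // Offload the (possibly slow) processing to a worker task.
    var work = Task.Run(() => Console.WriteLine($"processing {result.Message.Value}"));

    // While paused, Consume returns no messages but still keeps the
    // consumer alive with respect to max.poll.interval.ms.
    consumer.Pause(consumer.Assignment);
    while (!work.IsCompleted)
        consumer.Consume(TimeSpan.FromMilliseconds(100));
    consumer.Resume(consumer.Assignment);

    consumer.Commit(result);
}
```

Note a later comment in this thread raises a concern about racing on the Assignment property during rebalances, so treat the Pause/Resume bookkeeping here as a starting point only.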

tech7857 commented 3 years ago

Hi @mhowlett - if messages are returned one by one, there is no way processing a single message takes more than the configured max.poll.interval.ms; it is generally a few ms (~50 ms). What can be wrong then? Snippet of code below; please suggest. [screenshot of consumer code attached]

tech7857 commented 3 years ago

Hi @mhowlett - can you please suggest and help.

thanks

bartelink commented 3 years ago

@tech7857 we run a loop that declares itself alive like this. (Note that when doing this, we shift the actual work to worker threads: the thread that does the Consume shifts to a Pause/Consume/Resume loop, while the work continues on a different threadpool thread, which ultimately results in the work queue being drained and consumption resuming.)

@mhowlett I'd appreciate it if you could advise whether there is a better way of doing such placeholder Consume calls to 'touch' the consumer without being handed a message (I'm concerned the current implementation has race conditions in its use of the Assignment property, etc.).

tech7857 commented 3 years ago

@bartelink - thanks for the reply. Can you provide sample code for this implementation? The other thing is: if messages are received one by one, there is no way that processing a single message can take more than 100 ms, so I am not able to understand why this error occurs unless I am missing something. Please let me know if I am.

thanks

tech7857 commented 3 years ago

Hi @mhowlett can you please help and provide some guidance.

mhowlett commented 3 years ago

The support workload for Confluent customers is high at the moment; they take precedence, and we have people out sick. Will get back to open-source issues later.

tech7857 commented 3 years ago

@mhowlett - can you please guide on this. Your help will be greatly appreciated.

bartelink commented 3 years ago

@tech7857 the issue remains open, so no need to assume anything. @mhowlett is one person; he has clearly stated his constraints, and I would urge you to respect that. Bottom line: if you are a Confluent customer, there is a front door; if not, go read the code and/or ask on Stack Overflow or other places that are geared toward answering questions and have far more viewers than this medium.

xqrzd commented 3 years ago

Have you timed the application to make sure it is calling Consume() frequently enough? I've only ever seen that error when Consume() is not called. As mhowlett said, you must continue calling Consume(), even if you can't process the message. Failing to call Consume() also means you won't be notified if Kafka gives your assignments away, which could lead to parallel processing of a single message if you have multiple consumers.

Since Pause() has the unfortunate side effect of flushing messages from librdkafka's local queue, what I've done to work around this problem (a consumer that is slow or gets stuck for a bit) is to use a queue. When you call Consume(), you write the result to the queue, and if the queue is full, you tell the consumer to seek back to the message you just consumed.

In pseudocode,

while (!stoppingToken.IsCancellationRequested)
{
    try
    {
        // Consume must keep being called to stay in the group,
        // even when the local queue is full.
        var result = _consumer.Consume(500);
        if (result is null)
            continue;

        // Queue full: rewind to this offset so the message is
        // re-delivered on a later Consume call, then back off briefly.
        if (!_queue.Writer.TryWrite(result))
        {
            _consumer.Seek(result.TopicPartitionOffset);
            Thread.Sleep(500);
        }
    }
    catch (ConsumeException ex)
    {
        // log ex.Error
    }
}

This approach means you must set enable.auto.commit to false, otherwise it will commit messages that are still in the queue but haven't been processed yet. The queue can be processed message by message or in batches; I used System.Threading.Channels. I'm also curious if there are better fixes for this problem.
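To make the queue half of this concrete without a Kafka broker, here is a small, self-contained sketch of the bounded System.Threading.Channels behavior the loop above relies on. The capacity and message strings are illustrative; the key point is that TryWrite on a full bounded channel returns false, which is the signal to Seek back and retry later:

```csharp
using System;
using System.Threading.Channels;

class BoundedQueueDemo
{
    static void Main()
    {
        // Bounded channel: TryWrite returns false when the queue is full,
        // which is the cue for the consume loop to Seek back and retry.
        var channel = Channel.CreateBounded<string>(2);

        Console.WriteLine(channel.Writer.TryWrite("msg-1")); // True
        Console.WriteLine(channel.Writer.TryWrite("msg-2")); // True
        Console.WriteLine(channel.Writer.TryWrite("msg-3")); // False: queue full

        // A worker (inlined here) drains the channel independently
        // of the consume loop.
        while (channel.Reader.TryRead(out var msg))
            Console.WriteLine($"processed {msg}");
    }
}
```

In the real consumer the reader side would run on a separate task, draining and committing offsets as messages complete.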

filipeesch commented 3 years ago


Hi @tech7857, try using KafkaFlow (https://github.com/farfetch/kafka-flow) on top of the Confluent client. The Pause() method implemented there already runs this Consume() call logic in the background, avoiding partition revocation when max.poll.interval.ms expires.