confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0

Question - Implementing Threads in Consumer code #654

Open vjrag opened 6 years ago

vjrag commented 6 years ago

Question

Can someone please share best practices for implementing worker threads in a Kafka consumer program? We would have one consumer subscribed to multiple topics, polling for messages. Once a message is received, we need to hand it over to worker threads for processing, so that the main thread that polls for messages is not blocked.

Any sample code / reference link is appreciated.

Thanks,

mhowlett commented 6 years ago

we need to do an example or blog post outlining variations on this. you just want logic to maintain N worker threads at any given time. if a thread finishes, call Consumer.Consume and start a new worker with the result. Note that librdkafka will aggressively poll for new messages behind the scenes for you, so Consumer.Consume will not block if there are new messages to process. you may need to tune librdkafka parameters related to buffering (but probably not).
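
For what it's worth, a minimal sketch of the pattern described above, assuming the 1.0-style `Consume` API used elsewhere in this thread (the `maxWorkers` parameter, `WorkerPoolSketch` class, and `Process` method are illustrative names, not part of the library; newer releases build consumers via `ConsumerBuilder` instead of the constructor shown here):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Confluent.Kafka;

class WorkerPoolSketch
{
    // Keep at most maxWorkers tasks in flight; when the pool is full, wait for
    // one to finish before calling Consume again.
    public static void Run(Consumer<string, string> consumer, int maxWorkers)
    {
        var workers = new List<Task>();

        while (true)
        {
            // Consume returns quickly when librdkafka has already pre-fetched messages.
            var cr = consumer.Consume();

            workers.Add(Task.Run(() => Process(cr)));

            if (workers.Count >= maxWorkers)
            {
                // Block until any one worker completes, then free its slot.
                int finished = Task.WaitAny(workers.ToArray());
                workers.RemoveAt(finished);
            }
        }
    }

    static void Process(ConsumeResult<string, string> cr)
    {
        // Placeholder for real message handling.
        Console.WriteLine($"Processed '{cr.Value}' at {cr.TopicPartitionOffset}");
    }
}
```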

vjrag commented 6 years ago

Is it good practice to have a queue on the consumer side (maybe a BlockingCollection?) and fill it from the OnMessage event? We could stop polling if the queue size reaches a certain limit.

There would be a set of worker threads that dequeue and process the messages in the background...

mhowlett commented 6 years ago

librdkafka already gives you this queue internally, and you can tune the size - i don't think there would be any benefit to having another one. note also that OnMessage has been deprecated in the 1.0 API (use Consume instead).
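
As a rough illustration, that internal queue can be sized via consumer configuration. The values below are arbitrary, and the C# property names (which may vary between client versions) map to the librdkafka keys `queued.min.messages` and `queued.max.messages.kbytes`:

```csharp
using Confluent.Kafka;

var conf = new ConsumerConfig
{
    GroupId = "test-consumer-group",
    BootstrapServers = "localhost:9092",
    // Minimum number of messages librdkafka tries to keep pre-fetched per
    // topic+partition in its local queue (librdkafka: queued.min.messages).
    QueuedMinMessages = 10000,
    // Upper bound, in kibibytes, on that local queue
    // (librdkafka: queued.max.messages.kbytes).
    QueuedMaxMessagesKbytes = 65536
};
```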

vjrag commented 6 years ago

Thanks @mhowlett. I will use Consume instead of Poll...

Do you have any sample for changing the properties defined in librdkafka? I am using confluent-kafka-dotnet. How does the internal queue work in librdkafka?

vjrag commented 6 years ago

@mhowlett - I have put some sample code here using the Consume method with Tasks. I am adding a maximum of 10000 messages to a list and waiting for them to be processed before proceeding with the next Consume call.

Do you see any problems with this approach?

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;

class Program
{
    public static void Main(string[] args)
    {
        var conf = new ConsumerConfig
        {
            GroupId = "test-consumer-group",
            BootstrapServers = "localhost:9092",
            // Note: The AutoOffsetReset property determines the start offset in the event
            // there are not yet any committed offsets for the consumer group for the
            // topic/partitions of interest. By default, offsets are committed
            // automatically, so in this example, consumption will only start from the
            // earliest message in the topic 'my-topic' the first time you run the program.
            AutoOffsetReset = AutoOffsetResetType.Earliest
        };

        using (var c = new Consumer<string, string>(conf))
        {
            c.Subscribe(new List<string>() { "my-topic", "test-topic" });

            bool consuming = true;
            // The client will automatically recover from non-fatal errors. You typically
            // don't need to take any action unless an error is marked as fatal.
            c.OnError += (_, e) => consuming = !e.IsFatal;

            List<Task> eventTaskList = new List<Task>();

            while (consuming)
            {
                try
                {
                    var cr = c.Consume();

                    Task eventTask = Task.Run(() =>
                    {
                        Console.WriteLine($"{DateTime.Now} - Consumed message '{cr.Key}' '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                        // Do some work here
                        Thread.Sleep(500);
                        Console.WriteLine($"{DateTime.Now} - Done consuming message '{cr.Key}' '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                    });

                    // Track every task, then wait once the batch is full.
                    eventTaskList.Add(eventTask);

                    if (eventTaskList.Count >= 10000)
                    {
                        Task.WaitAll(eventTaskList.ToArray());

                        Console.WriteLine($"{DateTime.Now} - Done processing batch of " + eventTaskList.Count);

                        eventTaskList.Clear();
                    }
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Error occurred: {e.Error.Reason}");
                }
            }

            // Ensure the consumer leaves the group cleanly and final offsets are committed.
            c.Close();
        }
    }
}
```
marcosalpha3 commented 6 years ago

Hi vjrag

I appreciate your example with tasks. I have a doubt: is autoCommit the best approach in this case?

Best Regards
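
One hedged sketch of the alternative being hinted at here: disable auto-commit and commit manually only after a batch of tasks has finished, so offsets are never committed ahead of processing. This assumes the same 1.0-style API as the sample above; the exact commit overloads may differ between client versions, and `ManualCommitSketch` is an illustrative name, not part of the library.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Confluent.Kafka;

class ManualCommitSketch
{
    public static void Run()
    {
        var conf = new ConsumerConfig
        {
            GroupId = "test-consumer-group",
            BootstrapServers = "localhost:9092",
            // Turn off auto-commit so offsets are only committed after processing succeeds.
            EnableAutoCommit = false
        };

        using (var c = new Consumer<string, string>(conf))
        {
            c.Subscribe("my-topic");

            var batch = new List<Task>();

            while (true)
            {
                var cr = c.Consume();
                batch.Add(Task.Run(() => Console.WriteLine($"processing {cr.TopicPartitionOffset}")));

                if (batch.Count >= 100)
                {
                    // Wait for the whole batch to finish, then commit the current
                    // positions; at this point they all correspond to processed messages.
                    Task.WaitAll(batch.ToArray());
                    c.Commit();
                    batch.Clear();
                }
            }
        }
    }
}
```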

hooblix commented 5 years ago

vjrag,

I used this sample code in VB 2017 with confluent.kafka version 1.11. It complains that the type or namespace ConsumerConfig could not be found. Do you know what I might be missing?

Thanks

mhowlett commented 5 years ago

ConsumerConfig is new - which version of Confluent.Kafka are you using? highly recommend 1.0-beta3 over 0.11.6.