Open RiteshKo opened 5 years ago
You probably want a thread per partition the consumer is handling, rather than a thread pool, so you can be sure processing for messages in a given partition doesn't complete out of order, which would result in committing offsets out of order. Your use case may also require in-order processing. I would set the threads up and tear them down in the rebalance handler (note: this API is changing slightly in the next release; it's the one detail we haven't finalized, but it'll be similar).
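A minimal sketch of the per-partition idea above, using only BCL types. `Msg`, `PartitionWorkers`, and the `handle` callback are hypothetical names introduced for illustration, not Confluent.Kafka types; the sketch assumes the set of assigned partitions is known up front, whereas a real consumer would create and retire the per-partition workers from the rebalance handler.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-in for a consumed message; not a Confluent.Kafka type.
public record Msg(int Partition, long Offset, string Value);

public static class PartitionWorkers
{
    // Routes every message to a worker dedicated to its partition, so the
    // messages of one partition are handled strictly in arrival order while
    // different partitions are processed concurrently.
    public static async Task RunAsync(
        IEnumerable<Msg> source,
        IEnumerable<int> assignedPartitions,
        Func<Msg, Task> handle)
    {
        // One channel and one worker task per assigned partition.
        var channels = assignedPartitions.ToDictionary(
            p => p, _ => Channel.CreateUnbounded<Msg>());

        var workers = channels.Values.Select(async ch =>
        {
            await foreach (var m in ch.Reader.ReadAllAsync())
                await handle(m);
        }).ToList(); // ToList forces the workers to start now.

        // Assumption: every message belongs to an assigned partition;
        // an unknown partition would throw KeyNotFoundException here.
        foreach (var m in source)
            await channels[m.Partition].Writer.WriteAsync(m);

        // Signal end of input, then wait for all workers to drain.
        foreach (var ch in channels.Values)
            ch.Writer.Complete();
        await Task.WhenAll(workers);
    }
}
```

Because each partition has exactly one worker, the last offset that worker finished is always safe to commit for that partition.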
marking as enhancement, since we should do an example of this.
@mhowlett: I don't need to process the messages in order, as long as multiple tasks can process messages concurrently. Do you have a solution for that for the time being? Basically, multiple tasks processing multiple messages in parallel.
@RiteshKo You can use a ConcurrentQueue<T> to store items read from Kafka and then process that queue with multiple threads/tasks. You can also use System.Threading.Channels if you want more features like bounding the number of items in the queue.
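As a rough illustration of the bounded-queue suggestion, here is a sketch built on System.Threading.Channels. `BoundedPipeline`, `capacity`, `workerCount`, and `process` are names made up for this example, and `source` stands in for whatever loop reads from Kafka.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BoundedPipeline
{
    // Feeds items into a bounded channel and processes them with a fixed
    // number of workers. When the channel is full, the producer waits,
    // which keeps memory use bounded if processing falls behind.
    public static async Task RunAsync<T>(
        IEnumerable<T> source, int capacity, int workerCount, Func<T, Task> process)
    {
        var channel = Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait
        });

        // Each worker pulls from the same reader; every item is delivered
        // to exactly one worker.
        var workers = Enumerable.Range(0, workerCount).Select(_ => Task.Run(async () =>
        {
            await foreach (var item in channel.Reader.ReadAllAsync())
                await process(item);
        })).ToArray();

        foreach (var item in source)
            await channel.Writer.WriteAsync(item); // waits while the channel is full

        channel.Writer.Complete(); // lets the workers finish once drained
        await Task.WhenAll(workers);
    }
}
```

Note that with several workers sharing one channel, items from the same partition can finish out of order, so this fits the case where ordering does not matter.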
@manigandham: I am actually already using a ConcurrentQueue, but my concern is how to raise the task after receiving the messages. Currently I am raising a new task every time a new message arrives, but that will definitely reduce performance. I want to create a fixed number of tasks which will receive messages in parallel.
I'm not sure what you mean by "raise the task" but you should never just start a whole bunch of async methods without any tracking (especially since there's no guarantee the task is finished if you don't await the result).
You can read from the queue in a while loop and call a method for each item to process:

    while (consumerIsRunning)
    {
        // Drain whatever is currently queued, one item at a time.
        while (queue.TryDequeue(out var item))
        {
            await ProcessItem(item);
        }
    }

This is a single "thread" that will read from the queue and process items sequentially. If you want to process 4 in parallel then create 4 while loops. Wrap this looping code into an async method and you can call it as many times as you want to create a queue processing task, then await them to ensure they're finished.
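The wrap-it-in-an-async-method advice could look roughly like the following. `QueueWorkers`, `WorkerLoopAsync`, `StartWorkers`, and the 10 ms back-off are assumptions made for this sketch; a `CancellationToken` plays the role of the `consumerIsRunning` flag.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class QueueWorkers
{
    // One sequential processing loop. It keeps running until a stop has
    // been requested AND the queue has been fully drained.
    public static async Task WorkerLoopAsync<T>(
        ConcurrentQueue<T> queue, Func<T, Task> processItem, CancellationToken stop)
    {
        while (!stop.IsCancellationRequested || !queue.IsEmpty)
        {
            if (queue.TryDequeue(out var item))
                await processItem(item);
            else
                await Task.Delay(10); // back off briefly instead of spinning on an empty queue
        }
    }

    // Starts workerCount loops and returns a task that completes when all
    // of them have finished, so the caller can await it.
    public static Task StartWorkers<T>(
        ConcurrentQueue<T> queue, Func<T, Task> processItem,
        int workerCount, CancellationToken stop) =>
        Task.WhenAll(Enumerable.Range(0, workerCount)
            .Select(_ => Task.Run(() => WorkerLoopAsync(queue, processItem, stop))));
}
```

The consuming loop would enqueue messages as they arrive; on shutdown, cancel the token and await the task returned by `StartWorkers` so that no item is left half processed.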
We are trying something similar: building a new consumer for each partition. If my topic has 4 partitions, we initialize 4 consumers and leave the assignment of partitions within the consumer group to Kafka. This is similar to the Java implementation of the Kafka driver. Is this implementation correct, or should we have a single consumer instance irrespective of the number of partitions a topic has? Regards, Anirudh
@manigandham the DataFlow library (powered by the TPL) can also help to address the issue: https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library
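For comparison, a sketch of the same fan-out using a Dataflow `ActionBlock<T>`. `DataflowPipeline` and its parameters are hypothetical names; the example assumes the System.Threading.Tasks.Dataflow assembly is available (on .NET Framework it is a separate NuGet package).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowPipeline
{
    // Processes items with at most maxParallelism concurrent handlers.
    // BoundedCapacity makes SendAsync wait when the block's input buffer
    // is full, so a slow handler throttles the producer.
    public static async Task RunAsync<T>(
        IEnumerable<T> source, int maxParallelism, int capacity, Func<T, Task> process)
    {
        var block = new ActionBlock<T>(process, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxParallelism,
            BoundedCapacity = capacity
        });

        foreach (var item in source)
            await block.SendAsync(item);

        block.Complete();        // no more input
        await block.Completion;  // wait until everything has been processed
    }
}
```

The block owns the worker scheduling, so there are no loops or tasks to track by hand; the trade-off is the same as with any shared pool, namely that per-partition ordering is not preserved once `MaxDegreeOfParallelism` exceeds 1.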
@mhowlett are there any plans of doing that example in the near future?
@mhowlett Is there any code available to have thread per partition for the consumer?
This is in F# so not a direct answer to any of these questions but...
It's a pretty thin layer using only BCL constructs and a C# style more than flowery-FP style. As such, it might:
(I am the de facto maintainer of it)
Hi. Maybe you have something like - https://www.confluent.io/blog/introducing-confluent-parallel-message-processing-client/ ?
Hi all, I've created this sample to support the partition per worker pattern: https://github.com/allantargino/dotnet-kafka-parallel-consumer It is just a sample and probably not production ready, but it is a start. If you think it is valuable, we could improve it and merge it in this repository's sample directory.
Hi, I was looking for something like this when I started using Kafka's consumer in .NET. I have now shared my solution, which lets you process Kafka messages in parallel and commit them based on their partition:offset. I have opened a PR on this repository to add this lightweight solution to the examples: https://github.com/confluentinc/confluent-kafka-dotnet/pull/2048
Description
I am new to Kafka. Currently I have a Windows service that consumes messages one by one from a Kafka topic with 10 partitions, so processing is slow. Can anyone help me with some code snippets, or the best way to first consume messages from Kafka and then process them concurrently through multiple tasks/threads in a Windows service in .NET?