dotnetcore / CAP

A distributed transaction solution for microservices based on eventual consistency; also an EventBus with the Outbox pattern
http://cap.dotnetcore.xyz
MIT License

Add new option to support parallel execute subscriber #1513

Closed · leribspace closed this 6 months ago

leribspace commented 7 months ago

The ConsumerThreadCount configuration does not process messages in parallel when the topic's partition count is 1. If the partition count is greater than 1, messages are processed in parallel only when they are in different partitions (e.g. if ConsumerThreadCount is set to 10 but the partition count is 2, only two messages will be processed in parallel). When the UseDispatchingPerGroup flag is set to true, messages from the same partition are processed in parallel (per the behavior I observed, the library picks up the number of messages defined by ConsumerThreadCount, processes them asynchronously, waits for all the tasks to complete, and moves on to the next batch).

I would like to fully understand what the correct configuration for the following requirements would be. I am trying to achieve the following (a configuration sketch follows the list):

  1. Deploy multiple instances of a consumer that consumes messages from a given topic with n partitions.
  2. Each instance should consume messages from a single partition at a time and process those messages in parallel (I should be able to define how many messages are processed in parallel).
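
For reference, a minimal sketch of the configuration under discussion (the broker address, connection string, and values are placeholders; ConsumerThreadCount and UseDispatchingPerGroup are the CAP options named above):

services.AddCap(x =>
{
    x.UseKafka("localhost:9092");             // placeholder bootstrap servers
    x.UseSqlServer("<connection-string>");    // placeholder storage for the received table
    x.ConsumerThreadCount = 10;               // number of Kafka consumer threads
    x.UseDispatchingPerGroup = true;          // dispatch received messages per consumer group
});
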
yang-xiaodong commented 6 months ago

If you configure multiple consumer threads, each thread will correspond to a Kafka partition. Likewise, multiple instances with multiple threads serve the same purpose, each again corresponding to a Kafka partition; Kafka does not support a partition being consumed by multiple consumers in the same group. If you want to parallelize processing, just set EnableConsumerPrefetch to true. It does not support specifying the degree of parallelism; the consumer tasks will be scheduled by the .NET thread pool.
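
In options form, that suggestion looks like the following (a minimal sketch; the comments restate the behavior described above):

services.AddCap(x =>
{
    x.ConsumerThreadCount = 1;        // one consumer thread per Kafka partition
    x.EnableConsumerPrefetch = true;  // buffer fetched messages and execute subscribers on the
                                      // .NET thread pool; the degree of parallelism is not configurable
});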

leribspace commented 6 months ago

Thank you for your response. If EnableConsumerPrefetch supported specifying the degree of parallelism, it would be the best option for my scenario. Unfortunately, I cannot enable it because my subscribers open database connections, and unbounded parallelism would exhaust the connection pool. Are there any risks in using UseDispatchingPerGroup set to true and ConsumerThreadCount greater than 1 at the same time? Based on the observed behavior, this does what I am looking for:

  1. Reads the number of messages defined by ConsumerThreadCount.
  2. Executes them in parallel.
  3. Waits for all of them to complete.
  4. Moves on to the next batch.

However, I want to be on the safe side and make sure there are no edge cases where this combination may behave unexpectedly.

Note: Each message is independent, and ordering does not matter in my use case.
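
As an aside, the connection-pool concern can also be mitigated inside the subscriber itself, independent of CAP's dispatching options. A hedged sketch: bound concurrent database work with a shared SemaphoreSlim (MyCapSubscriber, MyMessage, the topic name, and the limit of 8 are all hypothetical):

using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP;

public record MyMessage(string Id);

public class MyCapSubscriber : ICapSubscribe
{
    // Hypothetical cap on concurrent DB-bound handlers, chosen to stay
    // below the database connection pool size.
    private static readonly SemaphoreSlim DbGate = new(8);

    [CapSubscribe("sample.topic")]
    public async Task HandleAsync(MyMessage message)
    {
        await DbGate.WaitAsync();
        try
        {
            // open a database connection and process the message here
        }
        finally
        {
            DbGate.Release();
        }
    }
}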

yang-xiaodong commented 6 months ago

There seems to be no problem, but this may result in some wasted threads and consumers. In other words, you don't actually need more instances; you just need a single instance with more resources invested in CPU, memory, etc.

PoteRii commented 6 months ago

@yang-xiaodong Hello, thank you for the information. I want to better understand the configuration. Regardless of what ConsumerThreadCount is set to, Kafka will still assign some consumer to each partition, right? So there will be no orphaned partitions.

If I have 10 partitions and 1 replica of a service with ConsumerThreadCount set to 1, my service will read all the messages sequentially from partitions 0, 1, 2, etc. (Kafka will be in charge of assigning those to me). If I increase ConsumerThreadCount to 10, messages from different partitions will be read in parallel. The same will happen if I have 10 replicas with ConsumerThreadCount set to 1 on all of them. Am I right in both cases?

Now, if I want to do parallel processing after storing the message in the received table, I need to set EnableConsumerPrefetch to true, and it will create a Channel (https://github.com/dotnetcore/CAP/blob/master/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs#L71). What I can't grasp is line 81 in the IDispatcher.Default class: if ConsumerThreadCount is set to 1, then there is still a single thread reading from that channel, isn't there? If so, how is processing still parallel with EnableConsumerPrefetch = true and ConsumerThreadCount = 1?

PoteRii commented 6 months ago

Well, I guess processing is still parallel in the EnableConsumerPrefetch = true and ConsumerThreadCount = 1 configuration because of this line: https://github.com/dotnetcore/CAP/blob/master/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs#L249

The per-group dispatcher, on the other hand, creates a channel similar to EnableConsumerPrefetch, but the difference in this condition, https://github.com/dotnetcore/CAP/blob/master/src/DotNetCore.CAP/Processor/IDispatcher.PerGroup.cs#L257, gives us the expected behavior of controlling the parallelism.
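
To make the contrast concrete, here is a stripped-down, runnable sketch of the two reader patterns being compared (not CAP's actual code; Process stands in for subscriber execution):

using System;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

class DispatchSketch
{
    static async Task Process(int msg) { await Task.Delay(100); Console.WriteLine(msg); }

    static async Task Main()
    {
        var channel = Channel.CreateBounded<int>(10);
        _ = Task.Run(async () =>
        {
            for (var i = 0; i < 10; i++) await channel.Writer.WriteAsync(i);
            channel.Writer.Complete();
        });

        // IDispatcher.Default-style (EnableConsumerPrefetch): a single reader drains the
        // channel, but each message is handed to the thread pool without being awaited,
        // so executions overlap and parallelism is bounded only by the thread pool:
        //
        //     while (await channel.Reader.WaitToReadAsync())
        //         while (channel.Reader.TryRead(out var m))
        //             _ = Task.Run(() => Process(m));   // fire and forget
        //
        // IDispatcher.PerGroup-style: N reader loops that each await the handler,
        // so at most N messages execute at once.
        const int threadCount = 3;
        async Task ReaderLoop()
        {
            while (await channel.Reader.WaitToReadAsync())
                while (channel.Reader.TryRead(out var m))
                    await Process(m);   // bounded: take the next message only when done
        }
        await Task.WhenAll(Enumerable.Range(0, threadCount).Select(_ => Task.Run(ReaderLoop)));
    }
}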

yang-xiaodong commented 6 months ago

@PoteRii Yes. Actually, the option EnableConsumerPrefetch will be renamed to EnableSubscriberParallelExecute in a future version.

PoteRii commented 6 months ago

It would be nice to add another property for the thread count as well, e.g. SubscriberParallelExecuteThreadCount.

yang-xiaodong commented 6 months ago

Hi @leribspace @PoteRii ,

> Each instance should consume messages from a single partition at a time and process these messages in parallel (I should be able to define how many messages are processed in parallel).

If a partition corresponds to one consumer, then messages will be pulled to the client one by one. By "processed in parallel", do you mean that some buffer exists, a certain number of messages are first placed into that buffer, and they are then consumed in parallel?

leribspace commented 6 months ago

Hi @yang-xiaodong ,

The idea is to introduce another option, PrefetchThreadCount, that would work in combination with EnableConsumerPrefetch. This means that this code block, https://github.com/dotnetcore/CAP/blob/master/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs#L70, would become:


// Size the buffer from the proposed PrefetchThreadCount option, capped at 3000 entries.
var capacity = _options.PrefetchThreadCount * 300;
_receivedChannel = Channel.CreateBounded<(MediumMessage, ConsumerExecutorDescriptor?)>(
    new BoundedChannelOptions(capacity > 3000 ? 3000 : capacity)
    {
        AllowSynchronousContinuations = true,
        SingleReader = _options.PrefetchThreadCount == 1,
        SingleWriter = true,
        FullMode = BoundedChannelFullMode.Wait
    });

// Start one reader loop per configured thread instead of a single one.
await Task.WhenAll(Enumerable.Range(0, _options.PrefetchThreadCount)
    .Select(_ => Task.Run(Processing, _tasksCts.Token)).ToArray()).ConfigureAwait(false);

and instead of the fire-and-forget call at https://github.com/dotnetcore/CAP/blob/master/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs#L249, each reader would await the execution:

// Awaiting here means each reader processes one message at a time.
await _executor.ExecuteAsync(item1, item2, _tasksCts.Token).ConfigureAwait(false);

This means that whenever ConsumerThreadCount is set to 1, the consumer instance will read from a single partition, messages from that partition will be buffered based on the PrefetchThreadCount option, and once all the messages in that buffer are processed, the next batch will be fetched.

Note: the code above is for demonstration purposes, to get my idea across.

yang-xiaodong commented 6 months ago

Hi,

We will make the following adjustments in the next release:

  1. Rename EnableConsumerPrefetch to EnableSubscriberParallelExecute; EnableConsumerPrefetch has been marked as obsolete.
    If enabled, CAP will prefetch a batch of messages from the broker in advance, place them in a memory buffer, and then execute the subscriber methods in parallel; when the subscriber methods finish executing, it will prefetch the next batch of messages into the buffer and execute them in parallel.

  2. Add a new option SubscriberParallelExecuteThreadCount to configure the number of threads used when parallel execution is enabled; it defaults to the number of processors.

  3. Add a new option SubscriberParallelExecuteBufferFactor to set the buffer size, which is a multiplication factor applied to SubscriberParallelExecuteThreadCount.

UPDATE: Preview version 8.1.1-preview-230008876 with these adjustments has been released to NuGet.
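
For reference, a minimal configuration sketch using the renamed options (option names and defaults as announced above; the broker address is a placeholder):

services.AddCap(x =>
{
    x.UseKafka("localhost:9092");                  // placeholder bootstrap servers
    x.EnableSubscriberParallelExecute = true;      // prefetch a batch and run subscribers in parallel
    x.SubscriberParallelExecuteThreadCount = 4;    // defaults to the processor count
    x.SubscriberParallelExecuteBufferFactor = 2;   // buffer size = ThreadCount * BufferFactor
});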