antonyvorontsov / RabbitMQ.Client.Core.DependencyInjection

.Net Core library-wrapper of RabbitMQ.Client for Dependency Injection.
MIT License

How to set prefetchCount or run multiple workers on a queue #35

Closed: volethanh closed this issue 4 years ago

volethanh commented 4 years ago

Hi,

I want to ask how to set prefetchCount for a consumer, or how I can run multiple consumers in parallel?

antonyvorontsov commented 4 years ago

Hi!

Unfortunately, there is no such option. Right now this library only lets you fetch messages one by one and process them the same way (one after another). I have actually thought about implementing message handlers that could be configured with a prefetchCount value and process batches of messages, but I have not had a use case for that yet.
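For reference, prefetch count maps to the channel-level basic.qos setting in the underlying RabbitMQ.Client, which this library does not currently expose. A minimal sketch with the plain client API (the host name and the count of 50 are placeholders, not something you can configure through this library today):

using RabbitMQ.Client;

public static class PrefetchSketch
{
    public static void Configure()
    {
        var factory = new ConnectionFactory { HostName = "localhost" };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();

        // Ask the broker to keep at most 50 unacknowledged messages
        // in flight on this channel at any time.
        channel.BasicQos(prefetchSize: 0, prefetchCount: 50, global: false);
    }
}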

I will try to implement that feature as soon as possible. Will keep you updated.

Best regards, Antony

antonyvorontsov commented 4 years ago

Volethanh, hi again!

I have done some work to make it possible to set a prefetch count value for consumers and ended up with this: https://github.com/AntonyVorontsov/RabbitMQ.Client.Core.DependencyInjection/pull/37

The idea is to make a consumer that is responsible only for reading messages from one queue:

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
// Plus the library namespace(s) that contain BatchConsumerConnectionOptions.

public class AnotherCustomBatchMessageHandler : RabbitMQ.Client.Core.DependencyInjection.BatchMessageHandlers.BatchMessageHandler
{
    private readonly ILogger<AnotherCustomBatchMessageHandler> _logger;

    public AnotherCustomBatchMessageHandler(
        IEnumerable<BatchConsumerConnectionOptions> batchConsumerConnectionOptions,
        ILogger<AnotherCustomBatchMessageHandler> logger)
        : base(batchConsumerConnectionOptions, logger)
    {
        _logger = logger;
    }

    // Number of messages collected before HandleMessage is invoked.
    protected override ushort PrefetchCount { get; set; } = 500;

    // The queue this handler consumes from.
    protected override string QueueName { get; set; } = "queue.name";

    protected override Task HandleMessage(IEnumerable<string> messages, CancellationToken cancellationToken)
    {
        _logger.LogInformation("Handling a batch of messages.");
        foreach (var message in messages)
        {
            _logger.LogInformation(message);
        }
        return Task.CompletedTask;
    }
}

You have to set values for the PrefetchCount and QueueName properties to make it work. After the application starts, the handler reads messages and stores them in a concurrent bag. Once the PrefetchCount value is reached, it passes the collection of messages to the HandleMessage method, so you can do whatever you want with them (a bulk insert, etc.). The only nuance is that the collection of messages won't be passed until it reaches the PrefetchCount size. If this behaviour fits your needs, I will update the library with this functionality.
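For completeness, registration could look roughly like this in Startup.ConfigureServices. The AddBatchMessageHandler extension method and the "rabbitConfiguration" section name are assumptions for illustration, not confirmed in this thread; check the linked pull request for the exact wiring:

using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

public class Startup
{
    public Startup(IConfiguration configuration) => Configuration = configuration;

    public IConfiguration Configuration { get; }

    public void ConfigureServices(IServiceCollection services)
    {
        // Hypothetical registration: the extension method name and the
        // configuration section are assumptions, see PR #37 for the real API.
        services.AddBatchMessageHandler<AnotherCustomBatchMessageHandler>(
            Configuration.GetSection("rabbitConfiguration"));
    }
}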

Hit me up with your thoughts on this.

Best regards, Antony

antonyvorontsov commented 4 years ago

Well, I have released a new library version, v4.0.0, so you can try it out! If it fits your needs, feel free to close this issue. Otherwise, I will close it myself after a month or so.

Best regards, Antony

volethanh commented 4 years ago

Thanks Antony, I will try it.

antonyvorontsov commented 4 years ago

Closing this issue. If you still have unanswered questions, feel free to re-open it.