antonyvorontsov / RabbitMQ.Client.Core.DependencyInjection

.Net Core library-wrapper of RabbitMQ.Client for Dependency Injection.
MIT License

Add timer flushing functionality to BatchHandler #48

Closed (Bogoev91 closed this 3 years ago)

Bogoev91 commented 4 years ago

Hi there, we already use your library in our projects and find it pretty easy to work with. For one of our projects, though, we need a batching consumer that flushes the currently buffered messages once a specific time interval passes, even if the batch is not yet full. In the current BatchMessageHandler implementation, no message processing occurs until the prefetch count is reached, which can leave messages sitting stale in the consumer. My first suggestion would be to add some kind of timer that triggers message processing on a regular basis.

Another suggestion would be to give clients of BatchMessageHandler more fine-grained control over which messages in the batch are acknowledged and which are rejected. It could be that some messages are processed successfully while others need to be sent to an error queue for later reprocessing, for example. With the current implementation we either acknowledge all messages or reject them as a batch.

antonyvorontsov commented 4 years ago

Yeah, sure, I will try to figure that out. Thank you for the suggestions!

Bogoev91 commented 4 years ago

MassTransit has actually implemented something like this (from their documentation at https://masstransit-project.com/advanced/batching.html):

cfg.ReceiveEndpoint("log-queue", e =>
{
    // the transport must be configured to deliver at least the batch message limit
    e.PrefetchCount = 200;

    e.Batch<LogMessage>(b =>
    {
        // allow up to 100 messages in a batch
        b.MessageLimit = 100;

        // end the batch early if at least one message has been received and the 
        // time limit is reached.
        b.TimeLimit = TimeSpan.FromSeconds(1);

        b.Consumer(() => new LogBatchConsumer());
    });
});

The downside is that the support provided is basic - you either acknowledge all messages in the batch or reject them (throw an error). Nevertheless, the configuration is pretty convenient.

antonyvorontsov commented 3 years ago

I have finally found time to get it done. I am really sorry for the late reply. I hope this thing is still relevant.

Check it out. https://github.com/AntonyVorontsov/RabbitMQ.Client.Core.DependencyInjection/pull/54

Bogoev91 commented 3 years ago

Thanks for the reply. I actually tweaked MassTransit a bit and got it working for me, but I am still happy to hear this. Your library looked really simple and straightforward, so it's good you got it working ;) Will keep it in mind.

Regards, Ivan

antonyvorontsov commented 3 years ago

Drafted a new release v4.2.0 which contains this feature.

Covered this thing in the readme and documentation files, but will post here as well.

Basically, you have to override the MessageHandlingPeriod property like this:

public class YourBatchMessageHandler : BatchMessageHandler
{
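    public YourBatchMessageHandler(
        IRabbitMqConnectionFactory rabbitMqConnectionFactory,
        IEnumerable<BatchConsumerConnectionOptions> batchConsumerConnectionOptions,
        // The logger was passed to base(...) without being declared; the ILogger<T> type is an assumption.
        ILogger<YourBatchMessageHandler> logger)
        : base(rabbitMqConnectionFactory, batchConsumerConnectionOptions, logger)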
    {
    }

    public override ushort PrefetchCount { get; set; } = 50;

    public override string QueueName { get; set; } = "queue.name";

    public override TimeSpan? MessageHandlingPeriod { get; set; } = TimeSpan.FromMilliseconds(100);

    public override Task HandleMessages(IEnumerable<string> messages, CancellationToken cancellationToken)
    {
        // Handle the messages as you want, then return a Task.
        return Task.CompletedTask;
    }
}

and it will fire HandleMessages as soon as MessageHandlingPeriod expires, even if the batch is not yet full.

Take a look at it if there is time.

Best regards, Antony
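
For anyone who wants to see the idea outside the library, here is a minimal sketch of the size-or-period flush discussed in this thread: a batch is handed off either when it is full or when a timer fires, whichever comes first. TimedBatchBuffer<T> and all of its members are hypothetical names invented for this illustration. This is not the code shipped in v4.2.0 and it makes no claim about how BatchMessageHandler works internally.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper for illustration only; not part of RabbitMQ.Client.Core.DependencyInjection.
public sealed class TimedBatchBuffer<T> : IDisposable
{
    private readonly object _gate = new object();
    private readonly List<T> _buffer = new List<T>();
    private readonly int _batchSize;
    private readonly Action<IReadOnlyList<T>> _handleBatch;
    private readonly Timer _timer;

    public TimedBatchBuffer(int batchSize, TimeSpan period, Action<IReadOnlyList<T>> handleBatch)
    {
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));
        if (period <= TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(period));
        _batchSize = batchSize;
        _handleBatch = handleBatch ?? throw new ArgumentNullException(nameof(handleBatch));

        // Fires every `period` and flushes whatever has accumulated so far.
        _timer = new Timer(_ => Flush(), null, period, period);
    }

    // Buffers a message and hands off a batch as soon as it is full.
    public void Add(T message)
    {
        List<T> full = null;
        lock (_gate)
        {
            _buffer.Add(message);
            if (_buffer.Count >= _batchSize) full = Drain();
        }

        // The handler runs outside the lock so a slow handler does not block producers.
        if (full != null) _handleBatch(full);
    }

    // Hands off a partial batch; called by the timer, and callable manually.
    public void Flush()
    {
        List<T> pending;
        lock (_gate)
        {
            if (_buffer.Count == 0) return; // nothing buffered, nothing to do
            pending = Drain();
        }

        _handleBatch(pending);
    }

    // Copies and clears the buffer; the caller must hold the lock.
    private List<T> Drain()
    {
        var copy = new List<T>(_buffer);
        _buffer.Clear();
        return copy;
    }

    public void Dispose() => _timer.Dispose();
}
```

Two simplifications are worth knowing about. The timer keeps its fixed cadence and is not reset after a size-triggered flush, so a partial batch may be flushed sooner than one full period after the previous batch. And because the handler runs outside the lock, a timer flush and a size flush can invoke it concurrently, so the handler passed in needs to be thread-safe.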