Open-NET-Libraries / Open.ChannelExtensions

A set of extensions for optimizing/simplifying System.Threading.Channels usage.
https://open-net-libraries.github.io/Open.ChannelExtensions/api/Open.ChannelExtensions.Extensions.html#methods
MIT License

Flush BatchingChannelReader on Graceful shutdown (Background service) #44

Closed ArminShoeibi closed 2 months ago

ArminShoeibi commented 2 months ago

Hi there,

We are using Open.ChannelExtensions in our project, mainly for batching with BatchingChannelReader. I checked the source code for a way to flush the pending batch when the cancellationToken signals shutdown, but found nothing.

This is our code:

using System.Threading.Channels;
using Microsoft.Extensions.Hosting;
using Open.ChannelExtensions;

public sealed class RequestToRefundV2ProducerBackgroundService(IProducerAccessor producerAccessor, Channel<RequestToRefundV2> channel) : BackgroundService
{
    private readonly IMessageProducer _producer = producerAccessor.GetProducer(Constants.REFUND_REQUEST_PRODUCER);

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Batches of up to 10 items; a partial batch is released after the 90-second timeout.
        await channel.Reader.Batch(10, singleReader: true)
                            .WithTimeout(TimeSpan.FromSeconds(90))
                            .ReadAllAsync(stoppingToken, async (requests) =>
        {
            foreach (var item in requests)
            {
                try
                {
                    await _producer.ProduceAsync(item.ReferenceId, item);
                }
                catch
                {
                    // Exceptions are swallowed here, so a failed item does not stop the rest of the batch.
                }
            }
        });
    }
}

How can we flush the pending items on shutdown when the batch size has not been reached and the timeout has not yet elapsed?

electricessence commented 2 months ago

Hello!

I'm not sure from your description what you're expecting.

All .ReadAllAsync does is read batches from the channel until the cancellation token is cancelled or the channel is "complete" (closed). Simply put, the difference between .ReadAllUntilCancelled and .ReadAllAsync is that .ReadAllAsync throws an exception when cancelled: https://github.com/Open-NET-Libraries/Open.ChannelExtensions/blob/master/Open.ChannelExtensions/Extensions.Read.cs#L336-L344

.ReadAllAsync does not control or manage the channel, it simply reads from it.

If you are expecting a graceful "shutdown" or "closure" of the channel, cancelling the read operation is not the way to do it. Cancelling is simply a "bail out and leave the data behind" operation.
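To make the difference concrete, here is a minimal sketch that uses only System.Threading.Channels, not this library's extensions. The item count and the point at which cancellation happens are made up for illustration. It cancels a read loop part-way through, shows that the unread items are still sitting in the channel, and then completes the writer so the rest can be drained.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-alone demo; none of this comes from the issue author's project.
var channel = Channel.CreateUnbounded<int>();
for (var i = 0; i < 5; i++) channel.Writer.TryWrite(i);

// 1) Cancel the read: the loop is abandoned and unread items stay behind.
using var cts = new CancellationTokenSource();
var processed = 0;
try
{
    while (true)
    {
        await channel.Reader.ReadAsync(cts.Token);
        processed++;
        if (processed == 2) cts.Cancel(); // simulate a shutdown signal mid-stream
    }
}
catch (OperationCanceledException)
{
    // Expected: the next read observes the cancelled token.
}

Console.WriteLine($"After cancel: processed={processed}, left behind={channel.Reader.Count}");

// 2) Complete the writer: the reader can now drain what is left and then ends on its own.
channel.Writer.TryComplete();
await foreach (var item in channel.Reader.ReadAllAsync())
{
    processed++;
}

Console.WriteLine($"After completion: processed={processed}");
```

Completing the writer tells readers that no more data is coming, so a read loop without a token finishes naturally once the channel is empty.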

If you are concerned about lost data/messages, you should retain the batching reader and only read from it. If the operation is cancelled, you should consider closing the channel to allow all data to finish being read or, in the case of your code, avoid creating a new batching reader every time the method is called.

Consider changing your code signature to take a ChannelReader<List<RequestToRefundV2>> reader instead of a channel to ensure that no data is lost. Then retain that reader and reuse every time until the channel is closed.
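A rough sketch of that suggestion follows. RefundRequest and RefundBatchConsumer are hypothetical stand-ins for the types in the question, and Console.WriteLine stands in for the real producer call. The batching reader is built once, handed to the consumer as a ChannelReader<List<RefundRequest>>, and reused until the source channel is closed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;
using Open.ChannelExtensions;

// Composition: create the batching reader once and keep it.
var channel = Channel.CreateUnbounded<RefundRequest>();
ChannelReader<List<RefundRequest>> batches = channel.Reader
    .Batch(10, singleReader: true)
    .WithTimeout(TimeSpan.FromSeconds(90));

var consumer = new RefundBatchConsumer(batches);

// Write fewer items than the batch size, then close the channel.
for (var i = 0; i < 3; i++) channel.Writer.TryWrite(new RefundRequest($"ref-{i}"));
channel.Writer.TryComplete();

// The consumer ends once the source is complete and every batch has been read.
await consumer.RunAsync();

// Hypothetical stand-in for RequestToRefundV2.
public sealed record RefundRequest(string ReferenceId);

// Hypothetical consumer that depends only on the retained batch reader.
public sealed class RefundBatchConsumer(ChannelReader<List<RefundRequest>> batches)
{
    public async Task RunAsync()
    {
        // ChannelReader<T>.ReadAllAsync() is the built-in enumerator; no token is passed.
        await foreach (var batch in batches.ReadAllAsync())
        {
            foreach (var item in batch)
            {
                Console.WriteLine($"Producing {item.ReferenceId}"); // placeholder for the real producer
            }
        }
    }
}
```

Because the consumer never sees the raw channel, it cannot accidentally wrap it in a second batching reader and strand items in the first one.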

ArminShoeibi commented 2 months ago

Thanks for your reply.

Please see this picture: [image]

When a SIGTERM signal is received (e.g., from Kubernetes, Docker, or OpenShift) to gracefully shut down a pod with a 5-minute grace period, background services using cancellation tokens are notified. If the batch size is not met (e.g., fewer than 10 items) and the timeout is not reached, I need to manually flush the channel to process the remaining data.

Just to clarify, I'm not creating a new BatchingChannelReader every time. The BackgroundService is a singleton, and there's no while loop involved.

electricessence commented 2 months ago

Ah.. Ok, I see the background service pattern now.

Let me take another look. Meanwhile, instead of just "aborting" when the host's cancellation token is cancelled, consider closing the channel and finishing gracefully that way. Just cancelling the read operation is definitely sub-optimal and has a high likelihood of lost messages.

I have a similar arrangement in a background service and will have to take a look, but it's quite possible that these extensions won't do exactly what you need.

electricessence commented 2 months ago

You'll need to do something like this:

Step 1:

Remove the cancellationToken from the .ReadAllAsync() call.

Step 2:

// React to cancellation appropriately.
cancellationToken.Register(() => channel.Writer.TryComplete());

The buffering/batching channel reader will effectively call a .ForceBatch() when the source channel is complete/empty: https://github.com/Open-NET-Libraries/Open.ChannelExtensions/blob/master/Open.ChannelExtensions/Readers/BufferingChannelReader.cs#L47C1-L47C25
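Put together, the two steps might look like the sketch below. RefundRequest, RefundProducerService and SendAsync are hypothetical stand-ins for the types in the question, and the sketch assumes the callback overload of .ReadAllAsync that takes no token, as Step 1 implies.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Open.ChannelExtensions;

// Hypothetical stand-in for RequestToRefundV2.
public sealed record RefundRequest(string ReferenceId);

public sealed class RefundProducerService(Channel<RefundRequest> channel) : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Step 2: on shutdown, close the channel instead of cancelling the read.
        using var registration = stoppingToken.Register(() => channel.Writer.TryComplete());

        // Step 1: no token is passed, so the read only ends when the channel
        // is complete and every remaining (possibly partial) batch has been handled.
        await channel.Reader
            .Batch(10, singleReader: true)
            .WithTimeout(TimeSpan.FromSeconds(90))
            .ReadAllAsync(async batch =>
            {
                foreach (var item in batch)
                {
                    await SendAsync(item);
                }
            });
    }

    // Hypothetical placeholder for the real producer call.
    private static Task SendAsync(RefundRequest item)
    {
        Console.WriteLine($"Producing {item.ReferenceId}");
        return Task.CompletedTask;
    }
}
```

One consequence to keep in mind: once the writer is completed, later TryWrite calls return false and WriteAsync throws ChannelClosedException, so producers need to tolerate a closed channel during shutdown.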

electricessence commented 2 months ago

@ArminShoeibi, I'll close this for now. We can reopen it if you have any other concerns, or you can simply continue to comment.