Open-NET-Libraries / Open.ChannelExtensions

A set of extensions for optimizing/simplifying System.Threading.Channels usage.
https://open-net-libraries.github.io/Open.ChannelExtensions/api/Open.ChannelExtensions.Extensions.html#methods

Batching with time limit to force data through on an interval #15

Closed: R4ND3LL closed this issue 2 years ago

R4ND3LL commented 2 years ago

I'm trying to determine if this project can help me construct a channel that does the following:

  1. Batch item reading using the standard .Batch() method, but...
  2. If nothing is written to the channel for X ms, the batch is pushed through anyway, even if it doesn't meet the size requirement.

That way the consumer efficiently processes batches of data, but the data never goes stale if the flow slows down.

Or simply batch item reading based on a time interval alone, without any regard for size? (i.e. read a batch of items from the channel every 1 second, assuming there is at least 1 item to read)

Any advice would be appreciated! Thanks

Note: I found this answer that does exactly what I need; however, it relies on an exception to pulse the read, which is not great for tight intervals like 1 second or less. https://stackoverflow.com/questions/63881607/how-to-read-remaining-items-in-channel-less-than-batch-size-if-there-is-no-new
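
For concreteness, the consumption shape I'm after would be roughly this (hypothetical ReadBatchesAsync; nothing like it exists in the library yet):

    // Hypothetical API shape, for illustration only:
    await foreach (List<int> batch in channel.Reader.ReadBatchesAsync(
        batchSize: 100,
        timeout: TimeSpan.FromSeconds(1), // flush a partial batch after 1 s of inactivity
        cancellationToken))
    {
        await SendToConsumerAsync(batch); // stand-in consumer
    }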

electricessence commented 2 years ago

This was discussed previously: https://github.com/Open-NET-Libraries/Open.ChannelExtensions/issues/1

The solution I came up with for the time being is .ForceBatch(), so that you can write your own timeout logic: https://github.com/Open-NET-Libraries/Open.ChannelExtensions/blob/master/Open.ChannelExtensions/BatchingChannelReader.cs#L31-L52

electricessence commented 2 years ago

I'm open to suggestions if you feel there's another way.

R4ND3LL commented 2 years ago

I've looked at ForceBatch(), and arrived at the same question the other author raised: "How do I complete the batch if nothing has been written within x sec."

The issue being, ForceBatch() has to be triggered by something, but what we're looking for is to trigger the sending of a batch in the absence of activity. So this would require an external trigger, coded via a Timer or something similar.

Let's take this use case:

You have a cluster of servers with an in-memory cache. You want to keep the cache state consistent between nodes by using a channel to collect the changes (multi-writer, single-reader) and transmit them to the other nodes. Because of the overhead involved in sending packets over the network, it's necessary to batch the changes. If batches were purely size-based, you could easily have a situation where a batch is half filled, then no more changes are generated for a few minutes, meaning the batch isn't sent for a while and the cache state becomes stale.

I would imagine a time-interval-based approach would work best, where the batches are pulsed to other nodes every 500ms, for example. This would ensure both that the network is not abused and that state never becomes stale.

I'm just trying to decide whether the best approach to this "pulsed batch" is a Timer, a linked cancellation token with a timeout, or some other option.

The linked token is nice and lightweight, but I don't like introducing exceptions into the regular flow of the app. If the interval were tight, like <= 50ms, then this could get nasty. Perhaps my concern is unfounded?

Either way, it would be great if this were baked into a channel reader, rather than introducing external code to manage this (and fire ForceBatch).

electricessence commented 2 years ago

Right. So ForceBatch() is there to do the trick. It just becomes a matter of how/when to trigger it. A CancellationTokenSource can only be cancelled once and has to be disposed. It's a bit tricky, which is why I created Open.Threading.Tasks: CancellableTask & ActionRunner.
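
To illustrate the single-use problem (plain BCL behavior):

    var cts = new CancellationTokenSource();
    cts.CancelAfter(TimeSpan.FromMilliseconds(500)); // schedule the timeout
    cts.Cancel();                                    // cancel; this CTS is now spent
    cts.CancelAfter(TimeSpan.FromSeconds(5));        // no effect; cancellation was already requested
    cts.Dispose();                                   // and it must be disposed
    // Every new timeout window therefore needs a fresh CTS.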

Now imagine that you are triggering a cancellation for every entry. (Additional overhead.) Probably not a good idea.

So let's consider how you might write this code externally...

  1. An oversimplified version could simply have a loop that does a Task.Delay(x) and exits once the channel is complete. On every iteration of that loop, you could call .ForceBatch() (see the sketch just after this list). Not ideal, as depending on your timing you could be producing a lot of incomplete batches, but that might be fine.
  2. The next level could also use a tight async loop, but track the last batch ID (via Interlocked) coming from the reader, and thereby decide whether it's been too long since a batch was released.
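
Here is a minimal sketch of option 1, assuming only the existing .Batch() and .ForceBatch() members plus the standard ChannelReader Completion task (channel, batchSize, and timeout are stand-ins):

    var batcher = channel.Reader.Batch(batchSize);

    _ = Task.Run(async () =>
    {
        // Pulse until the batcher completes; timing drifts, and partial batches are likely.
        while (!batcher.Completion.IsCompleted)
        {
            await Task.Delay(timeout).ConfigureAwait(false);
            batcher.ForceBatch(); // flush whatever has accumulated so far
        }
    });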

Considerations:

In either case, I feel like there are some nuances that individual consumers may or may not want.

Lemme know your thoughts. If I'm going to do this, it has to be right. I don't want to risk API changes in the future. I will definitely be thinking about this and if I come up with something I'll post here.

R4ND3LL commented 2 years ago

Here's the basic implementation using a linked cancellation token that expires:

        public static async ValueTask<T[]> ReadBatchAsync<T>(this ChannelReader<T> channelReader, int batchSize, TimeSpan timeout, CancellationToken cancellationToken = default) {
            var items = new List<T>(batchSize);
            using (var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken)) {
                // Note: the timeout clock starts here, at method entry, not when the first item arrives.
                linkedCts.CancelAfter(timeout);
                while (true) {
                    T item;
                    try {
                        // Use the plain token for the first read so an empty channel can be awaited indefinitely.
                        var token = items.Count == 0 ? cancellationToken : linkedCts.Token;
                        item = await channelReader.ReadAsync(token).ConfigureAwait(false);
                    } catch (OperationCanceledException) {
                        cancellationToken.ThrowIfCancellationRequested(); // external cancellation: propagate
                        break; // otherwise the cancellation was induced by the timeout: emit what we have
                    } catch (ChannelClosedException) {
                        if (items.Count == 0) throw; // nothing buffered; let the caller observe completion
                        break; // flush the final partial batch
                    }
                    items.Add(item);
                    if (items.Count >= batchSize) break;
                }
            }
            return items.ToArray();
        }

This method (above) reads individual items and builds the batches manually. The intent is that the "timer" starts when one item enters the list, so you are guaranteed a batch at the end of your interval; adding more items does not reset the timer. (See the note in the code about when the clock actually starts.)

This is a heavy approach, since it creates a new List and CTS for each batch, and relies on exceptions in the regular flow. Not a fan of this one.

And here is my implementation using your Batch extension, a timer, and IAsyncEnumerable, so that everything can be reused:

        public static async IAsyncEnumerable<IList<T>> ReadBatchEnumerableAsyncUsingTimer<T>(this ChannelReader<T> channelReader, int batchSize, TimeSpan timeout, [EnumeratorCancellation] CancellationToken cancellationToken = default) {
            var reader = channelReader.Batch(batchSize);
            // Periodically force out whatever has accumulated, whether or not anything was written.
            var timer = new Timer(TimerCallback, null, timeout, timeout);
            try {
                while (true) {
                    List<T>? item;
                    try {
                        item = await reader.ReadAsync(cancellationToken).ConfigureAwait(false);
                    } catch (OperationCanceledException) {
                        cancellationToken.ThrowIfCancellationRequested();
                        yield break;
                    } catch (ChannelClosedException) {
                        yield break;
                    }

                    if (item?.Count > 0) yield return item; // guard against an empty batch
                }
            } finally {
                await timer.DisposeAsync();
            }

            void TimerCallback(object? state) {
                reader.ForceBatch();
            }
        }

This is a nicer approach because it eliminates the routine exception from the flow, and since a CTS uses a timer internally anyway, I expect this to be just as light.

The downside here is that the reader has no knowledge of items being written, so it has to call ForceBatch() periodically whether anything is in the channel or not.

I expect if the channel had this baked in, it could disable/reset the timer as necessary.

I was generally assuming the timer would start from the first item added to a batch, but I see merit in having the timer reset upon each item collected. However, like you said, it's possible for a series of additions to conveniently reset the timer just before expiry, which would cause the channel to produce nothing for a long time. You would end up back at square one, with the receiver becoming stale again. So I would think starting the timer on the initial item only should be the primary focus of this feature. I can't really think of a real-world use case for resetting on every item.

Btw, I see users sometimes using your extension like this:

   .Batch(Int32.MaxValue, TimeSpan.FromSeconds(1))

which would essentially give you a heartbeat feed of data, every 1 second, with no regard for size.

electricessence commented 2 years ago

I appreciate the suggestions! I'm imagining a hybrid of the two. I think your first example doesn't do exactly what you think it does, but I get the intention...

   .Batch(Int32.MaxValue, TimeSpan.FromSeconds(1))

This is a really great example. I will keep this in mind.

electricessence commented 2 years ago

BTW, you can put ```cs when you use your code blocks and it will color code it.

electricessence commented 2 years ago

So at a minimum... I think the acceptance criterion here is: if a batch has not been emitted by the timeout, then call .ForceBatch().
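
Stated as a rough test (hypothetical setup; batchReader would come from .Batch(10) with a 100 ms timeout applied):

    writer.TryWrite(1);                              // one item; the batch is far from full
    await Task.Delay(150);                           // exceed the timeout with no further writes
    Assert.True(batchReader.TryRead(out var batch)); // the partial batch was forced out
    Assert.Equal(1, batch[0]);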

electricessence commented 2 years ago

Ask, and you shall receive: https://github.com/Open-NET-Libraries/Open.ChannelExtensions/releases/tag/v5.1.0

Published: https://www.nuget.org/packages/Open.ChannelExtensions/

Tests are passing and the behavior is as expected. Let me know if you run into any issues.
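
Usage would look roughly like this (a sketch; channel and Process are stand-ins):

    // Batches of up to 100 items; a partial batch is flushed once 500 ms elapse.
    var batcher = channel.Reader
        .Batch(100)
        .WithTimeout(TimeSpan.FromMilliseconds(500));

    while (await batcher.WaitToReadAsync())
        while (batcher.TryRead(out var batch))
            Process(batch);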

electricessence commented 2 years ago

@R4ND3LL ... An important note on why this took so long to work out... It was my intention that buffered channel readers like .Join() and .Batch() shouldn't automatically fill up their internal channel; they should react to being read from. This was extremely tricky to get right, as you have to react appropriately to both being written to and being read from, in order to ensure the source channel isn't eagerly drained, which for a bounded channel would defeat its resistance to being written to.

Here's how it was (and is) done: https://github.com/Open-NET-Libraries/Open.ChannelExtensions/blob/f6ea5bb51e06a3f090ff9a0d85e1775759f7ad84/Open.ChannelExtensions/BatchingChannelReader.cs#L180-L217

R4ND3LL commented 2 years ago

Thanks! It'll be Tuesday before I can look at this further - I'll wire it into my unit tests and see what happens.

By the look of it, you can alter the timeout at any time, which is great. I was thinking about making the channel respond to environmental load, throttling lower priority channels, so this should work well.

electricessence commented 2 years ago

Correct, you can alter the timeout at any time. Interesting approach for throttling. Lemme know how it goes! Just remember that setting the batch size will reserve memory, so int.MaxValue is probably not a good idea.
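
In plain BCL terms, assuming each batch buffer is preallocated at the requested size (e.g. new List<T>(batchSize)):

    // Preallocating at int.MaxValue asks for a ~2-billion-element backing array:
    var buffer = new List<int>(int.MaxValue); // OutOfMemoryException on typical configurations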

R4ND3LL commented 2 years ago

I'm getting a stack overflow on the WithTimeout() call. Here is my full test case:

        [Fact]
        public static async Task ReadBatchWithTimeoutEnumerableBakedIn() {
            var c = Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleReader = false, SingleWriter = false });
            _ = Task.Run(async () => {
                //await Task.Delay(1000);
                c.Writer.TryWrite(1);
                c.Writer.TryWrite(2);

                c.Writer.TryWrite(3);
                await Task.Delay(600);

                c.Writer.TryWrite(4);
                c.Writer.TryWrite(5);
                Debug.WriteLine("Writing Complete.");
                c.Writer.Complete();
            });
            var i = 0;
            await foreach (var batch in c.Reader.ReadBatchEnumerableAsyncBakedIn(2, TimeSpan.FromMilliseconds(500), CancellationToken.None)) {
                switch (i) {
                    case 0:
                        Assert.Equal(1, batch[0]);
                        Assert.Equal(2, batch[1]);
                        Debug.WriteLine("First batch received: " + String.Join(',', batch.Select(item => item)));
                        break;
                    case 1:
                        Assert.Equal(1, batch.Count);
                        Assert.Equal(3, batch[0]);
                        Debug.WriteLine("Second batch received: " + String.Join(',', batch.Select(item => item)));
                        break;
                    case 2:
                        Assert.Equal(4, batch[0]);
                        Assert.Equal(5, batch[1]);
                        Debug.WriteLine("Third batch received: " + String.Join(',', batch.Select(item => item)));
                        break;
                    default:
                        throw new Exception("Shouldn't arrive here. Got batch: " + String.Join(',', batch.Select(item => item)));
                }
                i++;
            }
            i.ShouldBe(3);
            await c.Reader.Completion; // Propagate possible failure
        }
        public static async IAsyncEnumerable<IList<T>> ReadBatchEnumerableAsyncBakedIn<T>(this ChannelReader<T> channelReader, int batchSize, TimeSpan timeout, [EnumeratorCancellation] CancellationToken cancellationToken = default) {
            var reader = channelReader.Batch(batchSize);
            reader = reader.WithTimeout(timeout); // stack overflow here
            while (true) {
                List<T> item;
                try {
                    item = await reader.ReadAsync(cancellationToken).ConfigureAwait(false);
                } catch (OperationCanceledException) {
                    cancellationToken.ThrowIfCancellationRequested();
                    yield break;
                } catch (ChannelClosedException) {
                    yield break;
                }

                if (item?.Count > 0) yield return item;
            }
        }

electricessence commented 2 years ago

Ok looking into it.

electricessence commented 2 years ago

Fix inbound...

electricessence commented 2 years ago

Fixed: https://www.nuget.org/packages/Open.ChannelExtensions/5.1.3

It would have worked if you had used milliseconds. Good thing you didn't. :)
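
For the curious, that failure mode is consistent with the classic self-recursive overload. A hypothetical sketch of the bug class, not the actual library source:

    // Hypothetical sketch only; not the actual Open.ChannelExtensions source:
    public static BatchingChannelReader<T> WithTimeout<T>(this BatchingChannelReader<T> reader, long milliseconds)
    { /* sets up the timeout */ return reader; }

    public static BatchingChannelReader<T> WithTimeout<T>(this BatchingChannelReader<T> reader, TimeSpan timeout)
        => reader.WithTimeout(timeout); // bug: re-invokes this same overload forever
        // intended: reader.WithTimeout((long)timeout.TotalMilliseconds)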