Open-NET-Libraries / Open.ChannelExtensions

A set of extensions for optimizing/simplifying System.Threading.Channels usage.
https://open-net-libraries.github.io/Open.ChannelExtensions/api/Open.ChannelExtensions.Extensions.html#methods
MIT License
416 stars 25 forks source link

Consider flush batch reader by timeout as well #1

Closed alexeynikitin closed 4 years ago

alexeynikitin commented 4 years ago

Hello! I found your extensions library quite useful and expressive! I also think that BatchingChannelReader is lacking of feature to flush its buffer by timeout as well. Could you please add it?

electricessence commented 4 years ago

Interesting... Looking into it.

electricessence commented 4 years ago

I'm not sure exactly what you're asking or how you'd use it? Adding a timeout may cause unexpected behavior. Mind making a PR?

alexeynikitin commented 4 years ago

I mean that .Batch behaves like .Buffer in Rx: you can either set count or timespan to trigger by: http://introtorx.com/Content/v1.0.10621.0/13_TimeShiftedSequences.html

electricessence commented 4 years ago

I would disagree that it behaves like buffer. A 'buffer' is exactly what a channel is. A 'Batch' is more like the implementation of MoreLINQ: https://morelinq.github.io/3.3/ref/api/html/Overload_MoreLinq_MoreEnumerable_Batch.htm Channels are not Observables. The BatchChannelReader is assuming a pull operation that 'sizes' the batches on demand as you retrieve the results. Because it's a 'reader' it is not in control of when the writer is complete.

That said, I can imagine a special method that gets the current batch regardless of size. I'll look into it again.

electricessence commented 4 years ago

Ok definitely going to have something. Hang tight.

electricessence commented 4 years ago

A patch has been made: https://www.nuget.org/packages/Open.ChannelExtensions/ (3.3.1) I recognized a serious flaw where if the channel never calls complete, the reader has no way of getting partial batches from the channel. This gives the power to the consumer to force buffering partial batches.

The solution is an added method called .ForceBatch().

This may not be totally what you're looking for but this is something I knew right away was a good idea, and did not create unintended side effects. Adding timers for flushing is another beast that I may be out of scope for this project since the BatchChannelReader<T> is the only class susceptible to this.

Leaving this open if you have a suggestion on how to do this properly.

alexeynikitin commented 4 years ago

@electricessence Nothing comes to mind so far. .ForceBatch() seems reasonable, but I can't figure out how do I "complete the batch if nothing has been written within x sec." using concurrent batch reading.

I also do not know whether the behavior of BatchChannelReader<T> to complete the previous batch only as it passed item of next batch was intended.

In other words I'm expected the following code should produce lines Batch: 1,2 Batch: 3,4 Batch: 5,6 but it is Batch: 1,2 Batch: 3,4 , despite of the reader have enough items to complete batch

var c = Channel.CreateUnbounded<int>(new UnboundedChannelOptions {SingleReader = false, SingleWriter = false});
Task.Run(async () =>
{
    await Task.Delay(2000);
    c.Writer.TryWrite(1);
    c.Writer.TryWrite(2);
    c.Writer.TryWrite(3);
    c.Writer.TryWrite(4);
    c.Writer.TryWrite(5);
    c.Writer.TryWrite(6);
});

var readerTask = c.Reader
    .Batch(2)
    .ReadAllAsync(async batch =>
    {
        Console.WriteLine($"Batch: {string.Join(",", batch.Select(i => i.ToString()))}");
        await Task.Delay(1000);
    });

await readerTask;
electricessence commented 4 years ago

Investigating.

electricessence commented 4 years ago

Fixed: https://github.com/electricessence/Open.ChannelExtensions/commit/a8dff0e010940288f5b92e80e7b33e97c2d04545#diff-5688a0077f6ab44e128e5993d57cb6c8L78

Originally this was thought of as a pull operation and was trying to optimize for that. It's now a little more forgiving. ;)

electricessence commented 4 years ago

Released: https://www.nuget.org/packages/Open.ChannelExtensions/3.3.2

Submit any other requests as a new ticket. :)

electricessence commented 3 years ago

@alexeynikitin https://github.com/Open-NET-Libraries/Open.ChannelExtensions/issues/15#issuecomment-938324729