Open-NET-Libraries / Open.ChannelExtensions

A set of extensions for optimizing/simplifying System.Threading.Channels usage.
https://open-net-libraries.github.io/Open.ChannelExtensions/api/Open.ChannelExtensions.Extensions.html#methods
MIT License

Implement PipeBatchedAsync #25

JKamsker closed this 1 year ago

JKamsker commented 1 year ago

Please tell me what you think of this. Do you know of any way I could test it?

electricessence commented 1 year ago

I think what you are trying to do is exactly what the current batching does. The BatchingChannelReader works because it creates another channel to use as a buffer: https://github.com/Open-NET-Libraries/Open.ChannelExtensions/blob/master/Open.ChannelExtensions/BufferingChannelReader.cs (base class for

You have a couple of options from here:
1) Write another BatchingChannelReader, or modify the existing one, to allow setting the bounds and possibly to attempt to fill the channel before stopping.
2) Just use the .Pipe extensions to pull as many batches as you need from that buffer into another channel.

But why? What benefit do you gain, when the BatchingChannelReader is already as lazy as batching like this can get?

I think the easiest and most reliable solution is this:

var batchedReader = sourceChannelReader
    .Batch(100, singleReader: true, allowSynchronousContinuations: true)
    .Pipe(batch =>
    {
        /* transform the batch (a List<T>) here */
        return batch; // Pipe needs the lambda to return a value
    }, capacity: 10);

This will create batches of up to 100 entries (the final batch may be smaller) and hold at most 10 batches at a time.
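To make "batches of 100, at most 10 held at a time" concrete, here is a rough sketch of the same shape using only System.Threading.Channels. It assumes .NET Core 3.0+ (for `ReadAllAsync`) and C# 9 top-level statements; `BatchIntoBounded` is a hypothetical helper for illustration, not the library's BatchingChannelReader, and it leaves out the transform step.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper (not the library's API): groups items into lists of up to
// `batchSize` and writes them into a bounded channel holding at most `capacity` batches.
static ChannelReader<List<T>> BatchIntoBounded<T>(
    ChannelReader<T> source, int batchSize, int capacity)
{
    // Bounded output: the producer waits once `capacity` batches are queued.
    var output = Channel.CreateBounded<List<T>>(new BoundedChannelOptions(capacity)
    {
        SingleReader = true,
        SingleWriter = true,
        FullMode = BoundedChannelFullMode.Wait
    });

    _ = Task.Run(async () =>
    {
        var batch = new List<T>(batchSize);
        try
        {
            await foreach (var item in source.ReadAllAsync())
            {
                batch.Add(item);
                if (batch.Count == batchSize)
                {
                    await output.Writer.WriteAsync(batch);
                    batch = new List<T>(batchSize);
                }
            }
            // Flush the final, possibly smaller, batch.
            if (batch.Count > 0) await output.Writer.WriteAsync(batch);
            output.Writer.Complete();
        }
        catch (Exception ex)
        {
            // Surface source faults to whoever reads the batches.
            output.Writer.Complete(ex);
        }
    });

    return output.Reader;
}

// Usage: 250 items become batches of 100, 100 and 50.
var numbers = Channel.CreateUnbounded<int>();
foreach (var i in Enumerable.Range(0, 250)) numbers.Writer.TryWrite(i);
numbers.Writer.Complete();

var sizes = new List<int>();
await foreach (var b in BatchIntoBounded(numbers.Reader, 100, 10).ReadAllAsync())
    sizes.Add(b.Count);

Console.WriteLine(string.Join(", ", sizes)); // 100, 100, 50
```

The bounded output is what provides the backpressure: once 10 full batches are waiting, the producer loop stops pulling from the source until the reader catches up.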

Is that what you were trying to do?

electricessence commented 1 year ago

Need more feedback to proceed.

JKamsker commented 1 year ago

My implementation allows:

None of your implementations currently support those use cases. I'm not able to extend your code in that way; maybe you can :)

My use case:

electricessence commented 1 year ago

SelectMany is just the opposite of a Batch: it's a Join. You can certainly do everything you're saying.
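A minimal sketch of the flattening (SelectMany/Join) direction, assuming plain System.Threading.Channels on .NET Core 3.0+ and C# 9 top-level statements. `Flatten` is a hypothetical helper written for illustration, not the library's own Join extension.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper (not the library's API): turns a reader of batches
// back into a reader of individual items, preserving order.
static ChannelReader<T> Flatten<T>(ChannelReader<List<T>> batches)
{
    var output = Channel.CreateUnbounded<T>(new UnboundedChannelOptions { SingleWriter = true });

    _ = Task.Run(async () =>
    {
        try
        {
            await foreach (var batch in batches.ReadAllAsync())
                foreach (var item in batch)
                    await output.Writer.WriteAsync(item);
            output.Writer.Complete();
        }
        catch (Exception ex)
        {
            output.Writer.Complete(ex);
        }
    });

    return output.Reader;
}

var batchChannel = Channel.CreateUnbounded<List<int>>();
batchChannel.Writer.TryWrite(new List<int> { 1, 2, 3 });
batchChannel.Writer.TryWrite(new List<int> { 4, 5 });
batchChannel.Writer.Complete();

var items = new List<int>();
await foreach (var n in Flatten(batchChannel.Reader).ReadAllAsync())
    items.Add(n);

Console.WriteLine(string.Join(", ", items)); // 1, 2, 3, 4, 5
```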

Here's a way you could do that for your scenario:

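using System.IO;
using Open.ChannelExtensions;

await Directory
    .EnumerateFiles(path) // `path`: the folder to scan
    .ToChannel(1000)
    .Transform(fileName => new FileInfo(fileName))
    .PipeAsync(3, async fileInfo =>
    {
        // Dispose the StreamReader when the lambda finishes so file handles are released.
        using var reader = fileInfo.OpenText();
        var contents = await reader.ReadToEndAsync();
        var hash = HashFunction(contents); // placeholder for your own hashing routine
        return (contents, hash);
    }, capacity: 100, singleReader: false)
    .Batch(20)
    .ReadAllConcurrentlyAsync(3, async batch =>
    {
        // From here it's all custom routing and distribution. **
    });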

** Making more extensions to cover every use case doesn't make sense if you can get it done with the existing toolkit.
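The `PipeAsync(3, ..., capacity: 100)` stage above runs the file-reading transform on up to three items at once, with `capacity: 100` bounding its output. As a rough sketch of that idea only, here is a hand-rolled equivalent over plain System.Threading.Channels (assuming .NET Core 3.0+ and C# 9 top-level statements). `PipeConcurrently` is a hypothetical name; the library's actual PipeAsync may differ in ordering, error handling and cancellation.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper (not the library's API): `maxConcurrency` workers read from
// one source, transform each item, and write results into a bounded output channel.
static ChannelReader<TOut> PipeConcurrently<TIn, TOut>(
    ChannelReader<TIn> source, int maxConcurrency, Func<TIn, Task<TOut>> transform, int capacity)
{
    var output = Channel.CreateBounded<TOut>(capacity);

    // Several workers may safely read the same (multi-reader) channel.
    var workers = Enumerable.Range(0, maxConcurrency).Select(_ => Task.Run(async () =>
    {
        await foreach (var item in source.ReadAllAsync())
            await output.Writer.WriteAsync(await transform(item));
    })).ToArray();

    // Complete the output (propagating any fault) once every worker has finished.
    _ = Task.WhenAll(workers).ContinueWith(
        t => output.Writer.TryComplete(t.Exception?.GetBaseException()),
        TaskScheduler.Default);

    return output.Reader;
}

var inputs = Channel.CreateUnbounded<int>();
foreach (var i in Enumerable.Range(1, 10)) inputs.Writer.TryWrite(i);
inputs.Writer.Complete();

var results = new List<int>();
await foreach (var r in PipeConcurrently(inputs.Reader, 3, async n => { await Task.Delay(10); return n * n; }, 100).ReadAllAsync())
    results.Add(r);

// Completion order varies with concurrency, so sort before printing.
results.Sort();
Console.WriteLine(string.Join(", ", results)); // 1, 4, 9, 16, 25, 36, 49, 64, 81, 100
```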

At my work, I recently developed an Async Messaging Toolkit that does have a .Group extension for distributing to multiple targets. In the end, though, you still need to create those targets and make each one smart enough to handle any faults.
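The .Group extension mentioned above belongs to a separate toolkit whose API isn't shown here, so this sketch does not reproduce it. As an assumption-labeled illustration of "distribute to multiple targets, each handling its own faults", the following routes items round-robin into several bounded channels using plain System.Threading.Channels; `DistributeAsync` and `ConsumeAsync` are hypothetical helpers, and round-robin is just one possible routing rule.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper: sends each item to the next target in turn (round-robin).
static async Task DistributeAsync<T>(ChannelReader<T> source, IReadOnlyList<ChannelWriter<T>> targets)
{
    var next = 0;
    try
    {
        await foreach (var item in source.ReadAllAsync())
        {
            await targets[next].WriteAsync(item);
            next = (next + 1) % targets.Count;
        }
        foreach (var t in targets) t.TryComplete();
    }
    catch (Exception ex)
    {
        // Propagate the fault so every target's reader observes it.
        foreach (var t in targets) t.TryComplete(ex);
        throw;
    }
}

// Hypothetical helper: drains one target and handles per-item faults locally,
// so one failing item does not stop that target. Returns the number handled.
static async Task<int> ConsumeAsync(ChannelReader<int> reader, Func<int, Task> handle)
{
    var handled = 0;
    await foreach (var item in reader.ReadAllAsync())
    {
        try { await handle(item); handled++; }
        catch (Exception ex) { Console.WriteLine($"item {item} failed: {ex.Message}"); }
    }
    return handled;
}

var sourceChannel = Channel.CreateUnbounded<int>();
foreach (var i in Enumerable.Range(1, 9)) sourceChannel.Writer.TryWrite(i);
sourceChannel.Writer.Complete();

var targetChannels = Enumerable.Range(0, 3).Select(_ => Channel.CreateBounded<int>(10)).ToArray();

// Item 5 deliberately fails to show that its target keeps going.
var consumers = targetChannels
    .Select(c => ConsumeAsync(c.Reader, item =>
        item == 5 ? throw new InvalidOperationException("bad item") : Task.CompletedTask))
    .ToArray();

await DistributeAsync(sourceChannel.Reader, targetChannels.Select(c => c.Writer).ToArray());
var handledPerTarget = await Task.WhenAll(consumers);
Console.WriteLine(string.Join(", ", handledPerTarget)); // 3, 2, 3
```

Targets receive 1/4/7, 2/5/8 and 3/6/9 respectively; the second target logs the failure for item 5 and still processes item 8.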

Hope that helps, lemme know if I missed anything, or if there's something more I can do.