Open-NET-Libraries / Open.ChannelExtensions

A set of extensions for optimizing/simplifying System.Threading.Channels usage.
https://open-net-libraries.github.io/Open.ChannelExtensions/api/Open.ChannelExtensions.Extensions.html#methods
MIT License

ForceBatch method doesn't complete batch immediately #2

Closed: alexeynikitin closed this issue 4 years ago

alexeynikitin commented 4 years ago

I got the same behaviour described here with .ForceBatch(): it doesn't complete the batch immediately when called, but only when the next item is written. I couldn't find any tests of this method to check how it is supposed to be used, so I concluded the behaviour is wrong.

electricessence commented 4 years ago

Ok. I'll investigate.

electricessence commented 4 years ago

Did you experience this bug with 3.3.2?

electricessence commented 4 years ago

This test passes: https://github.com/electricessence/Open.ChannelExtensions/blob/master/Open.ChannelExtensions.Tests/BatchTests.cs#L112-L153

electricessence commented 4 years ago

Now that the other bug is fixed, you should be able to 'flush' the reader periodically with a timer/interval.

var c = Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleReader = false, SingleWriter = false });

var reader = c.Reader.Batch(2);

// Periodically force any partially filled batch to be emitted.
Task.Run(async () =>
{
    await Task.Delay(1000);
    while (!reader.Completion.IsCompleted)
    {
        reader.ForceBatch();
        await Task.Delay(1000);
    }
});

// ReadAllAsync hands each batch to the delegate, so the lambda must accept it.
await reader.ReadAllAsync(batch => {/*...*/});
alexeynikitin commented 4 years ago

Here's my example:

var c = Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleReader = false, SingleWriter = false });
var reader = c.Reader.Batch(3);
Task.Run(async () =>
{
    await Task.Delay(1000);
    // Five writes: the first three complete a full batch; two items remain buffered.
    c.Writer.TryWrite(1);
    c.Writer.TryWrite(2);
    c.Writer.TryWrite(3);
    c.Writer.TryWrite(4);
    c.Writer.TryWrite(5);

    await Task.Delay(3000);
    // Expected: the two remaining items are emitted as a partial batch right here.
    // Actual: nothing is emitted until the next item is written.
    Console.WriteLine(reader.ForceBatch());
});

await reader.ReadAllAsync(async batch =>
{
    Console.WriteLine($"Batch: {string.Join(",", batch.Select(i => i.ToString()))}");
    await Task.Delay(500);
});
electricessence commented 4 years ago

K. Adding to tests.

electricessence commented 4 years ago

Thanks again for helping with this. This is a tricky one: if the implementation were to buffer all incoming channel content into another channel, it probably wouldn't be an issue. But I want to retain the BatchingChannelReader as a 'reader' that only pulls from its source and batches on demand. As you might imagine, there could be unintended side effects if that weren't the case.

The difference between your test and mine is that mine delays the read operation and yours doesn't. Yours reveals a situation that can only occur with .ForceBatch(): I'm 'awaiting' the source before proceeding, but because the source has nothing new, the await is stuck. What I was trying to avoid was a double await, which typically requires a Task.WhenAny instead of simply awaiting a ValueTask.
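
To illustrate the pattern being avoided, here is a minimal sketch (hypothetical names, not the library's internals) of what the double await looks like: the batching loop has to wake up either when the source has data or when a flush is requested, which forces the cheap ValueTask from WaitToReadAsync to be converted into a Task for Task.WhenAny.

using System.Threading.Channels;
using System.Threading.Tasks;

// Sketch only: 'flushRequested' is a hypothetical signal set by ForceBatch().
static async Task WaitForDataOrFlushAsync(
    ChannelReader<int> source,
    TaskCompletionSource<bool> flushRequested)
{
    // Awaiting WaitToReadAsync alone would be a cheap ValueTask await,
    // but it stays stuck until the source produces something new.
    Task<bool> dataAvailable = source.WaitToReadAsync().AsTask();

    // Observing both conditions requires Task.WhenAny, and therefore
    // a real Task allocation: the overhead discussed above.
    await Task.WhenAny(dataAvailable, flushRequested.Task);
}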

Again, appreciate the help. I'm investigating further to find a solution.

alexeynikitin commented 4 years ago

I can suggest that BatchingChannelReader could actually work with "wrapped" items, and `.ForceBatch()` could post some predefined "command" item that causes completion of the batch. Then you don't need to mess with tasks.
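
As a rough sketch of this idea (hypothetical types and names, not a proposed patch): every item is wrapped, and ForceBatch() just writes a command item that the batching loop interprets as "emit the current partial batch now".

using System.Collections.Generic;
using System.Threading.Channels;

// Wrapper carrying either a value or a flush command.
readonly record struct Wrapped<T>(T Value, bool IsFlush);

static class WrappedBatching
{
    public static bool Write<T>(ChannelWriter<Wrapped<T>> writer, T value)
        => writer.TryWrite(new Wrapped<T>(value, IsFlush: false));

    // Posting a command item means no extra Task machinery is needed.
    public static bool ForceBatch<T>(ChannelWriter<Wrapped<T>> writer)
        => writer.TryWrite(new Wrapped<T>(default!, IsFlush: true));

    public static async IAsyncEnumerable<List<T>> ReadBatches<T>(
        ChannelReader<Wrapped<T>> reader, int batchSize)
    {
        var batch = new List<T>(batchSize);
        await foreach (var item in reader.ReadAllAsync())
        {
            if (!item.IsFlush) batch.Add(item.Value);
            if (batch.Count == batchSize || (item.IsFlush && batch.Count > 0))
            {
                yield return batch;
                batch = new List<T>(batchSize);
            }
        }
        if (batch.Count > 0) yield return batch; // emit remainder on completion
    }
}

The trade-off is that every element pays for the extra wrapper field, and the source channel's element type changes.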

electricessence commented 4 years ago

The problem is that when the buffer is implemented differently, the code is quite simple and free of additional contention. But as soon as I add the .ForceBatch() method, everything gets far more complex and ultimately less performant because of that contention.

Still working on it.

electricessence commented 4 years ago

Released an update. https://www.nuget.org/packages/Open.ChannelExtensions/3.4.0

I tried a few different methods. I'm not 100% happy with having to create Tasks under the hood, but it's actually not too terrible. It does work as expected. All tests are passing.

I avoided incurring the same overhead on the Join operation and implemented some minor improvements I should have made already.
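
For reference, assuming 3.4.0 behaves as described, the repro above should now produce output along these lines:

Batch: 1,2,3   // full batch from the first three writes
True           // ForceBatch() reporting a pending partial batch was flushed
Batch: 4,5     // partial batch emitted without waiting for another write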

electricessence commented 4 years ago

@alexeynikitin Thanks again. Lemme know if there's anything else.

alexeynikitin commented 4 years ago

@electricessence Ok, thank you!