Open-NET-Libraries / Open.ChannelExtensions

A set of extensions for optimizing/simplifying System.Threading.Channels usage.
https://open-net-libraries.github.io/Open.ChannelExtensions/api/Open.ChannelExtensions.Extensions.html#methods
MIT License

Exception handling #28

Closed: otvintil closed this issue 9 months ago

otvintil commented 10 months ago

I like the library a lot and I would like to use it in my projects. However, I do have a small issue. When an unhandled exception is thrown inside the .Transform method, execution simply hangs. For example:

```csharp
var range = Enumerable.Range(0, 10000);
var pipe = range.ToChannel().Transform(i =>
{
    if ((i + 1) % 100 == 0) throw new Exception();
    return i.ToString();
});

var result = pipe.ReadAll(i => { });
```

Now, I do realize that exceptions should be handled with a try/catch inside the .Transform, but I believe it should not hang if something unexpected happens. But perhaps I'm doing something wrong.

electricessence commented 10 months ago

Note that .ReadAll(...) returns a ValueTask<long> and should be awaited. It's named simply ReadAll because the lambda it accepts is synchronous. ReadAllAsync and TaskReadAllAsync also return a ValueTask<long> and should be awaited, but they accept an async lambda: TaskReadAllAsync expects the lambda to return a Task, while ReadAllAsync expects a ValueTask, which should work for 99% of all async lambdas.
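To make the naming concrete, here is a minimal sketch built only on System.Threading.Channels. ReadAllSketch and ReadAllAsyncSketch are hypothetical helpers written for illustration, not Open.ChannelExtensions' implementation: the first takes a synchronous Action<T>, the second an async lambda returning a ValueTask, and both return a ValueTask<long> count that the caller awaits.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var numbers = Channel.CreateUnbounded<int>();
for (int i = 1; i <= 5; i++) numbers.Writer.TryWrite(i);
numbers.Writer.Complete();

long sum = 0;
// Synchronous lambda, but the call itself is asynchronous and must be awaited.
long syncCount = await ReadAllSketch(numbers.Reader, n => sum += n);

var words = Channel.CreateUnbounded<string>();
foreach (var s in new[] { "a", "b", "c" }) words.Writer.TryWrite(s);
words.Writer.Complete();

// Async lambda whose return type is ValueTask.
long asyncCount = await ReadAllAsyncSketch(words.Reader, async word => await Task.Yield());

Console.WriteLine($"{syncCount} items, sum {sum}; {asyncCount} items");

// Hypothetical helper: synchronous receiver, asynchronous read loop.
static async ValueTask<long> ReadAllSketch<T>(ChannelReader<T> reader, Action<T> receiver)
{
    long count = 0;
    while (await reader.WaitToReadAsync())
    {
        while (reader.TryRead(out var item))
        {
            receiver(item);
            count++;
        }
    }
    return count;
}

// Hypothetical helper: asynchronous receiver returning a ValueTask.
static async ValueTask<long> ReadAllAsyncSketch<T>(ChannelReader<T> reader, Func<T, ValueTask> receiver)
{
    long count = 0;
    while (await reader.WaitToReadAsync())
    {
        while (reader.TryRead(out var item))
        {
            await receiver(item);
            count++;
        }
    }
    return count;
}
```

In both cases the loop only finishes once the channel is completed, which is why the returned ValueTask<long> is the natural thing to await.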

Your code is not awaiting the .ReadAll(...). That isn't strictly required, since you could simply await pipe.Completion instead, but part of what .ReadAll(...) is for is writing less code when a channel will eventually close. Be sure to await somewhere, or you won't see the exception.
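The following sketch, using only System.Threading.Channels, shows why awaiting matters. TransformSketch is a hypothetical stand-in for a transform stage, not the library's Transform: it assumes the stage faults its output channel when the selector throws, so a consumer that awaits its read loop receives the exception once the items written before the failure have been drained.

```csharp
using System;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Source channel holding 0..9999, mirroring the repro in the issue.
var source = Channel.CreateUnbounded<int>();
foreach (var n in Enumerable.Range(0, 10000)) source.Writer.TryWrite(n);
source.Writer.Complete();

ChannelReader<string> pipe = TransformSketch(source.Reader, i =>
{
    if ((i + 1) % 100 == 0) throw new InvalidOperationException($"Failed on {i}");
    return i.ToString();
});

long received = 0;
Exception? observed = null;
try
{
    // Awaiting the consumer is what surfaces the failure.
    await foreach (var value in pipe.ReadAllAsync()) received++;
}
catch (Exception ex)
{
    // Depending on the API used, the error may arrive wrapped in a ChannelClosedException.
    observed = ex is ChannelClosedException { InnerException: { } inner } ? inner : ex;
}
Console.WriteLine($"Received {received} items, then: {observed?.Message}");

// Hypothetical transform stage (NOT the library's implementation): it completes
// its output channel with the selector's exception so readers do not wait forever.
static ChannelReader<TOut> TransformSketch<TIn, TOut>(ChannelReader<TIn> input, Func<TIn, TOut> selector)
{
    var output = Channel.CreateUnbounded<TOut>();
    _ = Task.Run(async () =>
    {
        try
        {
            await foreach (var item in input.ReadAllAsync())
                await output.Writer.WriteAsync(selector(item));
            output.Writer.Complete();
        }
        catch (Exception ex)
        {
            output.Writer.Complete(ex);
        }
    });
    return output.Reader;
}
```

If the `await foreach` were replaced by a fire-and-forget call, nothing would ever observe the faulted channel.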

I strongly recommend you trap any errors in your transform, and remember that you must return a value from it. Sometimes I'll use a try/catch and have the catch return null (or -1) to signal to the ReadAll that there was an error and the item should just be skipped. You can also use the Filter method before ReadAll to eliminate those invalid values.

In short, here's how you would see the exception: https://github.com/Open-NET-Libraries/Open.ChannelExtensions/blob/master/Open.ChannelExtensions.Tests/ExceptionTests.cs

And here's how I would handle transform exceptions:

```csharp
await Enumerable
    .Range(0, 10000)
    .ToChannel()
    .Transform(i =>
    {
        try
        {
            if ((i + 1) % 100 == 0) throw new Exception();
            return i.ToString();
        }
        catch
        {
            // Log it?
            // Put it on another queue?
            return null;
        }
    })
    .Filter(iString => iString is not null)
    .ReadAll(iString =>
    {
        /* do stuff */
        Console.Write(iString);
    });
```
electricessence commented 10 months ago

Using a range to demo how this works is great, but real-world behavior is a bit more interesting. Can I ask what you intend to do?

otvintil commented 9 months ago

Thank you, that makes total sense! I have a for-each extension method that runs code concurrently and currently uses semaphores. I would like to run some benchmarks to see whether channels are faster and more optimized, and perhaps switch to them. The use case is mostly lists of API responses or database results, which would then be processed concurrently.
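For comparison with a semaphore-based for-each, here is one way the same idea could be expressed with a channel: a single producer feeds a small bounded buffer and a fixed number of consumer tasks drain it. ForEachConcurrentSketch is a hypothetical helper written for this sketch, not part of Open.ChannelExtensions, and the buffer size of `maxConcurrency * 2` is an arbitrary assumption. Note how a failing consumer closes the channel so the producer cannot hang, which is the same concern raised at the top of this issue.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

var results = new ConcurrentBag<int>();
await ForEachConcurrentSketch(Enumerable.Range(1, 100), maxConcurrency: 4, async n =>
{
    await Task.Delay(1);   // stand-in for an API call or database query
    results.Add(n * 2);
});
Console.WriteLine($"Processed {results.Count} items");

// Hypothetical helper: one producer and maxConcurrency consumers share a bounded channel.
static async Task ForEachConcurrentSketch<T>(IEnumerable<T> source, int maxConcurrency, Func<T, Task> body)
{
    var channel = Channel.CreateBounded<T>(new BoundedChannelOptions(maxConcurrency * 2)
    {
        SingleWriter = true,
        FullMode = BoundedChannelFullMode.Wait
    });

    Task[] consumers = Enumerable.Range(0, maxConcurrency).Select(_ => Task.Run(async () =>
    {
        try
        {
            await foreach (var item in channel.Reader.ReadAllAsync())
                await body(item);
        }
        catch (Exception ex)
        {
            // Close the channel so a failing consumer cannot leave the producer waiting forever.
            channel.Writer.TryComplete(ex);
            throw;
        }
    })).ToArray();

    try
    {
        foreach (var item in source)
            await channel.Writer.WriteAsync(item);
    }
    catch (ChannelClosedException)
    {
        // A consumer failed and closed the channel; its exception is rethrown below.
    }
    finally
    {
        channel.Writer.TryComplete();
    }

    await Task.WhenAll(consumers);
}
```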

electricessence commented 9 months ago

Channels are in essence a perfect async producer/consumer queue. That said, I strongly suggest that you use a "bounded" channel. Pick a number that seems okay, like 1000.
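A bounded channel is created by passing a capacity to Channel.CreateBounded. The sketch below uses the suggested capacity of 1000 with BoundedChannelFullMode.Wait (an assumption; other modes drop items instead) and shows the resulting backpressure: once the buffer is full, TryWrite returns false and WriteAsync does not complete until a reader frees a slot.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Capacity 1000; Wait mode makes writers wait instead of dropping items.
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(1000)
{
    FullMode = BoundedChannelFullMode.Wait
});

// Fill the buffer with no consumer running: TryWrite fails once capacity is reached.
int accepted = 0;
while (channel.Writer.TryWrite(accepted)) accepted++;
Console.WriteLine($"Accepted before full: {accepted}");

// A producer that awaits WriteAsync is held back until a consumer frees a slot.
ValueTask pending = channel.Writer.WriteAsync(-1);
bool completedWhileFull = pending.IsCompleted;
channel.Reader.TryRead(out _);   // consume one item, freeing a slot
await pending;                   // the waiting write can now complete
Console.WriteLine($"Write completed while full? {completedWhileFull}");
```

With an unbounded channel the same fast producer would simply keep growing the queue, which is why a bound is the safer default.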

When you get into the weeds of what really happens, channels are amazing at helping optimize performance by deferring work for later; you can then do things like 'batch' updates, or whatever else you have in mind, in another process/thread. But be warned: there is no magic, and benchmarks don't necessarily reflect real-world results.
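As an illustration of the 'batch' idea, here is a minimal sketch that waits for at least one item and then synchronously drains up to a maximum batch size. ReadBatchesSketch and the batch size of 10 are assumptions for illustration only; the library may offer its own batching extension, which is not shown here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<int>();
foreach (var n in Enumerable.Range(1, 25)) channel.Writer.TryWrite(n);
channel.Writer.Complete();

var batchSizes = new List<int>();
await foreach (var batch in ReadBatchesSketch(channel.Reader, maxBatchSize: 10))
{
    // e.g. one bulk database update per batch instead of one per item
    batchSizes.Add(batch.Count);
}
Console.WriteLine(string.Join(",", batchSizes)); // 10,10,5

// Hypothetical helper: wait for data, then take whatever is available up to maxBatchSize.
static async IAsyncEnumerable<List<T>> ReadBatchesSketch<T>(ChannelReader<T> reader, int maxBatchSize)
{
    while (await reader.WaitToReadAsync())
    {
        var batch = new List<T>(maxBatchSize);
        while (batch.Count < maxBatchSize && reader.TryRead(out var item))
            batch.Add(item);
        if (batch.Count > 0) yield return batch;
    }
}
```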

Here's what typically happens with Channels, given that you are not guaranteed that a thread (via the TaskScheduler) will pick up the work at the tail until one is free to do so: items tend to pile up at the head (inside the queue) because there typically isn't a second thread ready to go. Then, when the first thread is momentarily free (the task yields), it switches to the tail (or some other segment in the pipe, typically closer to the head) and begins processing what's available. Channels will also do work synchronously when they can, and that may actually benefit performance. Just remember that nothing is certain, given the chaotic nature of awaiting async code.
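The synchronous fast path can be observed directly with System.Threading.Channels. In this sketch (a single-reader unbounded channel is an assumption chosen for simplicity), items pile up before the consumer runs, so WaitToReadAsync is already complete and TryRead drains the backlog without yielding; once the queue is empty, the consumer has to yield until the producer writes again.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleReader = true });

// Items pile up before the consumer gets a turn.
for (int i = 0; i < 100; i++) channel.Writer.TryWrite(i);

// With a backlog, WaitToReadAsync is already complete and TryRead drains it synchronously.
ValueTask<bool> ready = channel.Reader.WaitToReadAsync();
bool completedImmediately = ready.IsCompleted;
await ready;
int drained = 0;
while (channel.Reader.TryRead(out _)) drained++;

// With an empty queue, the consumer must yield until the producer writes again.
ValueTask<bool> idle = channel.Reader.WaitToReadAsync();
bool completedWhileEmpty = idle.IsCompleted;
channel.Writer.TryWrite(100);
await idle;

Console.WriteLine($"{completedImmediately} {drained} {completedWhileEmpty}"); // True 100 False
```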

An alternative to Channels is DataFlow (TPL Dataflow), and in some cases it can be a better way to go. DataFlow allows finer control over task scheduling, along with many other features. Although there are extensions here for concurrent reads, you might find DataFlow a bit better suited for something of that nature.
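For a rough feel of the DataFlow equivalent, here is a sketch using an ActionBlock from System.Threading.Tasks.Dataflow (depending on your target framework this may require the System.Threading.Tasks.Dataflow NuGet package). The degree of parallelism of 4 and the capacity of 1000 are assumptions; ExecutionDataflowBlockOptions.TaskScheduler is where a custom scheduler would be plugged in for the finer scheduling control mentioned above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var processed = new ConcurrentBag<int>();

var block = new ActionBlock<int>(async n =>
{
    await Task.Delay(1);   // stand-in for I/O-bound work
    processed.Add(n);
},
new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 4,              // at most 4 items in flight at once
    BoundedCapacity = 1000,                  // backpressure, similar to a bounded channel
    TaskScheduler = TaskScheduler.Default    // replace with a custom scheduler if needed
});

foreach (var n in Enumerable.Range(0, 100))
    await block.SendAsync(n);   // waits while the block's buffer is full

block.Complete();
await block.Completion;         // faults if the delegate threw
Console.WriteLine($"Processed {processed.Count} items");
```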

Final word: a Channel is basically just a ConcurrentQueue with a special interface, and it does tend to be very efficient and fast. If that's what you're looking for, then Channels it is.
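To illustrate the analogy loosely (this says nothing about how Channels are implemented internally): both types offer non-blocking add/take operations, but a channel reader can also await the next item without polling and can learn that no more items will ever arrive. ConsumeOneAsync is a hypothetical helper for this sketch.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Channels;
using System.Threading.Tasks;

// ConcurrentQueue: thread-safe, but a consumer must poll or add its own signaling.
var queue = new ConcurrentQueue<int>();
queue.Enqueue(1);
bool gotFromQueue = queue.TryDequeue(out int fromQueue);

// Channel: similar TryWrite/TryRead, plus async waiting and completion.
var channel = Channel.CreateUnbounded<int>();
Task<int> consumer = ConsumeOneAsync(channel.Reader);   // waits without polling
channel.Writer.TryWrite(2);
channel.Writer.Complete();                              // signals "no more items"
int fromChannel = await consumer;
bool drained = !await channel.Reader.WaitToReadAsync(); // false once empty and completed

Console.WriteLine($"{fromQueue} {fromChannel} {drained}"); // 1 2 True

// Hypothetical helper: asynchronously waits for and returns one item.
static async Task<int> ConsumeOneAsync(ChannelReader<int> reader) => await reader.ReadAsync();
```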