Open-NET-Libraries / Open.ChannelExtensions

A set of extensions for optimizing/simplifying System.Threading.Channels usage.
https://open-net-libraries.github.io/Open.ChannelExtensions/api/Open.ChannelExtensions.Extensions.html#methods
MIT License
401 stars, 25 forks

Exception handling in pipeline #21

Closed: MardukGD closed this issue 2 years ago

MardukGD commented 2 years ago

I'm having trouble handling exceptions thrown from pipeline steps. In some scenarios the exception is swallowed instead of being propagated to the caller.

From what I've observed, it seems to be related to the .Batch() step, and the point at which the exception is thrown may also matter.

Am I doing something wrong? How should this be handled so that the exception propagates up?

using System;
using System.Threading.Channels;
using System.Threading.Tasks;
using Open.ChannelExtensions;

var test = new Test();
try
{
    //await test.Scenario1();   //exception caught
    //await test.Scenario2();   //exception swallowed
    //await test.Scenario3();   //exception caught
    //await test.Scenario4();   //exception sometimes caught (~25% chance)
}
catch (Exception)
{
    Console.WriteLine("Got exception");
}

class Test
{
    public async Task Scenario1()
    {
        var channel = Channel.CreateBounded<int>(10000);

        for (int i = 0; i < 100; i++)
        {
            await channel.Writer.WriteAsync(i);
        }

        var task = channel.Reader.Pipe(1, (element) =>
            {
                throw new Exception();
                Console.WriteLine(element);
                return 1;
            })
            .Pipe(2, (evt) =>
            {
                Console.WriteLine("\t" + evt);
                return evt * 2;
            })
            //.Batch(20)
            .PipeAsync(1, async (evt) =>
            {
                Console.WriteLine("\t\t" + evt);
                return Task.FromResult(evt);
            })
            .ReadAll(task =>
            {
            });

        channel.Writer.TryComplete();
        await task;

        Console.WriteLine("end");
    }

    public async Task Scenario2()
    {
        var channel = Channel.CreateBounded<int>(10000);

        for (int i = 0; i < 100; i++)
        {
            await channel.Writer.WriteAsync(i);
        }

        var task = channel.Reader.Pipe(1, (element) =>
            {
                throw new Exception();
                Console.WriteLine(element);
                return 1;
            })
            .Pipe(2, (evt) =>
            {
                Console.WriteLine("\t" + evt);
                return evt * 2;
            })
            .Batch(20)
            .PipeAsync(1, async (evt) =>
            {
                Console.WriteLine("\t\t" + evt);
                return Task.FromResult(evt);
            })
            .ReadAll(task =>
            {
            });

        channel.Writer.TryComplete();
        await task;
    }

    public async Task Scenario3()
    {
        var channel = Channel.CreateBounded<int>(10000);

        for (int i = 0; i < 100; i++)
        {
            await channel.Writer.WriteAsync(i);
        }

        var task = channel.Reader.Pipe(1, (element) =>
            {
                if (element == 20)
                    throw new Exception();
                Console.WriteLine(element);
                return 1;
            })
            .Pipe(2, (evt) =>
            {
                Console.WriteLine("\t" + evt);
                return evt * 2;
            })
            //.Batch(20)
            .PipeAsync(1, async (evt) =>
            {
                Console.WriteLine("\t\t" + evt);
                return Task.FromResult(evt);
            })
            .ReadAll(task =>
            {
            });

        channel.Writer.TryComplete();
        await task;
    }

    public async Task Scenario4()
    {
        var channel = Channel.CreateBounded<int>(10000);

        for (int i = 0; i < 100; i++)
        {
            await channel.Writer.WriteAsync(i);
        }

        var task = channel.Reader.Pipe(1, (element) =>
            {
                if (element == 20)
                    throw new Exception();
                Console.WriteLine(element);
                return 1;
            })
            .Pipe(2, (evt) =>
            {
                Console.WriteLine("\t" + evt);
                return evt * 2;
            })
            .Batch(20)
            .PipeAsync(1, async (evt) =>
            {
                Console.WriteLine("\t\t" + evt);
                return Task.FromResult(evt);
            })
            .ReadAll(task =>
            {
            });

        channel.Writer.TryComplete();
        await task;
    }
}
electricessence commented 2 years ago

I will look into this Thursday. :) Thanks for the report.

electricessence commented 2 years ago

Let's start with your use of .PipeAsync. Take a look:

.PipeAsync(1, async (evt) =>
{
    return Task.FromResult(evt);
})

The above is not appropriate. The lambda is marked async, yet it returns a Task rather than the value itself, so each item passed to the next step is itself a task, not the value. That can definitely create some confusion. What is your actual intention here?

electricessence commented 2 years ago

I'm writing some tests using your examples. Maybe your above intention was this:

.PipeAsync(1, evt => new ValueTask<int>(evt))
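The difference is easy to see outside the pipeline. Below is a small standalone sketch in plain C# (no Open.ChannelExtensions calls; the delegate names are illustrative only) comparing the lambda from the report with two forms that produce the value directly. Because `async` already wraps whatever the lambda returns in a task, returning `Task.FromResult(evt)` from an async lambda yields a task of a task, and a single `await` only gets you the inner `Task<int>`.

```csharp
using System;
using System.Threading.Tasks;

// The shape from the report: `async` plus Task.FromResult wraps a task in a task.
// (The compiler also warns that this async lambda contains no await.)
Func<int, Task<Task<int>>> nested = async evt => Task.FromResult(evt);

// Two forms that produce the value itself.
Func<int, Task<int>> asyncValue = async evt => { await Task.Yield(); return evt; };
Func<int, ValueTask<int>> syncValue = evt => new ValueTask<int>(evt);

Task<int> inner = await nested(5);   // one await only unwraps the outer task
int fromNested = await inner;        // a second await is needed to reach the value
int fromAsync = await asyncValue(5);
int fromValueTask = await syncValue(5);

Console.WriteLine($"{fromNested} {fromAsync} {fromValueTask}"); // prints "5 5 5"
```

In a pipeline, the nested form means the following step sees `Task<int>` items rather than `int` items, which is why the `ValueTask<int>` form above is the better fit when no real asynchronous work is done.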
electricessence commented 2 years ago

So far in tests, your assumptions are correct: the exceptions are not propagated when batching, but they are when not batching.

electricessence commented 2 years ago

OK, so this is revealing. When the batch size matches the exception count, it does what it's supposed to, but when they don't match, the exception is not propagated correctly.


electricessence commented 2 years ago

So I'm formulating a theory... It's generally because of how ReadAll works: it may actually complete even though an exception WILL be present in the channel.
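To illustrate the theory, here is a simplified model built only on System.Threading.Channels. It is not Open.ChannelExtensions code, and `RunAsync` and `forwardFault` are hypothetical names. A pass-through stage sees the upstream fault but, in the buggy variant, completes its output with `TryComplete()` and no exception. The consumer's read loop then finishes normally after the valid items, so the exception never reaches the caller, which matches the "swallowed" symptom in the report.

```csharp
#nullable enable
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical simplified model (not the library's implementation): one pass-through
// stage between a faulted source and a consumer. Returns what the consumer observed.
async Task<(int itemsRead, Exception? observed)> RunAsync(bool forwardFault)
{
    var source = Channel.CreateUnbounded<int>();
    var output = Channel.CreateUnbounded<int>();

    // Five valid items, then the source is completed with a fault.
    for (int i = 0; i < 5; i++) source.Writer.TryWrite(i);
    source.Writer.TryComplete(new InvalidOperationException("upstream fault"));

    // The stage copies items until the source's fault surfaces.
    Exception? fault = null;
    try
    {
        await foreach (int item in source.Reader.ReadAllAsync())
            await output.Writer.WriteAsync(item);
    }
    catch (Exception ex)
    {
        fault = ex;
    }

    // The modelled bug: completing with null discards the fault.
    output.Writer.TryComplete(forwardFault ? fault : null);

    // The consumer drains the output; a faulted channel throws once it is empty.
    int itemsRead = 0;
    try
    {
        await foreach (int _ in output.Reader.ReadAllAsync()) itemsRead++;
        return (itemsRead, null);
    }
    catch (Exception ex)
    {
        return (itemsRead, ex);
    }
}

var swallowed = await RunAsync(forwardFault: false);
var propagated = await RunAsync(forwardFault: true);
Console.WriteLine($"not forwarded: {swallowed.itemsRead} items, fault seen: {swallowed.observed != null}");
Console.WriteLine($"forwarded: {propagated.itemsRead} items, fault seen: {propagated.observed != null}");
```

In both runs the consumer receives all five valid items; only the forwarded variant also surfaces the exception afterwards.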

electricessence commented 2 years ago

It's a troublesome issue: when items are not batched, every valid result is processed and only the faulted one is not. But when batching, any valid items should still be processed as a (possibly partial) batch before the fault is raised.

electricessence commented 2 years ago

Problem solved! Thank you so much for submitting! It was a pretty easy fix once I narrowed it down. https://www.nuget.org/packages/Open.ChannelExtensions/6.2.1 has the fix :)

My main concern was making sure no items were lost even though they were batched. Valid (non-faulted) entries should still be emitted, and the fault should then propagate when the final request for more items is made. The fix doesn't break that rule. This kind of bug could only have happened with the Pipe methods combined with a buffering reader. Very nuanced. Great find!
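The rule described above (emit every valid item, including a final partial batch, and only then surface the fault) can be sketched with plain System.Threading.Channels. This is not the library's batching reader; `BatchAsync` is a hypothetical helper written for illustration, and the batch size of 20 and the 45 input items are arbitrary choices.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical batching stage: groups items into lists of `batchSize`,
// flushes any partial batch, and only then forwards the upstream fault.
async Task BatchAsync<T>(ChannelReader<T> source, ChannelWriter<List<T>> target, int batchSize)
{
    var batch = new List<T>(batchSize);
    Exception? fault = null;
    try
    {
        await foreach (T item in source.ReadAllAsync())
        {
            batch.Add(item);
            if (batch.Count == batchSize)
            {
                await target.WriteAsync(batch);
                batch = new List<T>(batchSize);
            }
        }
    }
    catch (Exception ex)
    {
        fault = ex; // remember the fault instead of letting it end the stage early
    }

    // Valid items must not be lost: emit the leftover partial batch first...
    if (batch.Count > 0)
        await target.WriteAsync(batch);

    // ...then complete the output, passing the fault along (null means success).
    target.TryComplete(fault);
}

var input = Channel.CreateUnbounded<int>();
var batches = Channel.CreateUnbounded<List<int>>();

// 45 valid items, then a fault.
for (int i = 0; i < 45; i++) input.Writer.TryWrite(i);
input.Writer.TryComplete(new InvalidOperationException("fault after item 44"));

Task stage = BatchAsync(input.Reader, batches.Writer, 20);

var batchSizes = new List<int>();
Exception? observed = null;
try
{
    await foreach (List<int> b in batches.Reader.ReadAllAsync())
        batchSizes.Add(b.Count);
}
catch (Exception ex)
{
    observed = ex; // raised only after every buffered batch has been read
}
await stage;

Console.WriteLine($"batches: [{string.Join(", ", batchSizes)}], fault observed: {observed != null}");
```

Catching the upstream exception inside the stage, rather than letting it end the loop, is what gives the stage a chance to flush the leftover items before forwarding the fault with `TryComplete(fault)`.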

MardukGD commented 2 years ago

Great! I can confirm it's now working correctly on my side :) Many thanks for the quick fix!

electricessence commented 2 years ago

Most welcome!

electricessence commented 2 years ago

Remember to be careful about returning a Task from an async lambda.

electricessence commented 2 years ago

@MardukGD ... Just out of curiosity, what are you using channels for?

MardukGD commented 2 years ago

We're using this at work: we've built a pipeline that reads large files and extracts data from them, which is then processed in multiple steps.

electricessence commented 2 years ago

love it