Open-NET-Libraries / Open.ChannelExtensions

A set of extensions for optimizing/simplifying System.Threading.Channels usage.
https://open-net-libraries.github.io/Open.ChannelExtensions/api/Open.ChannelExtensions.Extensions.html#methods
MIT License

Question: Exception in Pipe breaks the channel? #9

Closed Feroks closed 4 years ago

Feroks commented 4 years ago

I would like to clarify whether the following behaviour is by design. Imagine that a channel is created from an AsyncEnumerable. If DoB() throws an exception, all subsequently produced values are handled only by the PipeAsync that calls DoA(). They never reach the PipeAsync that calls DoB(), or ReadAllAsync().

await .....
.ToChannel()
.PipeAsync(async x =>
{
   return await DoA();
})
.PipeAsync(async x =>
{
   return await DoB();
})
.ReadAllAsync(async x =>
{
   await Finish();
});

Am I supposed to try/catch all exceptions in the Pipe methods and then return some fault code that is handled by a Filter method?

electricessence commented 4 years ago

This is a great question, and I have struggled greatly with its subtleties!

Keep in mind that with your code, await Finish() is called for every entry until there is a fault.

There are some tests to prove the policy, but basically:

  1. With no parameters specified, and no faults, DoA() will continue to fill up the channel that DoB() is reading from and is unaware of any downstream faults.
  2. If DoB() throws for any reason, the underlying read completes the subsequent (downstream) writer with the thrown exception. https://github.com/Open-NET-Libraries/Open.ChannelExtensions/blob/master/Open.ChannelExtensions/Extensions.Pipe.cs#L105
  3. The 'faulted' downstream reader will then throw (out to the root statement) once it attempts to wait for any more items. https://github.com/Open-NET-Libraries/Open.ChannelExtensions/blob/master/Open.ChannelExtensions/Extensions.Read.cs#L108
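The policy above can be sketched with plain System.Threading.Channels, without this library. This is only an illustration under assumptions: `transform` is a hypothetical stand-in for DoB(), and the stage loop is a simplified imitation of what a pipe does, not the library's actual implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Stand-in for the channel that sits between a pipe stage and its reader.
var downstream = Channel.CreateUnbounded<int>();

// Hypothetical transform standing in for DoB(): fails on the third item.
Func<int, int> transform = x =>
    x == 3 ? throw new InvalidOperationException("DoB failed") : x * 10;

// Simplified stage: on a fault, complete the writer with the exception.
var stage = Task.Run(() =>
{
    try
    {
        foreach (var x in new[] { 1, 2, 3, 4 })
            downstream.Writer.TryWrite(transform(x));
        downstream.Writer.Complete();
    }
    catch (Exception ex)
    {
        downstream.Writer.Complete(ex);
    }
});
await stage;

// Reader: items written before the fault are still delivered;
// the exception surfaces when the reader waits for more.
var received = new List<int>();
var fault = "(none)";
try
{
    while (await downstream.Reader.WaitToReadAsync())
        while (downstream.Reader.TryRead(out var item))
            received.Add(item);
}
catch (InvalidOperationException ex)
{
    fault = ex.Message;
}

Console.WriteLine($"received: {string.Join(", ", received)}; fault: {fault}");
// prints "received: 10, 20; fault: DoB failed"
```

Note that the items already buffered (10 and 20) are not lost: the reader only sees the exception once the channel is empty and it waits again.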

So to summarize:

  1. Any faults should bubble out to the root and can be caught.
  2. With "Pipe" operations, upstream channels (and pipe calls) are unaware of downstream faults. It is possible to have multiple .Pipe calls on a single reader.

electricessence commented 4 years ago

"Am I supposed to try/catch all exceptions in Pipe methods and then return some fault code that is handled by a Filter method?"

It depends.

The only real consequence is that the most upstream channel will continue iterating without being notified of a problem. This would also be true for Dataflow blocks.
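To make that consequence concrete, here is a rough sketch using only System.Threading.Channels. The channel names `mid` and `down` and the two hand-written stages are assumptions for illustration; they imitate a two-step pipeline rather than reproduce this library's internals.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var mid = Channel.CreateUnbounded<int>();   // upstream -> stage
var down = Channel.CreateUnbounded<int>();  // stage -> final reader

// Upstream producer: nothing informs it about faults further down.
var produced = 0;
var upstream = Task.Run(() =>
{
    for (var i = 1; i <= 5; i++)
        if (mid.Writer.TryWrite(i)) produced++;
    mid.Writer.Complete();
});

// Stage: faults on the second item, completes its output with the
// exception, and stops reading from 'mid'.
var stage = Task.Run(async () =>
{
    try
    {
        await foreach (var x in mid.Reader.ReadAllAsync())
        {
            if (x == 2) throw new InvalidOperationException("stage failed");
            down.Writer.TryWrite(x);
        }
        down.Writer.Complete();
    }
    catch (Exception ex)
    {
        down.Writer.Complete(ex);
    }
});

await Task.WhenAll(upstream, stage);

// Final reader: receives what got through, then observes the fault.
var delivered = 0;
try
{
    await foreach (var item in down.Reader.ReadAllAsync())
        delivered++;
}
catch (InvalidOperationException)
{
    // The downstream fault surfaces here, at the root.
}

Console.WriteLine($"produced: {produced}, delivered: {delivered}");
// prints "produced: 5, delivered: 1"
```

The producer wrote all five items even though only one made it through; the remaining work was wasted because nothing signalled it to stop.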

Here's a pattern you could and probably should use in a complex situation like the one you provided:

using var ts = new CancellationTokenSource();
try
{
   await .....
   .ToChannel()
   .PipeAsync(async x =>
   {
      return await DoA();
   }, cancellationToken: ts.Token)
   .PipeAsync(async x =>
   {
      return await DoB();
   }, cancellationToken: ts.Token)
   .ReadAllAsync(async x =>
   {
      await Finish();
   });
}
catch
{
  ts.Cancel(); // Any fault will halt (cancel) the above pipe operations.
  throw;
}

That way, any downstream fault signals the upstream reads/transforms to stop, since continuing them would simply waste processing.
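The same idea can be tried out in isolation with plain System.Threading.Channels. This is a sketch under assumptions: the `source` and `output` channels and the hand-written stages are hypothetical stand-ins for the pipe calls, and the catch block does not rethrow so that the demo can report that the producer stopped.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

using var cts = new CancellationTokenSource();
var source = Channel.CreateUnbounded<int>();
var output = Channel.CreateUnbounded<int>();

// Upstream producer: keeps going until the token is cancelled.
var upstream = Task.Run(async () =>
{
    var i = 0;
    while (!cts.Token.IsCancellationRequested)
    {
        await source.Writer.WriteAsync(++i, cts.Token);
        await Task.Delay(5, cts.Token);
    }
});

// Stage: faults on the third item and passes the exception downstream.
var stage = Task.Run(async () =>
{
    try
    {
        await foreach (var x in source.Reader.ReadAllAsync(cts.Token))
        {
            if (x == 3) throw new InvalidOperationException("stage failed");
            output.Writer.TryWrite(x);
        }
        output.Writer.Complete();
    }
    catch (Exception ex)
    {
        output.Writer.Complete(ex);
    }
});

var seen = new List<int>();
try
{
    await foreach (var item in output.Reader.ReadAllAsync())
        seen.Add(item);
}
catch (InvalidOperationException)
{
    cts.Cancel(); // The fault reached the root: halt the upstream work.
}

// The producer ends either normally or with a cancellation exception.
try { await upstream; }
catch (OperationCanceledException) { }

Console.WriteLine($"seen: {string.Join(", ", seen)}; upstream stopped: {upstream.IsCompleted}");
// prints "seen: 1, 2; upstream stopped: True"
```

Without the call to cts.Cancel(), the producer in this sketch would keep writing indefinitely.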

electricessence commented 4 years ago

@Feroks lemme know if this answers your question or if you need more info. 😄

Feroks commented 4 years ago

@electricessence Thank you very much for the detailed answer. It covers all my questions 🙂