louthy / language-ext

C# functional language extensions - a base class library for functional programming
MIT License

Somes() on IAsyncEnumerable<OptionAsync<T>>? #1363

Open marss72 opened 4 days ago

marss72 commented 4 days ago

The title describes my issue. I have an IAsyncEnumerable<OptionAsync<T>> and want to enumerate all the Some items, either asynchronously or synchronously, the way the Somes() method does for a synchronous IEnumerable.

Can I do it without using ToEnumerable()?

marss72 commented 4 days ago

.WhereAwait(async x => await x.IsSome)
.SelectAwait(async x => await x.Value);

These extension methods on IAsyncEnumerable might work. But is there a cleaner built-in way to do it?

louthy commented 2 days ago

Although not wrong, I am wondering why you have an asynchronous stream that yields asynchronous options?

Usually I'd expect either: IEnumerable<OptionAsync<A>> or IAsyncEnumerable<Option<A>>.

There wasn't much support for IAsyncEnumerable in v4 of language-ext and I won't be adding new functionality, so you'll need to add any extensions yourself.

I'd suggest something like:

public static class YourExtensions
{
    public static async IAsyncEnumerable<B> Choose<A, B>(this IAsyncEnumerable<A> ma, Func<A, Task<Option<B>>> f)
    {
        await foreach (var a in ma)
        {
            var r = await f(a);
            if (r.IsSome) yield return (B)r;
        }
    }

    public static IAsyncEnumerable<A> Somes<A>(this IAsyncEnumerable<OptionAsync<A>> ma) =>
        ma.Choose(a => a.ToOption());
}

Choose is a good function to avoid multiple evaluation as it both maps and filters.
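To make the "maps and filters in one pass" point concrete, here is a minimal BCL-only sketch. It is not the language-ext API: it swaps Option<B> for a (bool Ok, B Value) tuple so it runs without any packages, and ChooseSketch, TryParseAsync, Inputs, RunAsync, and Calls are hypothetical names made up for this illustration. The counter shows the selector running once per element.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ChooseSketch
{
    // Counts how many times the (hypothetical) parser below runs.
    public static int Calls;

    // Hypothetical async selector: succeeds only for integer strings.
    public static Task<(bool Ok, int Value)> TryParseAsync(string s)
    {
        Calls++;
        return Task.FromResult((int.TryParse(s, out var n), n));
    }

    // Tuple-based analogue of Choose: a single call to f per element
    // decides whether to keep the element and what to map it to.
    public static async IAsyncEnumerable<B> Choose<A, B>(
        this IAsyncEnumerable<A> source,
        Func<A, Task<(bool Ok, B Value)>> f)
    {
        await foreach (var a in source)
        {
            var (ok, value) = await f(a);
            if (ok) yield return value;
        }
    }

    // Small demo source: two parsable strings and one that is not.
    public static async IAsyncEnumerable<string> Inputs()
    {
        await Task.CompletedTask;
        yield return "1";
        yield return "x";
        yield return "3";
    }

    // Collects the chosen values; Calls ends up equal to the number of inputs.
    public static async Task<List<int>> RunAsync()
    {
        Calls = 0;
        var results = new List<int>();
        await foreach (var n in Inputs().Choose(s => TryParseAsync(s)))
            results.Add(n);
        return results;
    }
}
```

A separate filter step followed by a map step would have to evaluate the selector once to test each element and again to extract the value from the ones that pass, or cache the result somewhere in between.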

marss72 commented 2 days ago

> There wasn't much support for IAsyncEnumerable in v4 of language-ext and I won't be adding new functionality, so you'll need to add any extensions yourself.

Why don't we make a separate library having the support for it?

louthy commented 2 days ago

> Why don't we make a separate library having the support for it?

You're welcome to. I have far too much on my plate to be supporting another library, I'm afraid.

v5 has this functionality already, but it takes a radically different approach to dealing with async everything. For IAsyncEnumerable the expectation is that you'll wrap it up in a StreamT<M, A>. OptionAsync<A> doesn't exist either, it's replaced with OptionT<IO, A>.

So, for an asynchronous stream of optional values that have IO side-effects (Task), you would use the type: StreamT<OptionT<IO>, A>. It stacks streaming behaviours, optional behaviours, and IO side-effects into a single monadic type.

For example, below is the equivalent to IAsyncEnumerable<OptionAsync<int>>:

    static async IAsyncEnumerable<OptionT<IO, int>> asyncStream(int n) 
    {
        foreach (var x in Range(1, n))
        {
            var option = isOdd(x)
                             ? OptionT.lift(IO.pure(x))
                             : OptionT<IO, int>.None;

            // using FromResult to force it to async, not needed, just for this demo
            var r = await Task.FromResult(option);  
            yield return r;
        }
    }

    static bool isOdd(int x) =>
        (x & 1) == 1;

It yields Some when the values are odd and None when they're even.

The stream can be consumed like so:

static StreamT<OptionT<IO>, Unit> example(int n) =>
    from x in StreamT.liftM(asyncStream(n))
    from _ in Console.write($"{x} ")
    where false
    select unit;    

StreamT.liftM(asyncStream(n)) converts the IAsyncEnumerable to a StreamT<OptionT<IO>, int>.

Note how I'm able to combine the IO behaviour of Console.write directly in the stream expression.

In the example above, the monads in the transformer-stack follow the rules of each stacked monad, and so the stream will end as soon as the first None value comes through.

To get Somes-like behaviour, there are overloads for streams of OptionT, Option, FinT, Fin, EitherT, Either, ValidationT, and Validation values that drop the failed values instead of ending the stream.

If we rewrite the above as:

static StreamT<IO, Unit> example(int n) =>
    from x in asyncStream(n).Somes()
    from _ in Console.write($"{x} ")
    where false
    select unit;

Then that will just print out the odd numbers until the stream runs out.

Somes in this case converts the IAsyncEnumerable to a StreamT<IO, int>.
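The difference between the two versions of example can be illustrated with a plain LINQ analogy. This is only a sketch of the semantics, not language-ext code: it assumes int? as a stand-in for the optional values (null playing the role of None), and NoneSemanticsSketch, Source, StopAtFirstNone, and DropNones are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class NoneSemanticsSketch
{
    // Stand-in for asyncStream: odd numbers are "Some", even numbers are "None" (null).
    public static IEnumerable<int?> Source(int n) =>
        Enumerable.Range(1, n).Select(x => (x & 1) == 1 ? x : (int?)null);

    // Analogue of the first version: the first None ends the whole stream.
    public static List<int> StopAtFirstNone(int n) =>
        Source(n)
            .TakeWhile(x => x.HasValue)
            .Select(x => x.GetValueOrDefault())
            .ToList();

    // Analogue of the Somes version: None values are dropped and the stream carries on.
    public static List<int> DropNones(int n) =>
        Source(n)
            .Where(x => x.HasValue)
            .Select(x => x.GetValueOrDefault())
            .ToList();
}
```

With n = 5, StopAtFirstNone yields only 1 because 2 maps to None, while DropNones yields 1, 3, and 5.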

StreamT is much more powerful than IAsyncEnumerable, supporting complex folds and yields, merging, zipping, etc. It supports synchronous and asynchronous in the same type: removing the mismatch between async and sync types that causes so many compositional problems.

And because it's a monad-transformer it can be composed with any other transformers or monads to gain additional behaviours, something that's not really possible with IAsyncEnumerable.