dotnet / roslyn

The Roslyn .NET compiler provides C# and Visual Basic languages with rich code analysis APIs.
https://docs.microsoft.com/dotnet/csharp/roslyn-sdk/
MIT License

Proposal: language support for async sequences #261

Closed gafter closed 7 years ago

gafter commented 9 years ago

Both C# and VB have support for iterator methods and for async methods, but no support for a method that is both an iterator and async. Its return type would presumably be something like IObservable<T> or IAsyncEnumerable<T> (which would be like IEnumerable but with a MoveNext returning Task<bool>).
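
To make that shape concrete, here is a minimal sketch of what such an interface pair might look like, together with a hand-written producer and consumer. The interface declarations, the Countdown class and the SumAsync helper are all illustrative assumptions, not an existing BCL API.

```csharp
using System;
using System.Threading.Tasks;

namespace AsyncSequenceSketch
{
    // Hypothetical interfaces: like IEnumerable<T>/IEnumerator<T>,
    // except that MoveNext completes asynchronously.
    public interface IAsyncEnumerable<out T>
    {
        IAsyncEnumerator<T> GetEnumerator();
    }

    public interface IAsyncEnumerator<out T> : IDisposable
    {
        T Current { get; }
        Task<bool> MoveNext();
    }

    // Hand-written producer: yields start, start-1, ..., 1 with a short delay before each item.
    public sealed class Countdown : IAsyncEnumerable<int>
    {
        private readonly int _start;
        public Countdown(int start) { _start = start; }
        public IAsyncEnumerator<int> GetEnumerator() => new Enumerator(_start);

        private sealed class Enumerator : IAsyncEnumerator<int>
        {
            private int _next;
            public Enumerator(int start) { _next = start; }
            public int Current { get; private set; }

            public async Task<bool> MoveNext()
            {
                if (_next <= 0) return false;
                await Task.Delay(10);   // stands in for real asynchronous work
                Current = _next--;
                return true;
            }

            public void Dispose() { }
        }
    }

    public static class Program
    {
        // Consumer: the async analogue of a foreach loop, written out by hand.
        public static async Task<int> SumAsync(IAsyncEnumerable<int> source)
        {
            int sum = 0;
            using (IAsyncEnumerator<int> e = source.GetEnumerator())
            {
                while (await e.MoveNext())
                {
                    sum += e.Current;
                }
            }
            return sum;
        }

        public static async Task Main()
        {
            Console.WriteLine(await SumAsync(new Countdown(3))); // prints 6
        }
    }
}
```

Both halves are boilerplate that language support would presumably generate: the Enumerator class on the producer side and the while loop on the consumer side.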

This issue is a placeholder for a feature that needs design work.

HaloFour commented 9 years ago

I'd personally love to see support for both IObservable<T> and IAsyncEnumerable<T>. The former interface at least already exists in the BCL and there is fantastic support for it in Rx. The latter interface has already been defined in the sister-project Ix (and under System.Collections.Generic no less) so would this language feature involve taking a dependency on that project or duplicating that interface in the BCL?

Ultimately switching between the two would be pretty easy (and already supported in Rx/Ix), but from a sequence producer point of view they would behave a little differently in that yield return for IObservable<T> would likely continue executing immediately whereas for IAsyncEnumerable<T> it would wait until the next invocation of MoveNext().

Also, if considering support for IObservable<T> you might want to consider requiring that the generator method accept a CancellationToken which would indicate when the subscriber has unsubscribed.

From the consumer point of view they should probably behave the same way. Observable.ForEach allows the action to execute concurrently and I think that it would probably be pretty unintuitive to allow the foreach body to have multiple concurrent threads (assuming that they're not being dispatched through a SynchronizationContext). If the implementation is similar to how await works whatever intermediary (SequenceAwaiter, etc.) could handle the details of buffering the results from an IObservable<T> or an extension method could just turn it into an IAsyncEnumerable<T>.

scalablecory commented 9 years ago

@HaloFour Observable.Create already provides an optimal implementation of this that language extensions wouldn't add any value to.

IAsyncEnumerable, however, has no optimal way to generate a sequence other than implementing the interface manually. It's fairly easy to make something that emulates yield return but it is super inefficient so this is badly needed.

HaloFour commented 9 years ago

I don't disagree. Rx is awesome like that. I advocate for it mostly to bring Rx closer to the BCL so that people are more aware that it exists, and also because those core interfaces are at least a part of the BCL whereas IAsyncEnumerable<T> is brand new to the BCL (and duplicates Ix).

MgSam commented 9 years ago

I'm not familiar with Ix, so I can't comment on any existing IAsyncEnumerable, but I would rather the team start fresh when thinking about async enumerables rather than try to build off IObservable. Rx was an interesting project, but it was designed mostly before async existed and then later tried to bolt the two concepts together with varying success. Present-day Rx also has a very cluttered API surface area with poor documentation all around.

async/await enables code that looks almost identical to synchronous code. I'd like to be able to work with asynchronous sequences as effortlessly as you can work with IEnumerable today. I've definitely wanted to mix yield return and async/await before so this is a feature that would be very helpful.

HaloFour commented 9 years ago

Indeed, there is a lot of duplication between the two because they were developed independently and Rx never had the resources that BCL/PFX had. I also don't think that Rx/Ix could be merged into the BCL as is.

The Ix IAsyncEnumerable<T> interface is exactly as described here, basically identical to IEnumerable<T> except that MoveNext() returns Task<bool>. As mentioned the big difference between something like IObservable<T> and IAsyncEnumerable<T> is that the latter is still a pull-model as the generator really couldn't continue until the consumer called MoveNext() again. In my opinion this would make it less suitable for certain concurrent processing scenarios since the producer code isn't running between each iteration. An IObservable<T> async iterator could continue executing immediately after yielding a value.

In my opinion supporting both would be worthwhile. The compiler could generate different state machines depending on the return type of the async iterator.

thomaslevesque commented 9 years ago

I've been wishing for this feature ever since C# 5 came out. Being able to write something like yield return await FooAsync() would be very useful; currently when I have an async method that returns a collection, I just return a Task<IReadOnlyCollection<T>>, because implementing laziness has too much overhead.

I noticed that Roslyn already has an IAsyncEnumerable<T> interface here. That's pretty much the design I had in mind, although I had forgotten about cancellation. To make it really useful, we would also need an async version of foreach (including a way to pass a CancellationToken to MoveNextAsync).

paulomorgado commented 9 years ago

@thomaslevesque, the Roslyn link is 404.

thomaslevesque commented 9 years ago

@thomaslevesque, the Roslyn link is 404.

Uh... looks like it's no longer there. A search for IAsyncEnumerable returns nothing (the name only appears in a comment). Perhaps it was moved and renamed to something else, or it was just removed.

anpete commented 9 years ago

Entity Framework uses the IAsyncEnumerable pattern to enable async database queries. In EF6 we had our own version of the interface, but in EF7 we have taken a dependency on IX-Async.

HaloFour commented 9 years ago

@anpete Seems to me that if async streams depend specifically on a new BCL IAsyncEnumerable<T> interface that not only will it not be a very usable feature until more projects move to the newer frameworks but there will also be a lot of confusion between the different-yet-identical interfaces that already exist.

Perhaps the compiler could support the different interfaces by convention, or have an easy way to unify them through a common utility extension method. But if, for whatever reason, they need to be converted back to their proper specific interface that would still pose problems.

I believe quite strongly that not at least trying to integrate the BCL and the Roslyn languages with Rx/Ix is a massive wasted opportunity.

tpetricek commented 9 years ago

Just to provide some additional background, this can already be done in F# (because F# "computation expressions", the mechanism behind both iterators and asyncs, are flexible enough). So, the C# design might learn something useful from the F# approach to this. See:

Probably the most interesting consideration here is what is the programming model:

You can convert between the two, but going from Rx to AsyncSeq is tricky (you can either drop values when the caller is not accepting them, or cache values and produce them later).

The thing that makes AsyncSeq nicer from sequential programming perspective (i.e. when you write statement-based method) is that it works well with things like for loops. Consider:

asyncSeq { 
  for x in someAsyncSeqSource do
    do! Async.Sleep(1000)
    processValue x }

Here, we wait 1 second before consuming the next value from someAsyncSeqSource. This works nicely with the pull-mode (we just ask for the next value after 1 second waiting), but it would be really odd to do this based on Rx (are you going to start the loop body multiple times in parallel? or cache? or drop values?)

So, I think that if C# gets something like asynchronous sequences (mixing iterators and await), the pull-based design that is used by F# asyncSeq is a lot more sensible. Rx works much better when you use it through LINQ-style queries.

EDIT: (partly in reply to @HaloFour's comment below) - I think that it makes sense to support the async iterator syntax for IAsyncEnumerable<T> (*), but not for IObservable<T>, because you would end up with very odd behavior of foreach containing await on Task<T>.


(*) As a side-note, I find IAsyncEnumerable<T> quite odd because it lets you call MoveNext repeatedly without waiting for the completion of the first - this is probably never desirable (and AsyncSeq<T> in F# does not make that possible).

HaloFour commented 9 years ago

@tpetricek The difference in behavior between IAsyncEnumerable<T> and IObservable<T> is exactly why I think async iterators should support both, it gives the programmer the capacity to decide whether it's a push- or pull-model and abstracts the difference to the consumer. I think a lot of scenarios benefit from a push-model, such as launching a bunch of operations simultaneously and wanting to process the results as they are available.

Beyond that hopefully both interfaces will enjoy support of all of the common LINQ operators plus those operators that apply to asynchronous streams.

dsyme commented 9 years ago

@tpetricek - The FSharp.Control.AsyncSeq documentation has been clarified to use the terminology "asynchronous pull", rather than just "pull", i.e. a pull operation that returns asynchronously, Async<T>. I'll leave it to others to debate what exactly the difference is between an "asynchronous pull" and a "synchronous push" :)

radekm commented 9 years ago

It would be nice if reading from an async sequence had constant stack usage and simple associative operations like concatenation had decent performance no matter whether left- or right-associated. E.g. reading from IEnumerables constructed by the following functions

        static IEnumerable<int> LeftAssocEnum(int i)
        {
            var acc = Enumerable.Empty<int>();
            while (i > 0)
            {
                acc = Enumerable.Concat(acc, new int[] { i });
                i--;
            }
            return acc;
        }

        static IEnumerable<int> RightAssocEnum(int i)
        {
            var acc = Enumerable.Empty<int>();
            while (i > 0)
            {
                acc = Enumerable.Concat(new int[] { i }, acc);
                i--;
            }
            return acc;
        }

causes StackOverflowException for sufficiently large i and both IEnumerables have quadratic complexity.

vladd commented 9 years ago

@radekm For your kind of usage (sequence is materialized, size is known in advance) you can already use List<int>.

    static IEnumerable<int> LeftAssocEnum(int i)
    {
        var acc = new List<int>(i);
        while (i > 0)
        {
            acc.Add(i);
            i--;
        }
        return acc;
    }

Does your request mean that all possible implementations of IEnumerable<T> (including immutable and lazy ones) should behave like List<T>?

radekm commented 9 years ago

@vladd It was only a simple example, you can take for instance Fib()

        static IEnumerable<BigInteger> Fib()
        {
            return Fib(BigInteger.Zero, BigInteger.One);
        }

        static IEnumerable<BigInteger> Fib(BigInteger a, BigInteger b)
        {
            yield return a;
            foreach (var x in Fib(b, a + b))
            {
                yield return x;
            }
        }

which has the same problems. What I want is to compose complex asynchronous sequences from very simple and reusable parts. To do this the operators like concatenation must be efficient. Since I don't know how to do this in C# I'll give a few examples in Scala with scalaz-stream.

1) Recursion can be used to define streams:

def fib(a: BigInt = 0, b: BigInt = 1): Process[Nothing, BigInt] =
    emit(a) ++ fib(b, a + b)

There is no risk of stack overflow and reading the first n items takes O(n) not O(n^2) (assuming that a + b is computed in constant time which is not true). Note: fib(b, a + b) is passed by name so the above code terminates.

2) Even transformations of streams are easily composable:

process1.take[Int](5).filter(_ > 0) ++ process1.id

This applies the filter only to the first 5 integers of the stream. You can use it with operator |>

Process(1, 2, -3, -4, -5, -6, -7) |> (process1.take[Int](5).filter(_ > 0) ++ process1.id)

and it gives you 1, 2, -6, -7.

LeeCampbell commented 9 years ago

I think that it would be wise to have language parity with F# for supporting async pull sequences (e.g. IAsyncEnumerable<T>, AsyncSeq<'T>). @tpetricek and @dsyme make very valid points here and the links are excellent and well worth reading as there appears to be confusion between when it is appropriate to use async pull vs IObservable<T>.

That leads me on to making some comments about Rx and why I don't think it needs any language support (right now).

  1. IObservable<T> is in the BCL. Fine, so people know about it.
  2. Being a library, it can have a faster release cadence than the language. This has been particularly positive for the adoption and improvement of the library. As we speak Rx 3.0 is in development, and it may have breaking changes. Let's not mix libraries with languages. You can also see this now happening at the framework level.
  3. Yup, we need better docs and education. I tried my best to do my part with IntroToRx.com

@thomaslevesque says "I've been wishing for this feature ever since C# 5 came out.". It seems to me that his example is a great candidate for Rx (async, lazy and support for cancellation).

@HaloFour "Observable.ForEach" shudder. Please don't use this method. It needs to be removed. It has no cancellation support, nor does it have any error handling/OnError

HaloFour commented 9 years ago

@LeeCampbell I'd largely be happy if the C# team did the same thing they did with await and provided a pattern that could be used to describe anything as an asynchronous stream. Then Rx could easily support that pattern, probably through an extension method that would describe the correct back-pressure behavior.

I think that there is a massive amount of information for Rx out there, but if nobody knows to look it might as well not exist. I think that it needs the same kind of campaign from MS that LINQ and asynchrony received. Some kind of inclusion in the languages pushes that point. I've been doing a lot of Java dev lately and it annoys me how much excitement there seems to be building around Rx that I don't see on the .NET side.

LeeCampbell commented 9 years ago

I am interested to see how you would see this work. I think the way you work with an AsyncEnum and the way you work with an IObservable sequence are quite different. The former you poll and pull from until complete and then you move on to the next statement.

IAsyncEnumerable<int> sequence = CreateAsynEnumSeq();
Output.WriteLine("Awaiting");
await sequence.ForEachAsync(Output.WriteLine);
Output.WriteLine("Done");

The latter you set up a subscription providing callbacks and then move on immediately. The callbacks for an Observable sequence are called at some future point in time.

IObservable<int> sequence = CreateObservableSeq();
Output.WriteLine("Subscribing");
sequence.Subscribe(Output.WriteLine, ()=>Output.WriteLine("Done"));
Output.WriteLine("Only Subscribed to, but not necessarily done.");

With this in mind, they (to me at least) are totally different things, so I am not sure why or how language support would help here. Would like to see a sample of your ideas. I can see some usefulness for language support of AsynEnum sequences, again, at least to get language parity with F#

HaloFour commented 9 years ago

@LeeCampbell

To give you an idea, I already currently have an extension method for IObservable<T> called GetAsyncEnumerator which returns my own IAsyncEnumerator<T> implementation:

public IObservable<int> Range(int start, int count, int delay) {
    return Observable.Create<int>(async observer => {
        for (int i = 0; i < count; i++) {
            await Task.Delay(delay);
            observer.OnNext(i + start);
        }
    });
}

public async Task TestRx() {
    Random random = new Random();
    IObservable<int> observable = Range(0, 20, 1000);
    using (IAsyncEnumerator<int> enumerator = observable.GetAsyncEnumerator()) {
        while (await enumerator.MoveNextAsync()) {
            Console.WriteLine(enumerator.Current);
            await Task.Delay(random.Next(0, 2000));
        }
    }
}

There are several overloads to GetAsyncEnumerator depending on if/how you want to buffer the observed values. By default it creates an unbounded ConcurrentQueue into which the observed values are collected and MoveNextAsync polls for the next value available in that queue. The other three options are to use a bounded queue, to have IObservable<T>.OnNext block until there is a corresponding call to MoveNextAsync or to have IObservable<T>.OnNext not block but have MoveNextAsync return the latest available value, if there is one. There are also overloads that accept a CancellationToken, of course, and IAsyncEnumerator<T>.Dispose unsubscribes the observer.
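
For readers who want to see the default (unbounded queue) strategy spelled out, below is a minimal sketch of such a bridge. It is not the actual extension method described above: BufferedAsyncEnumerator, RangeSource and CollectAsync are hypothetical names, the sketch uses a SemaphoreSlim to wake the consumer instead of polling, and it only depends on the BCL IObservable<T>/IObserver<T> interfaces.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace ObservableBridgeSketch
{
    // Hypothetical adapter: buffers pushed values in an unbounded queue
    // and lets a consumer pull them one at a time.
    public sealed class BufferedAsyncEnumerator<T> : IObserver<T>, IDisposable
    {
        private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
        private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);
        private readonly IDisposable _subscription;
        private volatile bool _completed;
        private Exception _error;

        public BufferedAsyncEnumerator(IObservable<T> source)
        {
            _subscription = source.Subscribe(this);
        }

        public T Current { get; private set; }

        // Producer side: never blocks, just enqueues and wakes the consumer.
        public void OnNext(T value) { _queue.Enqueue(value); _signal.Release(); }
        public void OnError(Exception error) { _error = error; _completed = true; _signal.Release(); }
        public void OnCompleted() { _completed = true; _signal.Release(); }

        public async Task<bool> MoveNextAsync(CancellationToken ct = default(CancellationToken))
        {
            while (true)
            {
                // Read the flag before draining so no buffered item is missed.
                bool completed = _completed;
                if (_queue.TryDequeue(out T item)) { Current = item; return true; }
                if (completed)
                {
                    if (_error != null) throw _error;
                    return false;
                }
                await _signal.WaitAsync(ct);   // wait for the next push or completion
            }
        }

        // Disposing the enumerator unsubscribes the observer.
        public void Dispose() { _subscription.Dispose(); }
    }

    // Tiny push source for the demo: emits everything synchronously on Subscribe.
    public sealed class RangeSource : IObservable<int>
    {
        private readonly int _start, _count;
        public RangeSource(int start, int count) { _start = start; _count = count; }

        public IDisposable Subscribe(IObserver<int> observer)
        {
            for (int i = 0; i < _count; i++) observer.OnNext(_start + i);
            observer.OnCompleted();
            return new NoopDisposable();
        }

        private sealed class NoopDisposable : IDisposable { public void Dispose() { } }
    }

    public static class Program
    {
        public static async Task<List<int>> CollectAsync(IObservable<int> source)
        {
            var items = new List<int>();
            using (var e = new BufferedAsyncEnumerator<int>(source))
            {
                while (await e.MoveNextAsync()) items.Add(e.Current);
            }
            return items;
        }

        public static async Task Main()
        {
            Console.WriteLine(string.Join(", ", await CollectAsync(new RangeSource(5, 3)))); // prints 5, 6, 7
        }
    }
}
```

In this demo the source has pushed every value before the consumer pulls the first one, which is exactly the case the unbounded buffer is there to absorb.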

I hope that kind of answers your question. It's early and I didn't get much sleep last night. Basically, I am treating the IObservable<T> as an IAsyncEnumerable<T> and bridging between the two isn't all that difficult. The big difference is that the observable can continue to emit values and not have to wait for someone to poll.

scalablecory commented 9 years ago

Guys who are interested in IObservable support -- can you describe the benefit integrating this into the language would bring?

HaloFour commented 9 years ago

@scalablecory

  1. The interface is already in the BCL and has been since .NET 3.5.
  2. Rx already provides an amazing degree of support for working with IObservable<T> and marries the existing language concepts of asynchrony with LINQ beautifully.
  3. I think that "push" model asynchronous enumeration is very useful for a lot of scenarios, specifically when you need to dispatch a series of asynchronous requests at once and then process them as the results become available. This is currently pretty obnoxious to handle with async methods alone.
  4. Volta needs MS love.

To Devil's Advocate my own arguments:

  1. Probably moot as we'd likely need a streaming analog to GetAwaiter so we're stuck waiting on a BCL change anyway.
  2. Someone's bound to write all of the same LINQ methods to work against IAsyncEnumerable<T>, despite being a pretty massive duplication of effort. Rx already has, it would be silly to do it again.
  3. I'm sure that IAsyncEnumerable<T> can wrap a "push" notification source. I'm already doing it.
  4. Java clearly loves Volta more. :wink:

Now, given the probability of Devil's Advocate point 1, some streaming analog to GetAwaiter, support for IObservable<T> from the consuming side could be accomplished by extension methods within Rx, and I'd be perfectly happy with that.

Now, for my arguments from the generating side, I'd like to revisit my use case of dispatching a bunch of asynchronous operations. This is something that the current project I work on does incredibly frequently, basically n+1 operations against web services where the first response provides a bunch of IDs that then need to be resolved individually*. If async streams return IAsyncEnumerable<T> where the coroutine isn't continued until the consumer asks for the next value then you don't really have the facility to perform the operations in parallel.

public async IAsyncEnumerable<User> QueryUsers(int organizationId, CancellationToken ct) {
    Organization organization = await ws.GetOrganization(organizationId, ct);
    foreach (int userId in organization.UserIds) {
        User user = await ws.GetUser(userId);
        yield return user; // can't continue until consumer calls IAsyncEnumerator.MoveNext
    }
}

Granted, there could be BCL methods to make this a little easier, but it feels like something that can be supported out of the box:

public async IObservable<User> QueryUsers(int organizationId, CancellationToken ct) {
    Organization organization = await ws.GetOrganization(organizationId, ct);
    foreach (int userId in organization.UserIds) {
        User user = await ws.GetUser(userId);
        yield return user; // Effectively the same as calling IObserver.OnNext(user)
    }
}
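
For comparison, here is a rough sketch of how the "dispatch everything, then process results as they arrive" pattern can be written today with plain tasks and Task.WhenAny. GetUserAsync is a hypothetical stand-in for the web service call (with an artificial latency parameter), and strings stand in for User objects; neither is part of the project described above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class CompletionOrderSketch
{
    // Hypothetical stand-in for a web service call such as ws.GetUser(userId).
    private static async Task<string> GetUserAsync(int userId, int latencyMs)
    {
        await Task.Delay(latencyMs);
        return "user" + userId;
    }

    // Starts every request up front, then collects results in completion order.
    public static async Task<List<string>> QueryUsersAsync(
        IEnumerable<(int Id, int LatencyMs)> requests)
    {
        // ToList() forces all requests to start before the first await below.
        List<Task<string>> pending =
            requests.Select(r => GetUserAsync(r.Id, r.LatencyMs)).ToList();
        var results = new List<string>();

        while (pending.Count > 0)
        {
            Task<string> finished = await Task.WhenAny(pending);
            pending.Remove(finished);
            results.Add(await finished);   // rethrows if the request failed
        }
        return results;
    }

    public static async Task Main()
    {
        var users = await QueryUsersAsync(new[] { (1, 300), (2, 50), (3, 150) });
        Console.WriteLine(string.Join(", ", users));
    }
}
```

It works, but the bookkeeping around the pending list is the part that gets obnoxious, and the repeated WhenAny calls make it quadratic in the number of requests.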

svick commented 9 years ago

@HaloFour Just like you can currently decide whether to process IEnumerable<T> in series (foreach) or in parallel (Parallel.ForEach()), there could be a similar distinction for IAsyncEnumerable<T>; you don't need IObservable<T> for that.

The problem I have with IObservable<T> is that it's pretty much impossible to process it in series without either blocking the producer or using some kind of buffer.

scalablecory commented 9 years ago

@HaloFour Let me rephrase my question.

Putting aside the "push" vs "pull" or "Rx" vs "IAsyncEnumerable" debate, for this proposal to gain weight, it needs to show solid benefits for language integration. These benefits have not yet been shown for the Rx side of things.

My two cents is that language integration wouldn't provide a significant benefit over Observable.Create. Its sole purpose would be to provide feature parity with "yield return", which I don't think is a good reason. Bringing popularity to Rx is also not a good reason.

IAsyncEnumerable, on the other hand, has no "Create" method. You can make one, but it's horribly inefficient. It is however possible to implement efficiently in straight IL, so there'd be a huge benefit to language integration and having the compiler generate the complex state machine around it.

If the argument devolves into simply that one of push or pull is better than the other, and thus we should forget about the inferior one and not bother integrating with it, I think that's pretty short-sighted. Both models do things that the other is simply unable to efficiently accomplish.

(Also, Ix-Async already implements IAsyncEnumerable with all that good LINQ integration if you want to check it out)

LeeCampbell commented 9 years ago

Well put @scalablecory.

HaloFour commented 9 years ago

@scalablecory

Sorry if it comes across that I'm having a debate, or that it's an either/or proposition. I don't think that "push" is better than "pull" or vice versa, only that there are use cases for both and that it would be nice to support multiple forms of "streams" within async methods. I think that achieving feature parity with F# would also be a very good thing.

My two cents is that language integration wouldn't provide a significant benefit over Observable.Create.

You're right. I think that it would be nice to have yield support for IObservable<T>, but Observable.Create is already very easy to use.

IAsyncEnumerable, on the other hand, has no "Create" method.

Sure it does, Ix-Async offers AsyncEnumerable.Create as well as a few other helper methods. Plus you can already convert between IObservable<T> and IAsyncEnumerable<T>. Of course if we're talking about a third Microsoft-provided IAsyncEnumerable<T> then no, nobody has written those methods yet.

What I'm really interested in hearing is any preliminary ideas regarding how these streams would be consumed. What a hypothetical foreach would look like with async streams. If it's based on a fairly-loosely defined pattern like GetAwaiter then support for IAsyncEnumerable<T> and IObservable<T> should both be quite easy to provide without the compiler having to know about either or any of those interfaces. To me, that's the ideal solution.

scalablecory commented 9 years ago

@HaloFour I believe you're mistaking EnumerableEx.Create for AsyncEnumerable.Create. There is nothing in Ix that provides "yield return" semantics for async sequence creation.

HaloFour commented 9 years ago

@scalablecory You're right, I am thinking of EnumerableEx. Doesn't seem like too far of a stretch to get an AsyncEnumerable.Create to work with pretty much the same syntax, though:

IAsyncEnumerable<int> ae = AsyncEnumerable.Create(async yield => {
    for (int i = 0; i < 10; i++) {
        await Task.Delay(1000);
        await yield.Return(i);
    }
});

BernhardGlueck commented 9 years ago

I created an implementation of Async Pull Sequences a while back, with LINQ support.

The syntax for an AsyncEnumerable was quite similar to the proposed one:

   AsyncEnumerable.Create<T>(async producer =>
   {
          await producer.Yield(value).ConfigureAwait(false);
   });

danielcweber commented 9 years ago

As a proof of concept (and also to get some insight into Roslyn), I started implementing async iterators here. On this branch, the compiler is able to compile this file (called AsyncIterators.cs)

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace AsyncIterators
{
    public class EntryPoint
    {
        public static void Main(string[] args)
        {
            var enumerator = new EntryPoint()
                .AsyncIterator()
                .GetEnumerator();

            while (enumerator.MoveNext().Result)
            {
                Console.WriteLine(enumerator.Current);
            }

            Console.ReadLine();
        }

        public IAsyncEnumerable<int> AsyncIterator()
        {
            for (var i = 0; i < int.MaxValue; i++)
            {
                await Task.Delay(500);
                yield return i;
            }
        }
    }
}

Call the Roslyn compiler like this:

csc "Your\Path\To\AsyncIterators.cs" /reference:"Your\Path\To\System.Interactive.Async.dll"

Get System.Interactive.Async.dll through the corresponding Nuget package.

The compiled .exe should output an incrementing number twice a second (as seen in the code).

The modification was pretty straightforward, there are two rewritings involved: First, the iterator is rewritten into a state machine (just like for synchronous iterators), but instead of IEnumerable and IEnumerator, IAsyncEnumerable and IAsyncEnumerator are implemented. The implementation of the async MoveNext method (that returns a Task<bool>) is subsequently rewritten into a state machine (just like for async methods). The resulting IL will therefore have two nested state machines.
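
To illustrate the first of the two rewritings, here is a hand-written approximation of what the iterator above might look like after the iterator rewrite but before the async rewrite: an enumerator class whose MoveNext is still an ordinary async method. This is only a sketch of the idea, not the code the branch actually generates; the interface declaration, the CounterEnumerator class, the state numbering and the limit/delay constructor parameters are assumptions made for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace NestedStateMachineSketch
{
    // Hypothetical declaration, shaped like the Ix-Async enumerator of the time.
    public interface IAsyncEnumerator<out T> : IDisposable
    {
        T Current { get; }
        Task<bool> MoveNext(CancellationToken cancellationToken);
    }

    // Outer state machine for:
    //   for (var i = 0; i < limit; i++) { await Task.Delay(delayMs); yield return i; }
    public sealed class CounterEnumerator : IAsyncEnumerator<int>
    {
        private readonly int _limit;
        private readonly int _delayMs;
        private int _state;   // 0 = not started, 1 = suspended at the yield, -1 = finished
        private int _i;       // the loop variable, hoisted into a field

        public CounterEnumerator(int limit, int delayMs)
        {
            _limit = limit;
            _delayMs = delayMs;
        }

        public int Current { get; private set; }

        // Still an async method here; the compiler's async rewrite would
        // turn this body into the second, inner state machine.
        public async Task<bool> MoveNext(CancellationToken cancellationToken)
        {
            switch (_state)
            {
                case 0: _i = 0; break;   // first call: run the loop initializer
                case 1: _i++; break;     // resume after the yield: run the loop increment
                default: return false;   // already finished or disposed
            }

            if (_i >= _limit) { _state = -1; return false; }

            await Task.Delay(_delayMs, cancellationToken);
            Current = _i;                // "yield return i"
            _state = 1;
            return true;
        }

        public void Dispose() { _state = -1; }
    }

    public static class Program
    {
        public static async Task<List<int>> DrainAsync(IAsyncEnumerator<int> e)
        {
            var items = new List<int>();
            using (e)
            {
                while (await e.MoveNext(CancellationToken.None)) items.Add(e.Current);
            }
            return items;
        }

        public static async Task Main()
        {
            var items = await DrainAsync(new CounterEnumerator(3, 10));
            Console.WriteLine(string.Join(", ", items)); // prints 0, 1, 2
        }
    }
}
```

Note that the token is only visible inside MoveNext here, which is the access problem for user-written iterator bodies.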

I also tested enclosing the body of AsyncIterator in using/try-finally blocks, this also works.

Note that this is essentially only a proof of concept: In this commit, I point out the first language design issue that occurred to me. Also, it is unclear how code in async iterators should access the CancellationToken that is passed to MoveNext.

Let me hear what you think.

scalablecory commented 9 years ago

@danielcweber Great job!

HaloFour commented 9 years ago

@danielcweber

Awesome job. I like how you reused Ix's IAsyncEnumerable. I wonder if Roslyn would eventually also support that interface as well as a BCL-provided one.

Also, it is unclear how code in async iterators should access the CancellationToken that is passed to MoveNext.

I had been wondering that myself. One potential solution might be to have the iterator method accept (require?) a CancellationToken argument and the generated state machine would effectively "merge" the two tokens together into one per every invocation of MoveNext? That might be too much voodoo and it doesn't help if the iterator doesn't accept a CancellationToken argument.
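
For what it's worth, the "merge" part already has a BCL building block in CancellationTokenSource.CreateLinkedTokenSource. The sketch below only demonstrates the linking behaviour; the names iteratorCts and moveNextCts are made up for the example, and how a generated state machine would actually wire this up is an open question.

```csharp
using System;
using System.Threading;

public static class LinkedTokenSketch
{
    // Returns true if the merged token observes cancellation from either source.
    public static bool MergedTokenIsCancelled(bool cancelIterator, bool cancelMoveNext)
    {
        using (var iteratorCts = new CancellationTokenSource())   // token given to the iterator method
        using (var moveNextCts = new CancellationTokenSource())   // token given to one MoveNext call
        using (var linked = CancellationTokenSource.CreateLinkedTokenSource(
                   iteratorCts.Token, moveNextCts.Token))
        {
            if (cancelIterator) iteratorCts.Cancel();
            if (cancelMoveNext) moveNextCts.Cancel();

            // The iterator body would only ever look at linked.Token.
            return linked.Token.IsCancellationRequested;
        }
    }

    public static void Main()
    {
        Console.WriteLine(MergedTokenIsCancelled(false, false)); // False
        Console.WriteLine(MergedTokenIsCancelled(true, false));  // True
        Console.WriteLine(MergedTokenIsCancelled(false, true));  // True
    }
}
```

The cost is a new linked source per MoveNext invocation, which is part of why it feels like voodoo.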

scalablecory commented 9 years ago

I'd almost like a syntax sort of like this:

IAsyncEnumerable<int> GetSequence() with token
{
    if(token.IsCancellationRequested)

But that may be too extreme of a change...

danielcweber commented 9 years ago

@HaloFour what's the BCL-provided interface for async iterators? @scalablecory I also thought about having a variable (e.g. named "ct") implicitly in scope, like "this" is also implicitly in scope. Of course, you may not have a parameter with that name but I guess that would be tolerable.

HaloFour commented 9 years ago

@danielcweber

There isn't one. I am under the assumption that the feature would depend upon such an interface being added to the BCL as to not require a project to take a hard dependency on Ix-Async. Even if the feature is convention-driven and would work with Ix-Async I would expect an identical-looking interface to be added to the BCL.

dsyme commented 9 years ago

I believe the only truly systematic and consistent approach is to have the MoveNext operation on the AsyncIterator accept the Cancellation token.

That is, the natural and systematic translation to make any method M asynchronous is as follows: "any operation M generating result R translates to an async method M taking a cancellation token CT and giving result Task<R>".

Altering the scope and flow of cancellation tokens to be different to this tends to be like fiddling with assembly code and registers - things that look sensible come back to bite you later. It's possible passing the token to GetEnumerator will work but my guess is it will have problems in some cases.

Note that F# async avoids this problem by hiding cancellation tokens (they are implicitly threaded through the asynchronous computation structure - you only supply a cancellation token when starting an overall composite Async). In F# code you generally don't have to pass cancellation tokens explicitly at all. That simplification was dropped in the C# version of the feature. Anyway, we should get Tomas Petricek to add this to his summary of differences between the two models.

Cheers, Don

HaloFour commented 9 years ago

I believe the only truly systematic and consistent approach is to have the MoveNext operation on the AsyncIterator accept the Cancellation token.

Of course, and that is to be expected. The question is how to expose that token in the iterator.

That is, the natural and systematic translation to make any method M asynchronous is as follows: "any operation M generating result R translates to an async method M taking a cancellation token CT and giving result Task".

But these sequences themselves also represent an asynchronous operation which can potentially be cancelled as a whole, such as a single HTTP call which is returning a stream of data asynchronously which is parsed into a sequence of elements. You'd need a reliable mechanism to convey cancellation to that process which may need to be separate from cancelling the current iteration. This gets even hairier if you want to distinguish cancelling the attempt to move to the next element from cancelling the sequence.

Altering the scope and flow of cancellation tokens to be different to this tends to be like fiddling with assembly code and registers

Isn't that describing the coroutine shenanigans done with iterator and asynchronous state machines in general? :smile:

I think that ultimately there are probably two options. Provide a keyword that will access the current cancellation token, or add the concept of a current thread-local cancellation token to the BCL.

paulomorgado commented 9 years ago

@HaloFour, when would you need to cancel MoveNext without canceling the whole iteration and vice versa?

HaloFour commented 9 years ago

@paulomorgado Probably not. At best canceling the MoveNext operation would leave the sequence in an indeterminate state. But I do think that we'll want a simple cancellation mechanism that unifies the three that exist in the async sequence pattern:

  1. CancellationToken passed to the async iterator function.
  2. CancellationToken passed to the IAsyncEnumerator.MoveNext method.
  3. The IDisposable interface implemented by IAsyncEnumerator.

I think that having the MoveNext method accept a CancellationToken makes things really hairy. There is no existing syntax that would allow for the iterator to accept that token. If that iterator is based on multiple operations already in flight even if you could obtain that token it couldn't really be used to affect those operations. Any existing foreach syntax lacks the notion of passing an argument to the MoveNext function. All of these problems would need to be addressed and I fear the syntax that would arise as the answer.

I'm thinking that maybe we keep it simple and that cancellation via CancellationToken is optional and only available if the async iterator explicitly accepts one as an argument. When that token is cancelled the entire sequence is then cancelled.

danielcweber commented 9 years ago

@HaloFour: By "the async iterator explicitly accepts one as an argument" you mean the method that returns IAsyncEnumerable<T> and contains yield return statements having a parameter of type CancellationToken? Or do you mean that the GetEnumerator method of IAsyncEnumerable should take a CancellationToken ?

paulomorgado commented 9 years ago

We have to take into account that there are two sides to an enumerable: the producer and the consumer.

Although most developers will be consumers, ease of production will benefit all. So, language support for creating asynchronous enumerators will have its own value.

We also have to take into account that some types might be both synchronous and asynchronous enumerables. I don't think the existing foreach keyword can be reused. At least, not alone.

Maybe the best compromise will be something like this:

foreach async(var item in collection, cancellationToken)
{
    ...
}

But it gets a lot trickier if we want to use LINQ operators. Should the pattern be augmented to pass around a cancellation token?

Or should we see how observables/qbservables fit in all this?

HaloFour commented 9 years ago

@danielcweber The method that uses yield return, etc. That's likely the only method the consumer will be calling directly and passing arbitrary arguments.

@paulomorgado Rescanning the thread, it doesn't appear that any modifications to foreach to consume async sequences have been discussed yet. I do expect that some syntax changes would be required to make that happen and maybe there is room to fit in a CancellationToken as you describe. But you'd also need syntax in the producer to accept that token. That could involve syntax changes to yield return, although that could only provide a value for the second iteration and onward.

Observables do this very differently. Once you subscribe you don't ask for the next values, they just come to you. Cancellation can be implied by unsubscribing, which is done by disposing of the subscription. Rx provides a helper class CancellationDisposable which can trigger a CancellationToken when disposed which can allow the producer to react to being unsubscribed.
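A minimal sketch of that unsubscribe-as-cancellation pattern, using only BCL types. CancelOnDispose is a hand-rolled stand-in for the role CancellationDisposable plays in Rx, and TickSource and CountingObserver are hypothetical names for illustration:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Stand-in for Rx's CancellationDisposable: disposing it cancels a token.
public sealed class CancelOnDispose : IDisposable
{
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    public CancellationToken Token => _cts.Token;
    public void Dispose() => _cts.Cancel();
}

// Hypothetical producer that pushes values until the subscriber unsubscribes.
public sealed class TickSource : IObservable<int>
{
    public IDisposable Subscribe(IObserver<int> observer)
    {
        var subscription = new CancelOnDispose();
        var token = subscription.Token;

        Task.Run(async () =>
        {
            try
            {
                for (var i = 0; ; i++)
                {
                    await Task.Delay(10, token);
                    observer.OnNext(i);   // values are pushed; nobody asks for them
                }
            }
            catch (OperationCanceledException)
            {
                // The subscriber disposed the subscription, so the producer stops.
            }
        });

        return subscription;
    }
}

// Trivial observer that only counts what it receives.
public sealed class CountingObserver : IObserver<int>
{
    public int Count;
    public void OnNext(int value) => Interlocked.Increment(ref Count);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

Calling Dispose on the value returned from Subscribe is the only cancellation gesture the consumer needs; the producer observes it through the token.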

scalablecory commented 9 years ago

@HaloFour The sequence is cold at the point where you call the generator method. This is not an appropriate place to pass a cancellation token.

GetEnumerator() could work, but it's not exactly in line with existing practice. Right now, just about everywhere in .NET, you pass in a token to the method that does the actual work, not to some encompassing factory instance.

MoveNext() is the most in line with existing practice -- both in the Ix implementation, as well as with streaming sources like DbDataReader and Entity Framework.

HaloFour commented 9 years ago

@scalablecory I know, it's just the only place where the consumer would normally explicitly pass any arguments and the only place where iterator methods can accept arguments. Anything beyond that is going to require some fun syntax candy for both the consumer and the producer.

The consumer is probably relatively simple. We'll probably end up with something similar to what @paulomorgado suggested.

For the producer, the only thing that really makes sense to me is a new context sensitive keyword or expression that would allow access to the CancellationToken parameter of MoveNext. However, where I grapple a little with this is how that might behave if the iterator method also accepted a CancellationToken parameter, or if the iterator method is a cold enumeration over a hot sequence.

danielcweber commented 8 years ago

As async streams are now being brainstormed in #5383, I took the occasion and rebased my proof of concept of async streams found here. It still works for this simple example (and probably for more complex ones, too).

weitzhandler commented 8 years ago

Interesting idea.

jskeet commented 8 years ago

(I'm assuming discussion on async streams properly belongs here rather than on the language design meeting notes.)

#5383 suggests that cancellation tokens may well be handled in the same way as await configuration, but it's not clear to me how feasible that is, at least if we want yield return support. I can see how an extension method can easily make an IAsyncEnumerable<T> which just doesn't flow the context, for any arbitrary IAsyncEnumerable<T>... but the code in the iterator block would need to get at the desired cancellation token.

It feels to me like the GetEnumerator method should be passed the cancellation token - because I would expect a single token while iterating over the whole sequence.

One extra note: should IAsyncEnumerator<T> implement IAsyncDisposable as well? There was a brief mention in the language notes, but it didn't crop up in the example and I haven't seen it mentioned here. This may be quite complicated when it comes to cancellation, as you may need two cancellation tokens: if an iteration operation times out, you still want to get a shot at disposing of the sequence, but on the other hand you might want an "overall" timeout. Here be dragons, I suspect.

jcdickinson commented 8 years ago

Wouldn't cancellation tokens simply behave as they always have?

async IAsyncEnumerable<int> SlowNumbersAsync(int from, int to, CancellationToken token)
{
    for (var i = from; i <= to && !token.IsCancellationRequested; i++)
    {
        await Task.Delay(100, token);
        yield return i;
    }
}

foreach await (var item in SlowNumbersAsync(100, 200, token))

A system for configuring arbitrary tokens would be great but I don't think it's necessarily specific to foreach await.

jskeet commented 8 years ago

Well, the question is whether the IAsyncEnumerable itself knows the cancellation token, or whether it's each time you iterate that knows the cancellation token. Would it make sense to have an IAsyncEnumerable<string> which (lazily) fetched stock tickers from a web service, and which could be reused multiple times, with a different cancellation token each time you iterate over it? Maybe, maybe not. The fact that there are three steps involved (creating the sequence, creating the iterator, and then calling MoveNext - multiple times) leads to lots of choices...
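To make the "token per iteration" option concrete, here is a sketch assuming a hypothetical interface pair in which GetEnumerator takes the token. ITokenAsyncEnumerable<T>, ITokenAsyncEnumerator<T> and Tickers are invented names, and the delay stands in for the web-service call:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical interfaces for illustration only.
public interface ITokenAsyncEnumerator<out T> : IDisposable
{
    T Current { get; }
    Task<bool> MoveNext();
}

public interface ITokenAsyncEnumerable<out T>
{
    // The token is bound once per iteration, not once per sequence or per step.
    ITokenAsyncEnumerator<T> GetEnumerator(CancellationToken cancellationToken);
}

public sealed class Tickers : ITokenAsyncEnumerable<string>
{
    private readonly string[] _symbols;

    public Tickers(params string[] symbols) { _symbols = symbols; }

    public ITokenAsyncEnumerator<string> GetEnumerator(CancellationToken cancellationToken)
        => new Enumerator(_symbols, cancellationToken);

    private sealed class Enumerator : ITokenAsyncEnumerator<string>
    {
        private readonly string[] _symbols;
        private readonly CancellationToken _token;
        private int _index = -1;

        public Enumerator(string[] symbols, CancellationToken token)
        {
            _symbols = symbols;
            _token = token;
        }

        public string Current => _symbols[_index];

        public async Task<bool> MoveNext()
        {
            await Task.Delay(1, _token);   // placeholder for the lazy fetch
            if (_index + 1 >= _symbols.Length) return false;
            _index++;
            return true;
        }

        public void Dispose() { }
    }
}
```

With this shape the same Tickers instance can be enumerated twice with two unrelated tokens, and cancelling one iteration leaves the other untouched.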

scalablecory commented 8 years ago

Looking at it a few different ways here:

If we go the GetEnumerator() route, it breaks convention -- you're no longer passing the token to the method doing the work. But, it also presents the most efficient operation in that you won't have to create any proxy cancellation tokens.

If we go the MoveNext() route, it keeps convention, but will either be confusing to use or inefficient to implement. Consider my previous suggestion:

IAsyncEnumerable<int> GetSequence() with token
{
    if(token.IsCancellationRequested)

Here, the GetEnumerator() route will ensure token never changes, as one would normally expect. A naive MoveNext(CancellationToken) sugar will change the meaning of token after every yield return -- efficient, but clearly confusing. A more anchored implementation will need to wrap it in a proxy to ensure token doesn't change:

CancellationTokenSource proxy;
MoveNext(CancellationToken token)
{
    using(token.Register(()=>proxy.Cancel()))
    {
        // user's code.
    }
}

Which is clearly not very efficient if we consider this overhead for every item.
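For reference, a runnable version of that proxy wrapper might look roughly like the following. ProxiedEnumerator, UserCode and the Registrations counter are hypothetical and exist only to make the per-item cost visible; this is not what a compiler would necessarily generate:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public sealed class ProxiedEnumerator : IDisposable
{
    // The iterator body only ever sees this one stable token.
    private readonly CancellationTokenSource _proxy = new CancellationTokenSource();
    private int _current;

    public int Current => _current;

    // Counts how many callback registrations were made, one per MoveNext call.
    public int Registrations { get; private set; }

    public async Task<bool> MoveNext(CancellationToken stepToken)
    {
        // Forward cancellation of this step's token to the proxy, then
        // unregister again when the step finishes.
        using (stepToken.Register(() => _proxy.Cancel()))
        {
            Registrations++;
            return await UserCode(_proxy.Token);
        }
    }

    // Stand-in for the user's iterator body: yields 1, 2, 3.
    private async Task<bool> UserCode(CancellationToken token)
    {
        await Task.Delay(1, token);
        if (_current >= 3) return false;
        _current++;
        return true;
    }

    public void Dispose() => _proxy.Dispose();
}
```

Enumerating three items costs four register/unregister pairs (one per MoveNext call, including the final call that returns false), which is the overhead being objected to. Note also that once any step token fires, the proxy stays cancelled for the rest of the sequence.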