Champion "Async Streams" (including async disposable) (16.3, Core 3) #43

Open gafter opened 7 years ago

gafter commented 7 years ago

See dev spec: See also

LDM notes:

gafter commented 7 years ago

This would be something like

interface IAsyncEnumerable<T>
{
    IAsyncEnumerator<T> GetEnumerator();
}
interface IAsyncEnumerator<T> : IAsyncDisposable
{
    Task<bool> MoveNextAsync();
    T Current { get; }
}

With the following elements yet to be designed

clairernovotny commented 7 years ago

@gafter I thought we settled on

interface IAsyncEnumerable<T>
{
    IAsyncEnumerator<T> GetEnumerator(CancellationToken cancellation);
}

And potentially

static class AsyncEnumerableExtensions
{
    static IAsyncEnumerator<T> GetEnumerator<T>(this IAsyncEnumerable<T> source) => source.GetEnumerator(CancellationToken.None);
}
juepiezhongren commented 7 years ago

oliverjanik commented 7 years ago

Any news on this? Seems this interface has been defined in several places already (IX, EF7)

benaadams commented 7 years ago

As brought up in roslyn thread

interface IAsyncEnumerator<T> : IAsyncDisposable
{
    Task<bool> MoveNextAsync();
    T Current { get; }
}

Has issues with parallelization/concurrency/atomicity which is a desirable property of an async system (e.g. double gets on Current or missed items on Current)

IvanKonov commented 7 years ago

Maybe then:

interface IAsyncEnumerator<T> : IAsyncDisposable
{
    Task<bool> GetNextAsync(out T next);
    Task<T> Current { get; }
}
HaloFour commented 7 years ago

An async method can't have out parameters.

IvanKonov commented 7 years ago

Another version:

interface IAsyncEnumerator<T> : IAsyncDisposable
{
    Task<(bool has_value, T next)> GetNextAsync();
    Task<T> Current { get; }
}
clairernovotny commented 7 years ago

vladd commented 7 years ago

@IvanKonov This definition would make IAsyncEnumerator<T> not covariant by T.

benaadams commented 7 years ago

An async method can't have out parameters.


Task<bool> GetNextAsync(out T next);

isn't async; it's just Task<bool> returning, and you can await with out params so no problems? 😉

async Task DoThing()
{
    var enumerator = new Enumerator<int>();
    while (await enumerator.GetNextAsync(out var next))
    {
        // Do thing with next
    }
}

struct Enumerator<T>
{
    public Task<bool> GetNextAsync(out T next)
    {
        next = default(T);
        return Task.FromResult(false);
    }
}
yaakov-h commented 7 years ago

Doesn't the out param have to be set before the task is returned? At that point, the task will most likely not be completed, so you won't have a "next" to pass as the out param. Then, by the time you have your next, it's too late to fill the out param.
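The timing problem described above can be sketched in a few lines. This is only an illustration: `SlowSource`, `ProduceAsync` and the value 42 are made-up names and data, not part of any proposal in this thread.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical source used only to illustrate the timing problem.
public sealed class SlowSource
{
    // Not an async method: it merely returns Task<bool>, so "out" compiles.
    public Task<bool> GetNextAsync(out int next)
    {
        // The compiler requires "next" to be assigned before we return,
        // but the real value (say 42) is only known once the delay finishes.
        next = default(int);
        return ProduceAsync();
    }

    private static async Task<bool> ProduceAsync()
    {
        await Task.Delay(10);   // pretend this is where 42 arrives
        return true;            // too late to write it into "next"
    }
}
```

A caller that awaits the returned task still sees the default value in `next`, because the out parameter was written before the asynchronous work finished.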

benaadams commented 7 years ago

Doesn't the out param have to be set before the task is returned?

stephentoub commented 7 years ago

Has issues with parallelization/concurrency/atomicity which is a desirable property of an async system

While true, if you have a source that supports multiple readers, you can also simply change the model to one where each reader is given its own enumerator. Then the concurrency issues evaporate / are left up to the coordinator that's giving the items out to the individual enumerators.

I'll go with @onovotny's suggestion instead

I don't understand the suggestion. When I do:

Task<bool> t = enumerator.GetMoveNext(out T result);
await t;

how is that intended to work?


Note that most of this was already discussed in

clairernovotny commented 7 years ago

I was trolling, but what if using out parameters with Task required use of await? Then the assignment order is guaranteed.

stephentoub commented 7 years ago

I was trolling, but what if using out parameters with Task required use of await? Then the assignment order is guaranteed.

a) that would be a breaking change, b) there are actually scenarios where you want to access the out value after the synchronous return of the method, and c) anything can be made awaitable, so task isn't special in this regard.
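Point (c) can be illustrated with a small sketch: `await` works on any type that exposes a suitable `GetAwaiter()`, including through an extension method. The class names `TimeSpanAwaitExtensions` and `AwaitableDemo` are invented for this example.

```csharp
using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

public static class TimeSpanAwaitExtensions
{
    // Any type with an accessible GetAwaiter() (instance or extension)
    // that returns a suitable awaiter can be used with "await".
    public static TaskAwaiter GetAwaiter(this TimeSpan delay)
        => Task.Delay(delay).GetAwaiter();
}

public static class AwaitableDemo
{
    public static async Task<string> RunAsync()
    {
        // Awaiting a TimeSpan directly; no Task appears at the call site.
        await TimeSpan.FromMilliseconds(10);
        return "done";
    }
}
```

Because the compiler only looks for the awaiter pattern, a rule tied specifically to `Task` would not cover types like this one.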

yaakov-h commented 7 years ago

@stephentoub is there a Cliffs Notes version of that thread?

stephentoub commented 7 years ago

In the near future I'm planning to take some time to write down where we are, which would include summarizing salient points from that thread.

scottt732 commented 7 years ago

Have you fleshed out how CancellationTokens would work here? Is the intent to explicitly call GetEnumerator... something like foreach (await var item in ae.GetEnumerator(cancellationToken)) { ... } ?

Task<bool> MoveNextAsync(CancellationToken cancellationToken) in the interface feels more consistent, but I gather that would require additional language changes to accommodate in foreach/while, LINQ query/method syntax.

Would it be possible to get the IEnumerable/LINQ and IObservable/Rx extension methods wired up to IAsyncEnumerable? Do IAsyncQueryable/IAsyncQbservable factor in to the plans at all?

Igorbek commented 7 years ago

discriminated unions would be useful here.

Lukazoid commented 7 years ago

If Nullable reference types landed, could you not just have

Task<T?> GetNextAsync()

Where null is returned for no next item.

yaakov-h commented 7 years ago

@Lukazoid but what if null is the next item?

e.g. new string[] { "a", "b", null, "d" } is a perfectly valid enumerable sequence, and I'd expect it to remain valid with asynchronous sequences too.

Lukazoid commented 7 years ago

@yaakov-h Well your array would actually be new string?[]{ .. } with the nullable reference types, T would be string? so the result of GetNextAsync would be string??, but thinking about it, I guess that probably isn't allowed in the same way int?? isn't allowed.

But if it were, you would use it like so:

var possibleNextItem = await asyncEnumerator.GetNextAsync();
if (possibleNextItem.HasValue) {

    // nextItem could be null if T were a nullable type
    var nextItem = possibleNextItem.Value;
}

It was just a thought, which I personally feel is much cleaner than having two values; having it baked into the type makes more sense. As @Igorbek says, discriminated unions would also solve this cleanly.

yaakov-h commented 7 years ago

@Lukazoid string? is not a separate CLR type from string under the nullable references proposal. string?? would be meaningless/impossible.

stephentoub commented 7 years ago

If Nullable reference types landed, could you not just have

And not without losing covariance. This is discussed here:

asyncawaitmvvm commented 6 years ago

Have you fleshed out how CancellationTokens would work here? Is the intent to explicitly call GetEnumerator... something like foreach (await var item in ae.GetEnumerator(cancellationToken)) { ... } ?

You have to abstract that operation at a higher level. The IAsyncEnumerable<T> is only the final piece.

foreach (await var item in ae.EnumThingsAsync(cancellationToken)) { ... }

public IAsyncEnumerable<T> EnumThingsAsync(CancellationToken ct = default(CancellationToken)) { return new ThingEnumerable(ct); }

Has issues with parallelization/concurrency/atomicity which is a desirable property of an async system (e.g. double gets on Current or missed items on Current)

I beg to differ. See above.

MoveNextAsync() is supposed to handle ALL of the off-thread loading. You're not even supposed to think about Current until MoveNextAsync() is done. You're not supposed to have multiple readers unless you're locking so well that the enumerable doesn't think there are multiple readers. Be patient and await, or abstract it at a higher level like you're supposed to and hand out multiple IAsyncEnumerables.

yaakov-h commented 6 years ago

Given that it's just an interface, MoveNextAsync doesn't have to actually start or control the enumeration. It simply returns a series of Tasks which represent the asynchronous operation to retrieve the next object.

It's up to the implementation as to whether loading begins when the IAsyncEnumerable<> is created, or when MoveNextAsync is called. Any intermediary item could be prefetched too. For example, you could enumerate a paginated remote call, so you get 50 items at a time, and only every 50th MoveNextAsync is actually asynchronous.
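The paginated case can be sketched as follows, using the MoveNextAsync / Current shape discussed in this thread. `PagedEnumerator` and the `fetchPage` callback are hypothetical names; the convention that an empty page marks the end is an assumption made for this sketch.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Sketch of a paged source: most MoveNextAsync calls complete synchronously
// from the buffered page, and only a page boundary triggers real async work.
public sealed class PagedEnumerator<T>
{
    private readonly Func<int, Task<IReadOnlyList<T>>> _fetchPage;
    private IReadOnlyList<T> _page = Array.Empty<T>();
    private int _pageIndex;          // index of the next page to request
    private int _indexInPage = -1;   // position of Current within _page

    public PagedEnumerator(Func<int, Task<IReadOnlyList<T>>> fetchPage)
        => _fetchPage = fetchPage;

    public T Current => _page[_indexInPage];

    public Task<bool> MoveNextAsync()
    {
        // Fast path: the next item is already buffered.
        if (_indexInPage + 1 < _page.Count)
        {
            _indexInPage++;
            return Task.FromResult(true);
        }
        // Slow path: only here do we actually go asynchronous.
        return FetchNextPageAsync();
    }

    private async Task<bool> FetchNextPageAsync()
    {
        _page = await _fetchPage(_pageIndex++);
        _indexInPage = 0;
        return _page.Count > 0;      // assumption: an empty page marks the end
    }
}
```

With a page size of 50, only one call in 50 would reach `FetchNextPageAsync`; the rest return an already completed task.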

asyncawaitmvvm commented 6 years ago

scalablecory commented 6 years ago

@asyncawaitmvvm AsyncTaskMethodBuilder<T> already returns a cached Task for bools, so not needed.
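A small experiment along these lines can show whether synchronously completed `Task<bool>` results are shared. This is a sketch with made-up names (`CachedTaskDemo`, `HasMoreAsync`, `SameInstance`); whether the two calls return the same object is a runtime implementation detail, not a documented guarantee.

```csharp
using System;
using System.Threading.Tasks;

public static class CachedTaskDemo
{
    // The awaited task is already complete, so the method never suspends
    // and the builder produces the result synchronously.
    public static async Task<bool> HasMoreAsync(bool value)
    {
        await Task.CompletedTask;
        return value;
    }

    // Reports whether two synchronous completions hand back the same Task object.
    public static bool SameInstance()
        => ReferenceEquals(HasMoreAsync(true), HasMoreAsync(true));
}
```

If `SameInstance()` returns true on a given runtime, no new `Task<bool>` was allocated for the second call, which is the point being made about `ValueTask<bool>` offering little here.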

jnm2 commented 6 years ago

@asyncawaitmvvm See

Discarded options considered:

  • ValueTask<bool> MoveNextAsync(); T Current { get; }: There's no benefit to using ValueTask<bool> instead of Task<bool>, as all possible Task<bool> values for synchronous completion can be cached, for asynchronous completion a Task<bool> is allocated anyway to back the ValueTask<bool>, and ValueTask<bool> adds additional overheads not present with Task<bool> (e.g. it has both a T and Task<T> field, which means its builder does, too, which means it increases the size of state machine types that await it).
asyncawaitmvvm commented 6 years ago

Please don't bake something as implementation-dependent as CancellationToken into this. Although it is a fundamentally important pattern, I can foresee replacements being written with internal optimizations. IAsyncEnumerable needs to be plain and simple because deep threading usually has custom situational requirements and IAE couldn't meet them all anyway. Are you going to ref-count my UI threads too? Didn't think so. Keep it simple.

scalablecory commented 6 years ago

While I'd prefer WaitForNextAsync / TryGetNext for manually written iterators, I'm not sure how you'd translate yield iterators into these two methods. If we're going for broke with performance, a way to avoid allocating a new async state for every WaitForNextAsync would be preferred but I don't know how that could be done without massively complicating things.

svick commented 6 years ago


Please don't bake something as implementation-dependent as CancellationToken into this.

How is CancellationToken any more implementation-dependent than Task? It's part of the core library and used by many methods on core types.

I can foresee replacements being written with internal optimizations.

Could you be more specific? What optimizations would including CancellationToken in the interface prohibit? How would not including it help?

mattwar commented 6 years ago

@scalablecory Maybe we could add a 'yield all' statement that would yield an entire array/list of values.

stazz commented 6 years ago

Hello all,

It seems anyone can comment on this issue, so I've decided to chime in and share some of my views on and experiences with async enumeration in C# so far - I hope this post will help in creating a good and versatile API for async enumeration. I am not sure if this kind of feedback comes too late - I've only recently myself learned some of the more complex aspects of asynchronous enumeration. I am working on CBAM project, which provides asynchronous task-based API to execute some kind of statement (e.g. SQL command or HTTP request/response sequence) remotely and then asynchronously iterate over the results.

From this point of view, the asynchronous enumerables and asynchronous enumeration has the following concepts, requirements, and definitions:

Putting all of these points together is done in UtilPack.AsyncEnumeration library, which is used by the CBAM projects to actually expose asynchronous enumerator.

Points of observation of the UtilPack.AsyncEnumeration:

So using this kind of enumeration interface, it is possible to enumerate sequentially asynchronously:

  // Enumerate ascending sequence of 10 integers (0..9) sequentially asynchronously:
  const Int32 MAX_ITEMS = 10;
  var start = MAX_ITEMS;
  MoveNextAsyncDelegate<Int32> moveNext = async token =>
  {
    var decremented = Interlocked.Decrement( ref start );
    // Pretend to do something asynchronously
    await Task.Delay( 100, token );
    // Return tuple of: 1. boolean indicating whether we still have more elements, and 2. the actual element
    return (decremented >= 0, MAX_ITEMS - decremented - 1);
  };

  // Use factory-pattern combined with callback-based skeleton implementation to get an enumerator
  var enumerator = AsyncEnumeratorFactory.CreateSequentialEnumerator<Int32>(
    async token =>
    {
      (var success, var item) = await moveNext( token );
      return (success, item, moveNext, null);
    } );

  // This will print values from 0 to 9 to console output, and itemsEncountered will be 10 after awaiting
  var itemsEncountered = await enumerator.EnumerateSequentiallyAsync( item =>
  {
    Console.WriteLine( "Current item: " + item );
  } );

and also in parallel asynchronously:

  // Enumerate sequence of 10 integers (0..9) in parallel asynchronously:
  const Int32 MAX_ITEMS = 10;
  var start = MAX_ITEMS;

  // Use factory-pattern combined with callback-based skeleton implementation to get an enumerator
  var enumerator = AsyncEnumeratorFactory.CreateParallelEnumerator(
    () =>
    {
      // Notice that this callback is synchronous - this lets the parallel enumerator be "nice" and not cause extra calls when enumerated.
      var decremented = Interlocked.Decrement( ref start );
      // Return tuple of 1. boolean indicating whether we still have more elements, and 2. state information to pass to next callback (in this case it is the decremented value)
      return (decremented >= 0, decremented);
    },
    async ( decremented, token ) =>
    {
      // Pretend to do something asynchronously
      await Task.Delay( 100, token );
      // Return the actual item
      return MAX_ITEMS - decremented - 1;
    } );

  // This will print values from 0 to 9 to console output (but not necessarily in that order), and itemsEncountered will be 10 after awaiting
  var itemsEncountered = await enumerator.EnumerateInParallelAsync( item =>
  {
    Console.WriteLine( "Current item: " + item );
  } );

  // The sequential enumeration actually works too, and just like one would expect (printing ascending series of values from 0 to 9 to console output)
  itemsEncountered = await enumerator.EnumerateSequentiallyAsync( item =>
  {
    Console.WriteLine( "Current item: " + item );
  } );

It seems that the concept of the parallel asynchronous enumeration is missing from this discussion. Is it intended to be implemented somewhere else separately? What are your opinions about the parallel asynchronous enumeration, and about the token-based concept of coupling between MoveNextAsync and OneTimeRetrieve (or WaitForNextAsync / TryGetNext )?

mattwar commented 6 years ago

I have a prototype of LINQ operators implemented using this proposal, in one of its current states.

scalablecory commented 6 years ago

@stazz I believe this proposal should focus purely on async as a separate concept from parallelism, ignoring parallelism other than to prevent making it completely incompatible. There's no reason PLINQ couldn't be extended to handle any of the current IAsyncEnumerable proposals, so I think that is already accomplished. Libraries like TPL Dataflow are more appropriate for non-trivial parallelism anyway, imho.

mattwar commented 6 years ago

What happens when MoveNextAsync is called before all the items have been exhausted via calling TryGetNext?

1) Calling MoveNextAsync does nothing, returning true, requiring you to always iterate through all items via TryGetNext first before MoveNextAsync will move forward. 2) Calling MoveNextAsync abandons any remaining items and fetches the next group/block/etc.

If you were using IEnumerator<T>, calling MoveNext repeatedly will always move on, regardless of whether you've accessed the Current property. This makes it easy to think about IAsyncEnumerator<T> as being just like IEnumerator<T>, except each "MoveNext" gets you multiple values instead of just one. This would imply that calling MoveNext before you've accessed all the items would just move the iteration on to the next set. So option 2.

On the other hand, if you tend to think of the two methods of the IAsyncEnumerator<T> type as IEnumerator<T>.MoveNext broken into two parts, then it may make more sense to choose option 1.

If you are trying to write a wrapper class around an existing IAsyncEnumerator<T> to apply additional logic, supporting option 1 makes that harder, because the wrapper has to track more state. (This is from experience trying to write the async linq prototype.)

However, if you are trying to support parallel consumers via IAsyncEnumerator<T>, then option 2 would make that incredibly difficult, because you would need two different kinds of synchronization across the two methods, which gets tricky or expensive. Though it may be a lost cause to have this API work for parallel consumers anyway: the TryGet API could easily lead to consumer starvation, since there is no way to queue the accesses.
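To make the two options concrete, here is a minimal sketch of a producer that follows option 1. `BatchEnumerator` is a hypothetical class, and the in-memory queue of arrays merely stands in for whatever source delivers items in groups.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical batch-backed enumerator illustrating "option 1":
// the async wait never discards items that TryGetNext has not yet returned.
public sealed class BatchEnumerator<T>
{
    private readonly Queue<T[]> _batches;                 // stands in for a remote source
    private readonly Queue<T> _buffer = new Queue<T>();   // items not yet handed out

    public BatchEnumerator(IEnumerable<T[]> batches)
        => _batches = new Queue<T[]>(batches);

    public async Task<bool> WaitForNextAsync()
    {
        // Option 1: buffered items remain, so report success without advancing.
        if (_buffer.Count > 0) return true;

        while (_batches.Count > 0)
        {
            await Task.Yield();                           // pretend to wait for I/O
            foreach (var item in _batches.Dequeue()) _buffer.Enqueue(item);
            if (_buffer.Count > 0) return true;
        }
        return false;                                     // source exhausted
    }

    public T TryGetNext(out bool success)
    {
        success = _buffer.Count > 0;
        return success ? _buffer.Dequeue() : default(T);
    }
}
```

Option 2 would differ only in the first line of `WaitForNextAsync`: instead of returning early, it would clear `_buffer` and fetch the next batch, dropping whatever the consumer had not read.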

HaloFour commented 6 years ago


It seems that the concept of the parallel asynchronous enumeration is missing from this discussion.

The idea came up a couple of times on the Roslyn repo but it never gained traction. I think that the idea of a foreach-like construct where multiple threads could be executing simultaneously within the body was unpalatable.

This proposal is specifically about consumer-requested single pull model, and the interfaces describe only that. But those interfaces could be used to bridge a parallel or reactive producer with whatever appropriate backpressure/buffering policy is required. That would be done through separate libraries, though, and is not explicitly captured as a part of this proposal.

clairernovotny commented 6 years ago

There is a fully working implementation of this proposal, with all of the operators, here:

stazz commented 6 years ago

Hmm, I am not sure if I used term "parallel" correctly in my previous post. What I meant was that there are scenarios when we don't want to await for MoveNextAsync to complete before calling it again. Maybe the term "concurrent", as in concurrent enumeration, is better?

@scalablecory : I understand the concern of keeping this proposal "on topic", and not to derail too much. I guess the main motivation for my scenario was that since there will be a language construct for sequentially enumerating async enumerable (foreach await), for the sake of orthogonality and user-friendliness, there should be a language construct for concurrently enumerating async enumerable (maybe foreach concurrent ?). Using PLINQ and separate libraries manually tends to make code a bit "plumby". Unless... those libraries could make their own foreach concurrent language construct by code analyzer or other way of hooking up to Roslyn compilation process. Is that possible? Should it be considered in this proposal?

@HaloFour : Thanks for the additional clarification! I wasn't aware that this proposal is only about consumer-requested single pull model - it explains the lack of conversation about the parallel/concurrent enumeration.

@mattwar Hmm, interesting point of view - writing an async LINQ library. I am not sure how the WaitForNextAsync / TryGetNext pattern would fare in such a test. I could make a small test though, since I am curious myself as to how one would write an async LINQ implementation for such an enumerable.

asyncawaitmvvm commented 6 years ago

Unless... those libraries could make their own foreach concurrent language construct by code analyzer or other way of hooking up to Roslyn compilation process. Is that possible? Should it be considered in this proposal?

Parallel.ForEach() is completely useless and irrelevant for async methods, but ForEachAsync() has many possible competing algorithms which is why everyone writes their own. I maintain two different ones myself and can foresee a third soon. The easiest way is to create a work item for every core and have each item read-loop the enumerable inside an async lock (SemaphoreSlim?) until it's empty. The only problem is that the cores are idle if all the tasks are pending I/O, and there are ways to capitalize on that.
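The "easiest way" described above might look roughly like this. It is one possible sketch, not the commenter's actual code: `ISketchAsyncEnumerator<T>`, `ForEachAsyncSketch` and `ArraySource<T>` are invented names, and the interface is declared locally with the Task<bool> MoveNextAsync / Current shape from this thread. It assumes the source keeps returning false once it is exhausted.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Shape discussed in this thread; declared locally so the sketch is self-contained.
public interface ISketchAsyncEnumerator<T>
{
    Task<bool> MoveNextAsync();
    T Current { get; }
}

public static class ForEachAsyncSketch
{
    // N workers share one enumerator and take turns advancing it under an
    // async lock; the (possibly slow) body runs outside the lock.
    public static Task ForEachAsync<T>(
        ISketchAsyncEnumerator<T> source, int workers, Func<T, Task> body)
    {
        var gate = new SemaphoreSlim(1, 1);

        async Task WorkerAsync()
        {
            while (true)
            {
                T item;
                await gate.WaitAsync();
                try
                {
                    // MoveNextAsync and Current are read as one guarded step.
                    if (!await source.MoveNextAsync()) return;
                    item = source.Current;
                }
                finally
                {
                    gate.Release();
                }
                await body(item);
            }
        }

        return Task.WhenAll(Enumerable.Range(0, workers).Select(_ => WorkerAsync()));
    }
}

// Trivial in-memory source, used only to exercise the sketch.
public sealed class ArraySource<T> : ISketchAsyncEnumerator<T>
{
    private readonly T[] _items;
    private int _index = -1;
    public ArraySource(params T[] items) => _items = items;
    public T Current => _items[_index];
    public Task<bool> MoveNextAsync()
    {
        if (_index < _items.Length) _index++;
        return Task.FromResult(_index < _items.Length);
    }
}
```

Holding the lock across both `MoveNextAsync` and the read of `Current` is what prevents the double-get and missed-item problems raised earlier in the thread; the cost is that the enumerator itself is still advanced one step at a time.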

markusschaber commented 6 years ago

I think the best way for parallel enumeration / iteration is to have one enumerator instance for each consumer, and the iterators internally know how to coordinate themselves.

So, we could define an interface IParallelEnumerable to get an IParallelEnumeratorSource, which itself produces an arbitrary amount of IEnumerator<T> instances which internally know how to coordinate themselves, and IAsyncParallelEnumerable creates IAsyncParallelEnumeratorSources which produce IAsyncEnumerator<T> instances.

We could simplify the XXXParallelEnumeratorSource interfaces to IEnumerable<IXXXEnumerator<T>>, possibly.
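A minimal sketch of the one-enumerator-per-consumer idea, assuming the enumerators coordinate through a shared thread-safe queue. `SharedWorkSource<T>` and `ConsumerEnumerator` are hypothetical names, not the interfaces named above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical "enumerator source": every consumer asks for its own enumerator,
// and the enumerators coordinate through a shared thread-safe queue.
public sealed class SharedWorkSource<T>
{
    private readonly ConcurrentQueue<T> _pending;

    public SharedWorkSource(IEnumerable<T> items)
        => _pending = new ConcurrentQueue<T>(items);

    public ConsumerEnumerator CreateEnumerator() => new ConsumerEnumerator(_pending);

    public sealed class ConsumerEnumerator
    {
        private readonly ConcurrentQueue<T> _pending;
        internal ConsumerEnumerator(ConcurrentQueue<T> pending) => _pending = pending;

        // Current belongs to this consumer only, so no other reader can
        // observe it twice or cause it to be skipped.
        public T Current { get; private set; }

        public Task<bool> MoveNextAsync()
        {
            if (_pending.TryDequeue(out var item))
            {
                Current = item;
                return Task.FromResult(true);
            }
            return Task.FromResult(false);
        }
    }
}
```

Each consumer runs the ordinary single-reader loop against its own enumerator, so the consumer-facing interface needs no extra tokens or locks.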

stazz commented 6 years ago

@mattwar It took me a while to understand the exact point of your message. I had to convert my UtilPack.AsyncEnumeration project to use WaitForNextAsync / TryGetNext pattern, and write some LINQ extension methods (just to test and learn and investigate) for modified IAsyncEnumerator, and then I started to see it. The synchronous callback LINQ (IAsyncEnumerable<T> Where<T>( this IAsyncEnumerable<T> enumerable, Func<T, Boolean> syncPredicate )) was easy, and I ended up in pretty much identical enumerator class as in your implementation. However, the asynchronous callback LINQ (IAsyncEnumerable<T> Where<T>( this IAsyncEnumerable<T> enumerable, Func<T, Task<Boolean>> asyncPredicate )) was more difficult indeed, and that was when I realized what exactly your message was about.

The root cause is that using the WaitForNextAsync / TryGetNext pattern in the proposal, the context is lost between the invocations:

Task<bool> WaitForNextAsync();
T TryGetNext(out bool success);

However, if the method signatures of that pattern were changed like this (very close to the ones I had in my original message):

// This example uses Int64, but can be any struct, really
// Return null to signal that enumerator has reached the end
Task<Int64?> WaitForNextAsync(); // Or maybe ValueTask<Int64?> here?
T TryGetNext( Int64 waitToken, out bool success );

Then the context could be preserved via the Int64 value. Obviously, this means that those enumerators, which intend to support multiple concurrent consumers, need to have some sort of ConcurrentDictionary mapping Int64 tokens to T values. This does indeed introduce some memory and complexity issues, but IMO supporting multiple concurrent consumers should be completely optional. Those implementations that won't support concurrent consumers could have much simpler code, since they wouldn't need to use ConcurrentDictionary since they can assume that only one wait-token is used at a time.

I hope that if the WaitForNextAsync / TryGetNext pattern survives over the MoveNextAsync / Current pattern in the proposal, then the WaitForNextAsync / TryGetNext pattern would be modified in such a way that TryGetNext is aware of the exact invocation of WaitForNextAsync (for example, like in the code sample above, Int64 could be used to pass invocation capture). Otherwise, the concurrent consumers mentioned as a minor benefit in the proposal would not be a real benefit, as it doesn't scale well in e.g. writing a LINQ library.

markusschaber commented 6 years ago

@stazz I think that the price (in terms of complexity) is too high for those who don't need parallel enumeration. The proposal with the multiple enumerators I described has the advantage that only those users who need parallel enumeration pay any price at all. (I don't claim any ownership of that proposal; I saw it somewhere else, but don't remember where.)

stazz commented 6 years ago

Ah yes, I forgot to mention in my previous message that another option would indeed be to make this IAsyncEnumerator purely for sequential asynchronous enumeration, and handle parallel/concurrent enumeration entirely via external libraries. That would be completely ok as well.

@markusschaber But I am curious - what exactly did you mean by complexity price? From consumer point of view, it would be just changing this loop (direct copypaste from the proposal):

IAsyncEnumerator<T> enumerator = enumerable.GetAsyncEnumerator();
while (await enumerator.WaitForNextAsync())
{
    while (true)
    {
        T item = enumerator.TryGetNext(out bool success);
        if (!success) break;
    }
}
into this loop:

IAsyncEnumerator<T> enumerator = enumerable.GetAsyncEnumerator();
Int64? waitToken;
while ((waitToken = await enumerator.WaitForNextAsync()).HasValue)
{
    while (true)
    {
        T item = enumerator.TryGetNext(waitToken.Value, out bool success);
        if (!success) break;
    }
}

From a sequential producer point of view, all it needs is to have one extra Int64 field, which could be just Interlocked.Incremented on each successful WaitForNextAsync call, and TryGetNext would just add one extra condition that given waitToken parameter matches the previous one stored in the field.
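The sequential producer side of that idea could be sketched like this. `TokenCountingEnumerator` is a hypothetical class that yields 0, 1, 2, ... up to a fixed count; it is only meant to show the single extra field and the token comparison, not a real implementation.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sequential producer for the token-carrying variant.
public sealed class TokenCountingEnumerator
{
    private readonly int _count;
    private int _produced;
    private long _currentToken;    // token handed out by the latest successful wait
    private bool _itemAvailable;

    public TokenCountingEnumerator(int count) => _count = count;

    // Returns null at the end of the sequence, otherwise a fresh wait token.
    public Task<long?> WaitForNextAsync()
    {
        if (_produced >= _count) return Task.FromResult<long?>(null);
        _itemAvailable = true;
        return Task.FromResult<long?>(Interlocked.Increment(ref _currentToken));
    }

    // Succeeds only for the token returned by the most recent WaitForNextAsync.
    public int TryGetNext(long waitToken, out bool success)
    {
        success = _itemAvailable && waitToken == Interlocked.Read(ref _currentToken);
        if (!success) return default(int);
        _itemAvailable = false;
        return _produced++;
    }
}
```

A stale token simply makes `TryGetNext` report failure, which is the "one extra condition" mentioned above.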

Or did you mean something else entirely? :)

markusschaber commented 6 years ago

Exactly that extra Int64 field is what I meant.

It requires that non-parallel consumers also handle a token, which is overhead.

Additionally, it forces implementors to have some indirection via the token and an internal lookup. When every consumer has its own IAsyncEnumerator instance, that indirection can be avoided, which is probably more efficient in some cases.

jnm2 commented 6 years ago

Additionally, it forces implementors to have some indirection via the token and an internal lookup.

Yeah– no way. This isn't about parallelization. It's about a non-blocking version of the single-consumer idiom IEnumerable.

stazz commented 6 years ago

@markusschaber Ahh, I see. Fair enough point! I was thinking that one Int64 field is not that bad (especially since asynchrony usually involves I/O, which tends to be more bottleneck than CPU/memory), but I do agree that it is overhead. 👍 Oh well, let's see how this proposal develops - it still might end up with MoveNextAsync / Current pattern anyways.

benaadams commented 6 years ago

Async filesystem enumeration would be an example use case; as there are currently no async .NET filesystem apis