grpc / grpc-dotnet

gRPC for .NET
Apache License 2.0
4.18k stars 769 forks source link

Use IAsyncEnumerable instead of IAsyncStreamReader #41

Closed JunTaoLuo closed 5 years ago

JunTaoLuo commented 5 years ago

We should consider using the new C# 8 feature IAsyncEnumerable for streaming RPC calls.

This may need some changes in the Grpc.Core and Grpc.Core.Api packages as well cc @jtattermusch

JamesNK commented 5 years ago

Potential idea that wouldn't require breaking the method signature:

foreach await (var message in requestStream.AsEnumerable())
{
}

@davidfowl

davidfowl commented 5 years ago

That's not bad. ChannelReader<T> added a ReadAllAsync() method that returns the IAsyncEnumerable<T>

clairernovotny commented 5 years ago

Hi folks,

As a heads up, we've released Ix Async 4.0 preview, which uses the BCL IAsyncEnumerable types and provides the LINQ operators for those. As a result of this, Ix Async 4 is a breaking change over 3.x. That said, we do provide support for using IAsyncEnumberable on .NET 4.5 and .NET Standard 2.0 should you need to target those platforms.

davidfowl commented 5 years ago

What's the plan here?

JamesNK commented 5 years ago

Plan for what?

I created an issue for gRPC to migrate await from Ix Async - https://github.com/grpc/grpc/issues/18592. No progress so far.

We haven't looked at IAsyncEnumerable support yet. Template and gRPC client have been the priority. I will have time to look at it soon, but gRPC moving away from Ix Async may impact whatever we do.

davidfowl commented 5 years ago

OK here are some more options:

JamesNK commented 5 years ago

The first option would require a new setting to be added to Grpc.Tools code generation and for C Core C# to support the new type. That would be a lot of work. Also gRPC folks are hesitant to add too many settings to code generation.

I prefer the extension method approach. Note that it doesn't require keeping the Ix dependency. The types in method signatures don't come form Ix. IAsyncStreamReader<T> implements Ix's IAsyncEnumerator<T>. Removing that interface will likely not effect the extension method because we'll cast to a known internal type.

JamesNK commented 5 years ago

I looked into this. Adding an extension method is extremely trivial.

public async static IAsyncEnumerable<T> ReadAllAsync<T>(this IAsyncStreamReader<T> streamReader, CancellationToken cancellationToken = default)
{
    if (streamReader == null)
    {
        throw new System.ArgumentNullException(nameof(streamReader));
    }

    while (await streamReader.MoveNext(cancellationToken))
    {
        yield return streamReader.Current;
    }
}

BUT, there are issues. We would want to use this method to use in the server and in the client. That means they should reference a common method. Grpc.Core.Api makes sense. But placing the method in Grpc.Core.Api would mean returning a new IAsyncEnumerable<T>.

Currently Grpc.Core.Api is using System.Reactive.Async which has a type of the same name. An extern alias would get around it. I can hack it in there, but really Grpc.Core.Api needs to drop System.Reactive.Async. See https://github.com/grpc/grpc/issues/18592#issuecomment-491655317

jtattermusch commented 5 years ago

It looks like making gRPC streams support IAsyncEnumerable<T> is conceptually wrong. The request/response streams are one-shot by nature and iterating over them multiple times makes no sense. Currently the IAsyncStreamReader<T> implements IAsyncEnumerator which makes sense, because enumerator only allows iterating over the messages once - so to me that's working as intended and also makes it clear to the user that iterating over the stream more than once is not allowed. (regardless of whether the old version of IAsyncEnumerator from ix-async or the new BCL version is used).

davidfowl commented 5 years ago

@jtattermusch I don't agree, that's a detail that users don't need to care about and the implementation can just throw if that's a big concern (enumerating more than once).

jtattermusch commented 5 years ago

@davidfowl enumerating more than once is basically impossible unless you want to cache all the received messages and we do not want that. I don't understand how pretending enumerating more than once is ok and then throwing if that's actually attempted is a better API choice than making the read stream truly one-shot and making that explicit in the API.

Also if we expose the stream as enumerable, that brings concerns about someone trying to enumerate the sequence from two separate threads (and you'd need synchronization to prevent that and throw on the second attempt). IAsyncEnumerator is straightforward and clearly expresses the nature of the sequence.

The only downside of IAsyncEnumerator is that is doesn't allow using async foreach directly, but I think that can be solved e.g. by adding an extension method like ReadAllAsync() (basically what's in https://github.com/grpc/grpc/pull/19060)

JamesNK commented 5 years ago

DbDataReader, a forward only reader of a SQL result set, implements IEnumerable.

using (SqlConnection conn = new SqlConnection(""))
using (SqlCommand comm = new SqlCommand("select * from somewhere", conn))
{
    conn.Open();

    using (var r = comm.ExecuteReader())
    {
        foreach (DbDataRecord s in r)
        {
            string val = s.GetString(0);
        }
    }
}

BUT, I don't feel strongly about it. I think an extension method like ReadAllAsync() that sits in Grpc.Core namespace is a fine alternative.

JamesNK commented 5 years ago

Blocked on determining the process for breaking changes.

JamesNK commented 5 years ago

Closed with https://github.com/grpc/grpc-dotnet/pull/438