protobuf-net / protobuf-net.Grpc

GRPC bindings for protobuf-net and grpc-dotnet

Q: Bi-directional streaming #234

Open ScottKane opened 2 years ago

ScottKane commented 2 years ago

The only examples of this I can find use Grpc.Core, which gives you IAsyncStreamReader/Writer. Sorry if I'm missing something obvious here, but what is the intended way for this to work with protobuf-net? If possible, I'd like to see both the client and server sides of a request, where I have access to both the request and response streams.

mgravell commented 2 years ago

https://protobuf-net.github.io/protobuf-net.Grpc/gettingstarted discusses this; search for "duplex"

tl;dr: "use IAsyncEnumerable<T> as either the return type or the parameter type"
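As a rough illustration of that tl;dr, a code-first duplex contract might look like the sketch below. It assumes the protobuf-net and protobuf-net.Grpc packages are referenced; `ChatMessage`, `IChatService` and `ChatService` are hypothetical names made up for this example, and the echo behaviour is only a placeholder.

```csharp
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.ServiceModel;
using System.Threading;
using ProtoBuf;
using ProtoBuf.Grpc;

// Hypothetical message type: wrapping the payload in a contract type avoids
// depending on a marshaller being available for bare strings.
[ProtoContract]
public class ChatMessage
{
    [ProtoMember(1)]
    public string Text { get; set; } = "";
}

// Hypothetical contract: IAsyncEnumerable<T> as the parameter streams from the
// client, IAsyncEnumerable<T> as the return type streams from the server.
// Using both on one operation gives a duplex call.
[ServiceContract]
public interface IChatService
{
    [OperationContract]
    IAsyncEnumerable<ChatMessage> Subscribe(
        IAsyncEnumerable<ChatMessage> requests,
        CallContext context = default);
}

public class ChatService : IChatService
{
    public IAsyncEnumerable<ChatMessage> Subscribe(
        IAsyncEnumerable<ChatMessage> requests,
        CallContext context = default)
        => Echo(requests, context.CancellationToken);

    // Placeholder behaviour: answer every incoming message with an echo.
    private static async IAsyncEnumerable<ChatMessage> Echo(
        IAsyncEnumerable<ChatMessage> requests,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        await foreach (var request in requests.WithCancellation(cancellationToken))
        {
            yield return new ChatMessage { Text = $"echo: {request.Text}" };
        }
    }
}
```

The server here only replies when a request arrives; a real duplex service would more likely read requests and produce responses independently of each other.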

ScottKane commented 2 years ago

Ok thanks, do you support passing IObservable instead? If not, is this something that could be considered? It would be really nice to have a signature like:

[OperationContract] IObservable<string> Subscribe(IObservable<string> requests, CallContext context = default);

So that we effectively get a push based stream of data on each end that can be observed.

ScottKane commented 2 years ago

Am I missing something with:

Client:

var requests = Observable
    .Interval(TimeSpan.FromSeconds(1))
    .Select(_ => DateTime.UtcNow.ToString(CultureInfo.InvariantCulture));

requests.Subscribe(request => Console.WriteLine($"[Client]: {request}"));

service.Subscribe(requests.ToAsyncEnumerable())
    .ToObservable()
    .Subscribe(response => Console.WriteLine($"[Server]: {response}"));

Server:

public IAsyncEnumerable<string> Subscribe(IAsyncEnumerable<string> requests)
{
    requests.ToObservable().Subscribe(request => Console.WriteLine($"[Client]: {request}"));

    var responses = Observable
        .Interval(TimeSpan.FromSeconds(1))
        .Select(_ => DateTime.UtcNow.ToString(CultureInfo.InvariantCulture));

    responses.Subscribe(response => Console.WriteLine($"[Server]: {response}"));

    return responses.ToAsyncEnumerable();
}

requests.Subscribe(request => Console.WriteLine($"[Client]: {request}")) is causing the client to log the time but the IAsyncEnumerable on the server end doesn't seem to be working.

I would expect each to ping a timestamp over once per second here.
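One way to narrow a problem like this down is to take Rx out of the picture and produce the same once-per-second timestamps with a plain async iterator. The sketch below uses only BCL types; `TimestampStreams`, `Ticks` and `LogAsync` are hypothetical helper names, not part of protobuf-net.Grpc.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class TimestampStreams
{
    // Yields one invariant-culture UTC timestamp per interval.
    // Cancelling the token ends the stream with an OperationCanceledException.
    public static async IAsyncEnumerable<string> Ticks(
        TimeSpan interval,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        while (true)
        {
            cancellationToken.ThrowIfCancellationRequested();
            yield return DateTime.UtcNow.ToString(CultureInfo.InvariantCulture);
            await Task.Delay(interval, cancellationToken);
        }
    }

    // Writes every item of a stream to the console with a prefix,
    // mirroring the "[Client]: ..." / "[Server]: ..." logging in the question.
    public static async Task LogAsync(
        string prefix,
        IAsyncEnumerable<string> source,
        CancellationToken cancellationToken = default)
    {
        await foreach (var item in source.WithCancellation(cancellationToken))
        {
            Console.WriteLine($"[{prefix}]: {item}");
        }
    }
}
```

With these helpers the client side could pass `TimestampStreams.Ticks(TimeSpan.FromSeconds(1))` as the request stream and await `TimestampStreams.LogAsync("Server", responses)` on the returned stream. If that works where the Rx version does not, the conversion between observables and async enumerables is the more likely culprit.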

mgravell commented 2 years ago

Well, what happens? I'm not psychic, but using "string" won't usually work well unless you've manually registered a marshaller for that (using UTF8, presumably)

ScottKane commented 2 years ago

No, I think this should have just worked: I have a simple Echo working with a string, and I also tested the stream with a contracted type. I think I have discovered the problem. I'm working in Unity, which doesn't support HTTP/2, so I've had to use Grpc.Web to even get things talking, and I don't think that supports bidi/duplex streaming (but I could be wrong). I tried forcing my server to run over HTTP/1 in the hope of avoiding Grpc.Web on the client, but I couldn't get it to work; it still seemed to try to force HTTP/2. Maybe there are some Kestrel options for this that I can dig into as an alternative.

ScottKane commented 2 years ago

I initially thought maybe the observer or subscriber was blocking but I seem to be able to call Echo just fine either side so it's just that the server isn't getting the message at all. I would have thought I would get some form of unsupported exception if I can't use duplex in Grpc.Web but apparently not.

ScottKane commented 2 years ago

That said, if Grpc.Web does support duplex streaming I'm at a loss as to why the server is getting nothing here.

mgravell commented 2 years ago

Random question: does unity support raw TCP? Socket and/or NetworkStream? If it does, I may have something...

ScottKane commented 2 years ago

They do as far as I'm aware, the Http2 problem is because they have an older version of HttpClient in their version of the BCL which causes the problem. All socket based approaches should work as normal as far as I'm aware (I've had a websocket TCP and UDP connection running in the past)

ScottKane commented 2 years ago

They are in the process of getting the runtime migrated to CoreCLR instead of the ancient fork of Mono they currently have which will mitigate all this in the future but we're probably a fair way off that happening. In newer versions they have at least moved to netstandard2.1 support which does help a lot for now.

mgravell commented 2 years ago

if it is of interest, I have an experimental gRPC client and server implementation that doesn't use HTTP at all; it uses the exact same API, so existing code should "just work"; tested locally against TCP with/without TLS/client-certs and named pipes, but it is transport agnostic. Let me know if that's something you'd be interested in exploring.

ScottKane commented 2 years ago

Absolutely interested, if I can't use http2 there's no real reason I need to be tied into any specific transport. I'm more interested in having a strongly typed client

mgravell commented 2 years ago

The client code doesn't change whatsoever. Regardless of whether you're using contract-first generated client/server, or code-first (both of which could be labelled "strongly typed"): they work exactly the same. For the client, the only thing that changes is the channel creation code; if you want to give this a whirl, emphasizing that this is experimental: see https://github.com/protobuf-net/protobuf-net.Grpc/tree/grpclite/src/protobuf-net.GrpcLite - noting that a build of it is on NuGet

ScottKane commented 2 years ago

Ok awesome, I will have a play around. So by not deeply integrated with kestrel you mean I have to set up a LiteServer on top of kestrel?

ScottKane commented 2 years ago

Also I can't find server.ServiceBinder.AddCodeFirst(), should this extension be included in GrpcLite, Grpc or Grpc.Native because I can't seem to find it.

mgravell commented 2 years ago

sorry; the package is hidden so you won't have known to update; short answer: protobuf-net.Grpc v1.0.167

<PackageReference Include="protobuf-net.Grpc" Version="1.0.167" />

mgravell commented 2 years ago

re "on top of kestrel" - yes; currently it doesn't hook into Kestrel, although I have written an IDuplexPipe hook, if we want to attach the server to Kestrel's inbuilt TCP support; short version: I couldn't figure out the code to get Kestrel to host both a regular gRPC server/web-site and a raw TCP server. If you can find that magic juice, let me know! There is an example Kestrel server here: https://github.com/protobuf-net/protobuf-net.Grpc/tree/grpclite/tests/protobuf-net.GrpcLite.Server - it isn't "pretty", because it does a lot more than you usually would in any sane process - it can run in either netfx (as a console) or .net 6 (via Kestrel), and spins up a ton of endpoints.

mgravell commented 2 years ago

(there's also a ConnectionHandler at the bottom of that file, re the IDuplexPipe mode, direct in Kestrel; I just didn't get around to getting that to play nicely yet); important emphasis: this code is 100% experimental, may never land, etc; I'm looking at it for some scenarios relevant to me. But if it turns out to be genuinely useful to folks, it would be great to merge it and make it "real"

ScottKane commented 2 years ago

I think if I can get it up and running there will definitely be a use for optional gRPC over TCP, for me it would definitely be nice to get things playing nice with kestrel. Where does IDuplexPipe.AsFrames() come from?

mgravell commented 2 years ago

Ah, that's the .Pipelines project; I probably haven't built that one to NuGet just yet

ScottKane commented 2 years ago

This is what I'm going with for now (I'm assuming as I only want to listen through kestrel this would be correct - obviously once there is a Pipelines package on nuget):

public static class Program
{
    public static async Task Main() =>
        await WebHost.CreateDefaultBuilder()
            .ConfigureServices(services =>
            {      
                services.AddSingleton<IRemotePlayerService, PlayerService>();

                services.AddCodeFirstGrpc();
                services.AddSingleton<GrpcTcpHandler>();
                services.AddSingleton(s =>
                {
                    var server = new LiteServer();
                    server.ServiceBinder.AddCodeFirst(s.GetRequiredService<IRemotePlayerService>());
                    // Is there a version of this extension that automatically discovers the services?
                    // Maybe allow passing an IServiceProvider that it can resolve from

                    return server;
                });
            })
            .UseKestrel(options =>
            {
                options.Listen(new IPEndPoint(IPAddress.Loopback, 5000), o => o.UseConnectionHandler<GrpcTcpHandler>());
            })
            .Configure((_, app) =>
            {
                app.UseRouting();
                app.UseEndpoints(endpoints =>
                {
                    endpoints.MapGrpcService<PlayerService>();
                });
            })
            .Build()
            .RunAsync();
}

public sealed class GrpcTcpHandler : ConnectionHandler
{
    private readonly LiteServer _server;
    public GrpcTcpHandler(LiteServer server) => _server = server;

    public override Task OnConnectedAsync(ConnectionContext connection) => _server.ListenAsync(connection.Transport.AsFrames());
}

mgravell commented 2 years ago

Nice. I wasn't far off - must have been missing a using directive for some helper extension methods. I'll see about updating some nugets tomorrow. But: assuming you built locally: did anything work?

ScottKane commented 2 years ago

Rather than building the full source to get IDuplexPipe.AsFrames() I just decided to test with server.ListenAsync(ConnectionFactory.ListenSocket(new IPEndPoint(IPAddress.Loopback, 5000)).AsStream().AsFrames()) for now. Simple console client is working but there is some awkwardness with async code in constructors in Unity (my client was creating the channel in constructor) that just freezes up the editor on start so I haven't been able to properly test inside Unity as of yet. A job for tomorrow I think, I will let you know how it goes.

I just took a look at https://github.com/davidfowl/MultiProtocolAspNetCore for reference and hoped it was right.

ScottKane commented 2 years ago

Also maybe in your test your launchSettings.json applicationUrls might be what stopped things playing nicely? I'm not 100% but if you do anything in UseKestrel() you basically ignore launchSettings

ScottKane commented 2 years ago

Would also be interesting to know if you could have this over UDP using IMultiplexedConnectionBuilder (I think it uses MsQuic which is over UDP?). From a game POV, allowing gRPC over both would be insanely powerful.

mgravell commented 2 years ago

Well, if we go to UDP, we ultimately end up either accepting a bunch of limitations, or having to implement a bunch of what TCP provides; in particular packet loss, packet size limits, and ordering. If we want those capabilities, it would be better to just use TCP rather than ... re-implement TCP over UDP, so we can assume that we're happy to lose some features. OK, so let's roll with that; thoughts:

I guess my unstated question here is: are those limits acceptable as a consumer?

ScottKane commented 2 years ago

To be honest I would say that those considerations would be a fair compromise for using gRPC, like you say, streaming unordered packets is difficult to begin with but I would say it definitely adds value. For example in a game when the server receives a movement request you want the most up to date information where you are less concerned with the order. Most games have to manually set up TCP and UDP socket connections and flick between them for different requests.

These tradeoffs excluding streaming would be made regardless of using gRPC for a UDP socket so I definitely think this would be worth some investigation.

In my client I am currently creating a connection, grabbing the proxy services and registering them in DI, having the ability to just create 2 channels (one TCP and one UDP) and then simply decide the protocol by which channel I get the service from would be amazing.
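The "most up to date information wins" handling described above for movement updates can be sketched without any transport at all. This is a minimal illustration using only BCL types and assuming each update carries a sender-assigned sequence number; `LatestPositionTracker` is a hypothetical name and nothing here is part of gRPC or GrpcLite.

```csharp
using System;

// Hypothetical helper: keeps only the newest position when updates may arrive
// out of order, duplicated, or not at all (no retransmission, no reordering).
public sealed class LatestPositionTracker
{
    private uint _lastSequence;
    private bool _hasValue;

    public (float X, float Y) Position { get; private set; }

    // Returns true if the update was newer than anything seen so far and was applied.
    public bool TryApply(uint sequence, (float X, float Y) position)
    {
        // Serial-number style comparison, so the counter is allowed to wrap around.
        if (_hasValue && unchecked((int)(sequence - _lastSequence)) <= 0)
        {
            return false; // stale or duplicate update: drop it
        }

        _lastSequence = sequence;
        _hasValue = true;
        Position = position;
        return true;
    }
}
```

Dropping stale updates like this only suits state that is fully replaced by each message, such as a position; anything that must not be lost still needs a reliable channel.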

ScottKane commented 2 years ago

I would also like to throw it out there that I think there is for sure a need for this socket based approach for gRPC. The biggest use case I can think is allowing duplex streaming from Blazor apps (where this isn't currently possible). Which would be incredible as a lot of front end frameworks use reactive/observable design which pairs so nicely with streams.

mgravell commented 2 years ago

Random question: is Blazor (in particular the client-side WASM variant you cite) something you're experienced in? My weakness here is that I'm deeply a library geek, and setting up the kind of client-side code to prove that it works (and works well) isn't something I'm hugely familiar with. By which I mean: I have basically zero experience in client-side Blazor, and don't know which APIs would work well to demonstrate this. Can we just use NetworkStream, for example?

It would also probably be a good thing to finish up the client-side proxy generation ("generators") code, so the client doesn't need to run reflection. Very doable.

ScottKane commented 2 years ago

I have been working with Blazor commercially since it launched (server first but switched to wasm as soon as it went into preview). Typically in most Blazor apps you just work with a HttpClient and call your server endpoints (normally restful). I see no reason why you couldn't use NetworkStream (although I've not seen anyone doing this or done it myself). The only limitation I have found in Blazor is the lack of http2 (because of browser limitations). Any socket based approach over TCP should work fine.

Getting up and running with a Blazor client is extremely simple these days, the wasm template gives a simple "FetchData" page which demonstrates calling a HttpClient injected from DI (in the template it just serves a json file located in the client itself) but the same principle applies to calling a rest server.

It basically ends up with a load of pages/components each with a load of HttpClient calls (or via an abstracted service doing the calls) that look something like this: await Http.GetFromJsonAsync<WeatherForecast[]>("api/weather");. As you can probably tell, there is nothing here that guarantees that the endpoint you call actually returns a WeatherForecast[], or that it even exists.

Having gRPC provide a client wouldn't change too much in how you call the server (you still inject and call some service), but the guarantees you have about what is being returned without having an OpenAPI spec involved would be a night and day difference.

Most Blazor code in production today that I've seen uses this approach (including many of my own apps) but I imagine there is a fair number of people using grpc-web and/or SignalR as an alternative. I currently only use SignalR if I have something I need to send from server to client.

I don't think there is any current library/framework that actually gives you duplex streaming access from Blazor so I think GrpcLite is a winner there to begin with.

ScottKane commented 2 years ago

As for streaming specifically, I really haven't seen any streaming done between a Blazor client and a server. This is likely more a product of the technology being fairly new coupled with most of the documentation using HttpClient in examples. In future apps I would probably just use gRPC by default (the only reason I didn't previously is because while architecting my last major project, grpc-web wasn't available).

I think a lot of people are also concerned with losing the whole OpenAPI/Swagger flow as it's their only way to communicate an API's design effectively to consumers (even though with gRPC you can just give them a client which is 100000x better anyways)

ScottKane commented 2 years ago

https://github.com/AdrienTorris/awesome-blazor is a great place to learn about Blazor and find sample projects etc

ScottKane commented 2 years ago

Once there is a build of the .Pipelines package on nuget I will throw together a sample chat app or something demonstrating duplex from Blazor (assuming there's no unforeseen gotchas).

Thinking on that, does the server call context give me a way to send a message to a specific client? As opposed to all subscribed?

mgravell commented 2 years ago

https://www.nuget.org/packages/protobuf-net.GrpcLite.Pipelines/ https://www.nuget.org/packages/protobuf-net.GrpcLite/

Note: repo has now moved; I've split the GrpcLite bits out to a separate repo: https://github.com/protobuf-net/protobuf-net.GrpcLite

menaheme commented 2 years ago

Hi, Grpc pipelines gives me a 404

menaheme commented 2 years ago

Probably a nuget.org issue, works now

ScottKane commented 2 years ago

Yeah probably a good shout, I'm just testing in Unity now but the client connection code hangs. Am I correct in thinking with the server implementation I posted earlier that this should work? ConnectionFactory.ConnectSocket(new IPEndPoint(IPAddress.Loopback, 5000)).AsFrames().CreateChannelAsync()

ScottKane commented 2 years ago

Ahh never mind, was just Unity being stupid. Now I'm getting active refusal from the server so progress

ScottKane commented 2 years ago

Client:

var requests = Observable
    .Interval(TimeSpan.FromSeconds(1))
    .Select(_ => DateTime.UtcNow.ToString(CultureInfo.InvariantCulture));

requests.Subscribe(request => Debug.Log($"[Client]: {request}"));

service.Subscribe(requests.ToAsyncEnumerable())
    .ToObservable()
    .Subscribe(response => Debug.Log($"[Server]: {response}"));

Server:

public IAsyncEnumerable<string> Subscribe(IAsyncEnumerable<string> requests)
{
    requests
        .ToObservable()
        .Subscribe(request => Console.WriteLine($"[Client]: {request}"));

    var responses = Observable
        .Interval(TimeSpan.FromSeconds(1))
        .Select(_ => DateTime.UtcNow.ToString(CultureInfo.InvariantCulture));

    responses.Subscribe(response => Console.WriteLine($"[Server]: {response}"));

    return responses.ToAsyncEnumerable();
}

Server receives the first message from the client and then throws a TimeoutException:

[Client]: 04/10/2022 18:15:50
[Server]: 04/10/2022 18:15:50
Unhandled exception. System.TimeoutException: The operation has timed out.
   at ProtoBuf.Grpc.Lite.Internal.LiteStream`2.ThrowCancelled() in /_/src/protobuf-net.GrpcLite/Internal/LiteStream.cs:line 287
   at ProtoBuf.Grpc.Lite.Internal.LiteStream`2.ProtoBuf.Grpc.Lite.Internal.IStream.Cancel() in /_/src/protobuf-net.GrpcLite/Internal/LiteStream.cs:line 260
--- End of stack trace from previous location ---
   at ProtoBuf.Grpc.Lite.Internal.LiteStream`2.System.Threading.Tasks.Sources.IValueTaskSource<System.Boolean>.GetResult(Int16 token) in /_/src/protobuf-net.GrpcLite/Internal/LiteStream.cs:line 923
   at ProtoBuf.Grpc.Lite.Internal.LiteStream`2.MoveNextPayloadAsync() in /_/src/protobuf-net.GrpcLite/Internal/LiteStream.cs:line 401
   at ProtoBuf.Grpc.Lite.Internal.LiteStream`2.<Grpc.Core.IAsyncStreamReader<TReceive>.MoveNext>g__Awaited|8_0(LiteStream`2 stream, ValueTask pending, CancellationTokenRegistration ctr) in /_/src/protobuf-net.GrpcLite/Internal/LiteStream.cs:line 103
   at ProtoBuf.Grpc.Internal.Reshape.AsAsyncEnumerable[T](IAsyncStreamReader`1 reader, CancellationToken cancellationToken)+MoveNext() in /_/src/protobuf-net.Grpc/Internal/Reshape.cs:line 121
   at ProtoBuf.Grpc.Internal.Reshape.AsAsyncEnumerable[T](IAsyncStreamReader`1 reader, CancellationToken cancellationToken)+System.Threading.Tasks.Sources.IValueTaskSource<System.Boolean>.GetResult()
   at System.Linq.AsyncEnumerable.ToObservableObservable`1.<>c__DisplayClass2_0.<<Subscribe>g__Core|0>d.MoveNext() in /_/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs:line 50
--- End of stack trace from previous location ---
   at System.Reactive.PlatformServices.ExceptionServicesImpl.Rethrow(Exception exception) in /_/Rx.NET/Source/src/System.Reactive/Internal/ExceptionServicesImpl.cs:line 19
   at System.Reactive.ExceptionHelpers.Throw(Exception exception) in /_/Rx.NET/Source/src/System.Reactive/Internal/ExceptionServices.cs:line 16
   at System.Reactive.Stubs.<>c.<.cctor>b__2_1(Exception ex) in /_/Rx.NET/Source/src/System.Reactive/Internal/Stubs.cs:line 16
   at System.Reactive.AnonymousObserver`1.OnErrorCore(Exception error) in /_/Rx.NET/Source/src/System.Reactive/AnonymousObserver.cs:line 73
   at System.Reactive.ObserverBase`1.OnError(Exception error) in /_/Rx.NET/Source/src/System.Reactive/ObserverBase.cs:line 59
   at System.Linq.AsyncEnumerable.ToObservableObservable`1.<>c__DisplayClass2_0.<<Subscribe>g__Core|0>d.MoveNext()
--- End of stack trace from previous location ---
   at System.Linq.AsyncEnumerable.ToObservableObservable`1.<>c__DisplayClass2_0.<<Subscribe>g__Core|0>d.MoveNext() in /_/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs:line 74
--- End of stack trace from previous location ---
   at System.Threading.Tasks.Task.<>c.<ThrowAsync>b__128_1(Object state)
   at System.Threading.QueueUserWorkItemCallbackDefaultContext.Execute()
   at System.Threading.ThreadPoolWorkQueue.Dispatch()
   at System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart()
   at System.Threading.Thread.StartCallback()

The client logs its own subscription but never receives a response from the server. It throws an ArgumentOutOfRangeException, which I'm assuming is because the IAsyncEnumerable is changing while it's being enumerated (which should be fine on an IAsyncEnumerable?).

mgravell commented 2 years ago

impossible to diagnose without something I can look at; the IAsyncEnumerable<T> aspect should be fine.

ScottKane commented 2 years ago

Ok now I'm even more confused, I pulled the code into a new solution for a repro and it works. This has got to be something to do with the version of Rx I'm using in Unity. Normal Rx has some issues in Unity so I was using https://github.com/neuecc/UniRx to get things up and running. I'm assuming that these Observables aren't compatible with ToAsyncEnumerable and is what's causing the issue.

Repro code here (working) : https://github.com/ScottKane/GrpcLiteTest

The server throws this when shutting down, not sure how to handle this gracefully:

fail: Microsoft.AspNetCore.Server.Kestrel[0]
      Unhandled exception while processing 0HMGRP4AAEHAL.
      Microsoft.AspNetCore.Connections.ConnectionAbortedException: The connection was aborted because the server is shutting down and request processing didn't complete within the time specified by HostOptions.ShutdownTimeout.
         at System.IO.Pipelines.Pipe.GetReadResult(ReadResult& result)
         at System.IO.Pipelines.Pipe.GetReadAsyncResult()
         at System.IO.Pipelines.Pipe.DefaultPipeReader.GetResult(Int16 token)
         at ProtoBuf.Grpc.Lite.Internal.Connections.PipeFrameConnection.System.Collections.Generic.IAsyncEnumerable<ProtoBuf.Grpc.Lite.Connections.Frame>.GetAsyncEnumerator(CancellationToken cancellationToken)+MoveNext() in /_/src/protobuf-net.GrpcLite.Pipelines/Internal/Connections/PipeFrameConnection.cs:line 49
         at ProtoBuf.Grpc.Lite.Internal.Connections.PipeFrameConnection.System.Collections.Generic.IAsyncEnumerable<ProtoBuf.Grpc.Lite.Connections.Frame>.GetAsyncEnumerator(CancellationToken cancellationToken)+System.Threading.Tasks.Sources.IValueTaskSource<System.Boolean>.GetResult()
         at ProtoBuf.Grpc.Lite.Internal.ListenerEngine.RunAsync(IConnection connection, ILogger logger, CancellationToken cancellationToken) in /_/src/protobuf-net.GrpcLite/Internal/ListenerEngine.cs:line 43
         at ProtoBuf.Grpc.Lite.Internal.ListenerEngine.RunAsync(IConnection connection, ILogger logger, CancellationToken cancellationToken) in /_/src/protobuf-net.GrpcLite/Internal/ListenerEngine.cs:line 172
         at System.Threading.Channels.AsyncOperation`1.GetResult(Int16 token)
         at ProtoBuf.Grpc.Lite.Internal.Connections.PipeFrameConnection.WriteAsync(ChannelReader`1 source, CancellationToken cancellationToken) in /_/src/protobuf-net.GrpcLite.Pipelines/Internal/Connections/PipeFrameConnection.cs:line 134
         at Server.GrpcTcpHandler.OnConnectedAsync(ConnectionContext connection) in D:\Repos\GrpcLiteTest\Server\Program.cs:line 57
         at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure.KestrelConnection`1.ExecuteAsync()
Unhandled exception.

ScottKane commented 2 years ago

Hmm, I switched to the proper System.Reactive package in my Unity project and I'm still getting the error. I can only assume that this is something to do with being in full framework using netstandard2.1 compatibility; maybe the runtime is missing something it should have. Now that I think about it, I think some runtime changes were made to support IAsyncEnumerable/IObservable that aren't yet available in Unity, so I guess I will have to wait for Unity to get to net6.0.

mgravell commented 2 years ago

Well, I can't discuss nuances of RX in Unity - I lack knowledge of both. If there's anything I can help with: let me know!

ScottKane commented 2 years ago

No worries, I will put together a sample to prove the TCP streaming in Blazor so people can see a decent use case

ScottKane commented 2 years ago

This is not as important outside of Unity, but as of right now because the IObservable cannot properly be converted to IAsyncEnumerable in the old full framework runtime, is there any way we can add the ability to pass a raw IObservable over gRPC instead of converting it to an IAsyncEnumerable? Sorry I'm a bit naive to the work entailed in doing this but would be happy to help. I think even outside of Unity, doing this would be a performance boost for people using Observables as they wouldn't constantly have to flip back and forth between the two.

mgravell commented 2 years ago

Ultimately, IAsyncEnumerable is "kinda sorta" core BCL (via nugets on netfx), and observable (RX): isn't. I do not currently have direct support for RX. We could probably add some if it is genuinely useful, but then I'd need to figure out some kind of mechanism to allow this to work without adding RX as a dependency of the core lib. Ultimately, most people: not using RX.

ScottKane commented 2 years ago

IObservable/IObserver (the base interfaces) exist in the BCL; they're included under System without needing System.Reactive (which contains concrete implementations and extensions). So this can be done without adding further dependencies. As you mention, not everyone uses Rx, hence this design.

https://source.dot.net/#System.Private.CoreLib/IObservable.cs,46c68a3e1fe79623 https://source.dot.net/#System.Private.CoreLib/IObserver.cs,27ac209e0cbdb704
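Because the base interfaces live in System, an observable can be adapted to IAsyncEnumerable<T> without referencing System.Reactive at all. The following is a minimal sketch of one such bridge built on System.Threading.Channels (built in on modern .NET; older targets would probably need the System.Threading.Channels package). `ObservableBridge` and `ToAsyncEnumerableViaChannel` are hypothetical names, and this is not how protobuf-net.Grpc itself handles observables.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;

public static class ObservableBridge
{
    // Subscribes to the observable and replays its notifications as an async stream.
    // The channel is unbounded, so a fast producer is buffered rather than throttled.
    public static async IAsyncEnumerable<T> ToAsyncEnumerableViaChannel<T>(
        this IObservable<T> source,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var channel = Channel.CreateUnbounded<T>(
            new UnboundedChannelOptions { SingleReader = true });

        // Disposed (unsubscribing) when enumeration finishes or is abandoned.
        using var subscription = source.Subscribe(new ChannelObserver<T>(channel.Writer));

        await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken))
        {
            yield return item;
        }
    }

    // Forwards push notifications into the channel; completion and errors
    // are surfaced to the reader once buffered items have been drained.
    private sealed class ChannelObserver<T> : IObserver<T>
    {
        private readonly ChannelWriter<T> _writer;

        public ChannelObserver(ChannelWriter<T> writer) => _writer = writer;

        public void OnNext(T value) => _writer.TryWrite(value);
        public void OnError(Exception error) => _writer.TryComplete(error);
        public void OnCompleted() => _writer.TryComplete();
    }
}
```

The trade-off in this design is the unbounded buffer: it keeps OnNext non-blocking, but a consumer that falls behind will see memory grow, so a bounded channel with a drop policy may suit high-frequency streams better.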

mgravell commented 2 years ago

TIL, thanks. That: sounds usable. I'll take a look.

ScottKane commented 2 years ago

Awesome thanks!

mgravell commented 1 year ago

Well, if you get something working and are feeling generous: there's plenty of space for some examples in the repo :)

On Mon, 11 Apr 2022, 21:14 ScottKane wrote:

No worries, I will put together a sample to prove the TCP streaming in Blazor so people can see a decent use case
