protobuf-net / protobuf-net.Grpc

GRPC bindings for protobuf-net and grpc-dotnet
Other
864 stars 112 forks source link

[Question] Is IAsyncStreamReader/IServerStreamWriter supported? #91

Open TieSKey opened 4 years ago

TieSKey commented 4 years ago

Sorry for posting a question as an issue but I try to avoid the SO gestapo :S

Afaik this library manages streaming through IAsyncEnumerable which is a C#8 feature I can't use. I'm locked in framework 4.7.2 language level 6 (or was it 7?) by Unity3D where the interface exists but can't be used as return of an async method and yield return is not valid either.

So as the title says, can I use IAsyncStreamReader/Writer? pretty much the way the not-code-first .net implementation uses them.

I'm a new born to grpc so the only alternative I could manage is having the client await a call that the server will eventually respond when data is available. Upon getting a response the client then awaits another call. Kind of works but feels super hackish and probably inefficient.

Thx in advance.

mgravell commented 4 years ago

Right now: no. But it probably isn't much work to add this.

However, I wonder if it is actually necessary. System.Threading.Channels.Channel<T> can be used as an intermediary in a producer (there are helper methods in the library to expose this as an async-enumerable to pass in, and you'd push to the writer), and you should be able to manually unroll the consumer via GetAsyncEnumerator, in your current language version.

But: I'm not opposed to investigating if we can add more direct support - or even just a different API shape. For example, how about if instead of taking an async producer, we took something like a Func<CancellationToken, ValueTask<(bool, T)>> (if you see what I mean - essentially an async "try get next with cancellation" callback)

TieSKey commented 4 years ago

Wow u are fast. I just came back to let u know the very latest alpha version of the Unity3D engine released... a week ago? introduces partial support for C#8 and the IAsyncEnumerable seems to be working! Since this "game" I'm making is just a 1-man-army project still ages from release, updating is not an issue. I'll take note of your suggestions in case any problem arises (since it is only partial support).

Thx again for your time!

jayrulez commented 4 years ago

Right now: no. But it probably isn't much work to add this.

However, I wonder if it is actually necessary. System.Threading.Channels.Channel<T> can be used as an intermediary in a producer (there are helper methods in the library to expose this as an async-enumerable to pass in, and you'd push to the writer), and you should be able to manually unroll the consumer via GetAsyncEnumerator, in your current language version.

But: I'm not opposed to investigating if we can add more direct support - or even just a different API shape. For example, how about if instead of taking an async producer, we took something like a Func<CancellationToken, ValueTask<(bool, T)>> (if you see what I mean - essentially an async "try get next with cancellation" callback)

@mgravell Could you please show an example of how this would be implemented?

mgravell commented 4 years ago

Which specifically? We've discussed a few things

jayrulez commented 4 years ago

I would appreciate both actually. I'm not exactly sure what I am doing here.

This is what I want to achieve.

We are trying to build a server for a turn based game.

To start, I want the client to make a connection to a server by sending a "LoginRequest". The server sends back a "LoginResponse" with a property for a security token. At this stage, the service adds the token to a list of connected clients maintained in a singleton registry.

If the login was successful, I want the client to open a bidirectional stream to the server. This is where I am having a hard time.

I got something to work but it seems a bit hacky and inflexible.

This is the call that opens the "stream":

public IAsyncEnumerable<PingResponse> Ping(IAsyncEnumerable<PingRequest> pings, CallContext context)
        {
            return context.FullDuplexAsync(_pingService.OutputStream, pings, _pingService.HandlePingRequest);
        }

And the ping service:

using Microsoft.Extensions.Logging;
using ProtoBuf.Grpc;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using TOH.Common.Services;

namespace TOH.Server.Services
{
    class PingService
    {
        private readonly ILogger _logger;
        private readonly SessionService _sessionService;
        private readonly List<Session> _sessions = new List<Session>();

        private ConcurrentQueue<PingResponse> _responses = new ConcurrentQueue<PingResponse>();

        public PingService(SessionService sessionService, ILogger<PingService> logger)
        {
            _sessionService = sessionService;
            _logger = logger;
        }

        public async IAsyncEnumerable<PingResponse> OutputStream(CallContext context)
        {
            while (!context.CancellationToken.IsCancellationRequested)
            {
                if (_responses.TryDequeue(out var response))
                {
                    yield return response;
                }
                else
                {
                    await Task.Yield();
                }
            }
            yield break;
        }

        public async ValueTask HandlePingRequest(IAsyncEnumerable<PingRequest> pings, CallContext context)
        {
            await foreach (var ping in pings)
            {
                _logger.LogInformation($"Handling PingRequest: '{ping.Message}'.");

                if (session != null)
                {
                    _responses.Enqueue(new PingResponse
                    {
                        Message = $"Response: {ping.Message} - {Guid.NewGuid()}"
                    });
                }
            }
        }
    }
}

I have to play around some more with this lib first to better understand how to use it but any recommendation is welcome.

mgravell commented 4 years ago

@jayrulez can I interpret from the code that the thing you're trying to avoid is the hot loop with the Task.Yield? If so: I don't think having direct access to the writer API would necessarily help much here.

Question: your use of ConcurrentQueue<T> - it that because of multiple concurrent writers, or is it because of single reader plus single writer? For multiple concurrent writers, Channel<T> is probably your best bet - noting that protobuf-net.Grpc also has a custom AsEnumerable extension method that completes the writer when it is cancelled, which may be useful. For single writer: I've been exploring a new "push enumerable" concept in a branch, which is effectively an async queue of length zero or one - probably ideal here? I haven't merged that branch yet, but: is that sounding like what you're after?

mgravell commented 4 years ago

@jayrulez here's the tracking PR for my investigation on "push enumerable": https://github.com/protobuf-net/protobuf-net.Grpc/pull/92

mgravell commented 4 years ago

I've updated my branch there with a few more thoughts, but - looking again at your code, I wonder if you're vastly over-complicating things; am I right in thinking that your ping responses basically marry 1:1 with the ping requests? if so, this simply isn't a full duplex scenario - it is a streaming request/response scenario - much much simpler. As such, I think you can just do:

public async IAsyncEnumerable<PingResponse> Ping(IAsyncEnumerable<PingRequest> pings, CallContext context)
{
    await foreach(var ping in pings.WithCancellation(context.CancellationToken))
    {
        if (_session == null) yield break;
        _logger.LogInformation($"Handling PingRequest: '{ping.Message}'.");

        yield return new PingResponse
        {
            Message = $"Response: {ping.Message} - {Guid.NewGuid()}"
        };
    }
}

without any of the concurrent queue bits; any help?

(I suspect that the .WithCancellation(context.CancellationToken) is also redundant here, since the pipeline here is cancellation aware, but: I don't want to have to "prove" that right now, so I've left it in for correctness over simplicity)

(full duplex is when the client can talk to the server, and the server can talk to the client, but either can talk at any time without it necessarily being in response to the other)

TieSKey commented 4 years ago

Since this is still open let me report back as I finally implemented gRPC in my hobbyist game. Client calls to the server each time it needs to send data. Server writes to a very rough awaitable queue which pushes to an IAsyncEnumerable the client previously subscribed to.

Of the top of your head, do you know if having an IAsyncEnumerable parameter (client->server) would be faster (in terms of latency, pure protocol overhead regardless of payload) than making individual calls? My gut feeling is the diff should be really really small if any but I'm no expert here.

Let me thank you for the library and the way you care to understand questions and suggest solutions instead of "why are you doing that? don't do that, do something else (even if that doesn't solve your problem)" that plagues SO :P

mgravell commented 4 years ago

In terms of "awaitable queue" - Channel-T provides that out of the box, with various configuration options. Whether any one approach is faster is hugely contextual, and I have not enough information here to comment. Ultimately, though, compared to RPC (which is usually off-box): you're unlikely to notice much either way.

On Wed, 16 Sep 2020, 17:19 TieSKey, notifications@github.com wrote:

Since this is still open let me report back as I finally implemented gRPC in my hobbyist game. Client calls to the server each time it needs to send data. Server writes to a very rough awaitable queue which pushes to an IAsyncEnumerable the client previously subscribed to.

Of the top of your head, do you know if having an IAsyncEnumerable would be faster (in terms of latency, pure protocol overhead regardless of payload) than making individual calls? My gut feeling is the diff it should be really really small if any but I'm no expert here.

Let me thank you for the library and the way you care to understand questions and suggest solutions instead of "why are you doing that? don't do that, do something else (even if that doesn't solve your problem)" that plagues SO :P

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/protobuf-net/protobuf-net.Grpc/issues/91#issuecomment-693513841, or unsubscribe https://github.com/notifications/unsubscribe-auth/AAAEHMBDDOX7FTTC3DISRCTSGDQQ5ANCNFSM4NP7ZIQA .

TieSKey commented 4 years ago

Oh, I said queue but its dept should be mostly 0 or 1. It's just a way to push to an IAsyncEnumerable at arbitrary points in time from arbitrary pieces of code, iirc, the idea of the experimental branch you commented above.

In case you are curious here is the code (copied from somewhere in the internet). Plz ignore the consumer variable it should be a CancelationCallback delegate or something on that line :P

public class AwaitableQueue<T> {

    private SemaphoreSlim semaphore = new SemaphoreSlim(0);
    private readonly object queueLock = new object();
    private Queue<T> queue = new Queue<T>();

    public IEventSender Consumer;

    public AwaitableQueue(IEventSender consumer) {
        this.Consumer = consumer;
    }

    public void Enqueue(T item) {
        lock (queueLock) {
            queue.Enqueue(item);
            semaphore.Release();
        }
    }

    public T WaitAndDequeue(TimeSpan timeSpan, CancellationToken cancellationToken) {
        semaphore.Wait(timeSpan, cancellationToken);
        lock (queueLock) {
            return queue.Dequeue();
        }
    }

    public async ValueTask<T> WhenDequeue(TimeSpan timeSpan, CancellationToken cancellationToken) {
        try {
            await semaphore.WaitAsync(timeSpan, cancellationToken);
        } catch (OperationCanceledException canceledException) {
            GrpcNetworkService.OnConnectionLost(this.Consumer);
        }

        lock (queueLock) {
            return queue.Dequeue();
        }
    }

}
 private async IAsyncEnumerable<NetworkEvent> SubscribeForReceives(string id, [EnumeratorCancellation] CancellationToken cancel) {
            AwaitableQueue<NetworkEvent> sendQueue = eventSenders[id].SendQueue;
            TimeSpan timeSpan = TimeSpan.FromMilliseconds(-1);
            CancellationTokenSource jointTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancel, this.tokenSource.Token);
            CancellationToken jointToken = jointTokenSource.Token;

            while (!jointToken.IsCancellationRequested) {
                yield return await sendQueue.WhenDequeue(timeSpan, jointToken);
            }

            OnConnectionLost(eventSenders[id]);
        }
mgravell commented 4 years ago

I actually started a spike of a custom primitive for "queues" that would always be empty or one item; incomplete, but same concept

On Wed, 16 Sep 2020, 18:29 TieSKey, notifications@github.com wrote:

Oh, I said queue but its dept should be mostly 0 or 1. It's just a way to push to an IAsyncEnumerable at arbitrary points in time from arbitrary pieces of code, iirc, the idea of the experimental branch you commented above.

In case you are curious here is the code (copied from somewhere in the internet). Plz ignore the consumer variable it should be a CancelationCallback delegate or something on that line :P

public class AwaitableQueue {

private SemaphoreSlim semaphore = new SemaphoreSlim(0);
private readonly object queueLock = new object();
private Queue<T> queue = new Queue<T>();

public IEventSender Consumer;

public AwaitableQueue(IEventSender consumer) {
    this.Consumer = consumer;
}

public void Enqueue(T item) {
    lock (queueLock) {
        queue.Enqueue(item);
        semaphore.Release();
    }
}

public T WaitAndDequeue(TimeSpan timeSpan, CancellationToken cancellationToken) {
    semaphore.Wait(timeSpan, cancellationToken);
    lock (queueLock) {
        return queue.Dequeue();
    }
}

public async ValueTask<T> WhenDequeue(TimeSpan timeSpan, CancellationToken cancellationToken) {
    try {
        await semaphore.WaitAsync(timeSpan, cancellationToken);
    } catch (OperationCanceledException canceledException) {
        GrpcNetworkService.OnConnectionLost(this.Consumer);
    }

    lock (queueLock) {
        return queue.Dequeue();
    }
}

}

private async IAsyncEnumerable SubscribeForReceives(string id, [EnumeratorCancellation] CancellationToken cancel) { AwaitableQueue sendQueue = eventSenders[id].SendQueue; TimeSpan timeSpan = TimeSpan.FromMilliseconds(-1); CancellationTokenSource jointTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancel, this.tokenSource.Token); CancellationToken jointToken = jointTokenSource.Token;

        while (!jointToken.IsCancellationRequested) {
            yield return await sendQueue.WhenDequeue(timeSpan, jointToken);
        }

        OnConnectionLost(eventSenders[id]);
    }

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/protobuf-net/protobuf-net.Grpc/issues/91#issuecomment-693551085, or unsubscribe https://github.com/notifications/unsubscribe-auth/AAAEHMB3RJLMIXAY43IALA3SGDYXVANCNFSM4NP7ZIQA .

KarloX2 commented 3 years ago

Hi, I'm looking for an approach to implement bi-directional streaming:

Both client and server shall be able to send objects to each other at any point in time.

I read this thread and I'm not sure if it talks about the same and if it has a solution for me.

Can someone please explain? Shall I open a separate question here?

Thanks Karlo

TieSKey commented 3 years ago

AFAIK there are a couple ways to do it. There are some examples in this github but it depends on what platform u are targeting. Also remember that clients/callers do know the server/callee so they can send data whenever they want even if not through streaming (and implement an async enumerator purely on server side if you need to treat the data like that).

That being said, the author of the library is who really knows about this, I'm no expert here.

mgravell commented 3 years ago

The way protobuf-net.Grpc exposes this is via consuming or producing IAsyncEnumerable, which can be done entirely concurrently.

KarloX2 commented 3 years ago

I have a nicely working sample that shows how a client obtains an IAsyncEnumerable from the server and then, in an endless loop, awaits that IAsyncEnumerable to receive messages from the server as the server posts it. So far, so good.

But what's the best approach to have my client posting messages to the server in parallel to that ongoing loop...

Is there any example code that demonstrates such a full duplex messaging scenario?

Thanks very much!