grpc / grpc-dart

The Dart language implementation of gRPC.
https://pub.dev/packages/grpc
Apache License 2.0

Swapping out one stream for another without interruption. Are streams lazy? #605

Closed VerTiGoEtrex closed 1 year ago

VerTiGoEtrex commented 1 year ago

My end goal is to "resubscribe" to a stream with different parameters, without interruption.

First, a question, illustrated by some client-side Dart. When do dart-grpc streams first open a connection to the server?

// ...
final clientChannel = ...;
final client = someClient(clientChannel);

// When does the server start executing code?
final stream = client.someStreamingRPC();
// Is it now? The stream is created, but nobody is listening yet.
// I think this is referred to as a "hot" vs. "cold" stream. Most streams in dart are cold. Which is this?
final subscription = stream.listen((msg) => print(msg));
// Obviously it must be hot now, but does `.listen` block on the RPC to the server?
// How can I tell when the stream is "actually" listening?

If I want to resub without interruption, I feel I'm missing a trigger/callback in the current API.

To be able to resubscribe without interruption, I need this:

final stream = client.someStreamingRPC();
stream.listen((msg) => print(msg)); // (which instantly returns)
// **(during this time, the application is still using the old stream)**
// <some indeterminate amount of time to make the first round-trip to the server.>
// Round-trip completed -- stream "actually" ready
// **atomically swap out the old stream for the new one and unsub from the old stream**
// <some indeterminate amount of time>
// first message sent from server
// etc

From what I can tell, there's no way to be signaled that "stream actually ready" has occurred.

If I had control of the server, I could always send a dummy message as the first message to signal the stream is ready, but I don't control the server. For completeness, this is what that would look like:

final stream = client.someStreamingRPC();
stream.listen((msg) => print(msg)); // (which instantly returns)
// **(during this time, the application is still using the old stream)**
// <some indeterminate amount of time to make the first round-trip to the server.>
// Round-trip completed -- stream "actually" ready
// <some indeterminate amount of time>
// First message sent by server
// **(note that this step has swapped places with the one above) atomically swap out the old stream for the new one and unsub from the old stream. Manually re-emit the first message, since listeners were still listening to the old stream when it arrived and would otherwise miss it.**
// first message
// etc

Is there some other way to ensure that the old stream is not discarded before the new one is ready? I'd like to avoid both options above: having the new stream emit a dummy initial message as a signal that it's ready to go, and waiting for a "first message" that may never be sent (e.g. if the stream is empty for some set of parameters).

mraleph commented 1 year ago

When do dart-grpc streams first open a connection to the server?

When you call the method on your service stub. Whether or not you are listening on the stream does not matter; the call is already in progress at that point.

Currently, ResponseStream does not expose any lifecycle events, but you could await the stream.headers Future: once it completes, the call has connected.
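
For illustration, here is a minimal sketch of the swap described in the question, built on that suggestion. It uses only dart:async so it runs without a server. `StreamSwitcher`, `switchTo`, `ready`, and `demo` are hypothetical names, not part of grpc-dart. With a real call you would presumably pass the new ResponseStream as `next` and its `headers` future as `ready`. Messages that arrive on the new stream before the swap are buffered and re-emitted, so nothing from the new stream is lost.

```dart
import 'dart:async';

/// Forwards events from one source stream at a time. A replacement source
/// only takes over once its "ready" future has completed.
class StreamSwitcher<T> {
  final _out = StreamController<T>.broadcast();
  StreamSubscription<T>? _current;
  int _generation = 0;

  /// The stable stream that application code listens to.
  Stream<T> get stream => _out.stream;

  /// Subscribes to [next] immediately, but keeps forwarding the old source
  /// until [ready] completes. Assumption: with grpc-dart, [ready] would be
  /// the `headers` future of the new call.
  Future<void> switchTo(Stream<T> next, Future<void> ready) async {
    final generation = ++_generation;
    final pending = <T>[];
    var live = false;
    final sub = next.listen((event) {
      if (live) {
        _out.add(event);
      } else {
        pending.add(event); // arrived before the swap: hold it back
      }
    }, onError: _out.addError); // errors are forwarded without buffering

    try {
      await ready;
    } catch (_) {
      await sub.cancel(); // new call failed: keep the old source
      rethrow;
    }
    if (generation != _generation) {
      await sub.cancel(); // a later switchTo superseded this one
      return;
    }

    // Everything up to the next await runs synchronously, so no event can
    // slip in between these steps.
    final old = _current;
    _current = sub;
    live = true;
    for (final event in pending) {
      _out.add(event);
    }
    await old?.cancel();
  }
}

/// Lets all queued stream events be delivered before continuing.
Future<void> pump() => Future<void>.delayed(Duration.zero);

Future<List<String>> demo() async {
  final switcher = StreamSwitcher<String>();
  final received = <String>[];
  switcher.stream.listen(received.add);

  final a = StreamController<String>(); // stands in for the old call
  final b = StreamController<String>(); // stands in for the new call
  final bReady = Completer<void>(); // stands in for b's headers future

  await switcher.switchTo(a.stream, Future<void>.value());
  a.add('a1');
  await pump();

  final swap = switcher.switchTo(b.stream, bReady.future);
  a.add('a2'); // old source is still live
  b.add('b1'); // buffered until the swap
  await pump();

  bReady.complete();
  await swap;
  a.add('a3'); // ignored: the old subscription was cancelled
  b.add('b2');
  await pump();
  return received;
}

Future<void> main() async {
  print(await demo()); // prints [a1, a2, b1, b2]
}
```

Two caveats with this sketch: events the old source has queued but not yet delivered at the moment of the swap are dropped by `cancel()`, and "done" events are not propagated to the output stream.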