timostamm / protobuf-ts

Protobuf and RPC for TypeScript
Apache License 2.0

RPC Output Stream drops data if `AsyncIterator` isn't invoked before response is received #650

Open jcready opened 5 months ago

The `ServerStreamingCall`'s `responses` is an `RpcOutputStream`, which exposes an `AsyncIterator`. If the consumer does not synchronously invoke the `AsyncIterator`, data can be dropped.

```typescript
let serverStreaming = service.serverStreamingMethod(foo);

// wait before attaching the iterator
await new Promise((resolve) => setTimeout(resolve, 1000));

for await (let message of serverStreaming.responses) {
    console.log("got a message", message);
}
```

The above is a race condition: if the service starts sending streaming responses before 1 second has passed, those messages are silently dropped and no console log will happen. This is despite the claim: https://github.com/timostamm/protobuf-ts/blob/1798e0d43c31eafb4b5877ef72f05abb87456823/packages/runtime-rpc/src/rpc-output-stream.ts#L18-L20

There is also this: https://github.com/timostamm/protobuf-ts/blob/1798e0d43c31eafb4b5877ef72f05abb87456823/packages/runtime-rpc/src/rpc-output-stream.ts#L17

However, I had always interpreted that to mean that if something had already finished consuming the stream, invoking the `AsyncIterator` a second time would yield no messages. But perhaps by "stream" it means the underlying network call rather than the locally buffered stream.

In either case this behavior seems undesirable and non-obvious. It would be akin to saying that if the consumer didn't await the unary response before the underlying network call finished, we'd just lose the data. e.g.

```typescript
let unary = service.unaryMethod(bar);
await new Promise((resolve) => setTimeout(resolve, 1000));
const result = await unary.response; // undefined - dropped because we didn't await synchronously
```
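For illustration only (this is NOT protobuf-ts's actual implementation), the dropping behavior can be modeled as a push stream that only delivers a message when a reader is already waiting on `next()`, and otherwise discards it instead of buffering:

```typescript
// Toy model of the observed behavior (not protobuf-ts's real RpcOutputStream):
// a pushed message resolves a pending next() call if one exists; otherwise it
// is silently discarded rather than queued.
class DroppingStream<T> {
  private pending: Array<(r: IteratorResult<T>) => void> = [];

  push(value: T): void {
    const resolve = this.pending.shift();
    if (resolve) {
      resolve({ value, done: false }); // a reader was waiting: delivered
    }
    // no reader waiting: the message is dropped on the floor
  }

  [Symbol.asyncIterator](): AsyncIterator<T> {
    return {
      next: () =>
        new Promise<IteratorResult<T>>((res) => this.pending.push(res)),
    };
  }
}
```

A message pushed before anyone calls `next()` is lost, while a message pushed afterwards is delivered. A buffering stream would queue the early message instead.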

This behavior also makes it challenging to effectively invoke multiple requests in parallel:

```typescript
// issue network calls in parallel
let serverStreamingCall1 = service.serverStreamingMethod1(foo);
let serverStreamingCall2 = service.serverStreamingMethod2(bar);
let unaryCall1 = service.unaryMethod1(baz);
let unaryCall2 = service.unaryMethod2(quz);

// consume them one at a time
const unaryResult1 = await unaryCall1.response;
const unaryResult2 = await unaryCall2.response;
// does not work
for await (const streamingResult1 of serverStreamingCall1.responses) {
  // ...
}
for await (const streamingResult2 of serverStreamingCall2.responses) {
  // ...
}
```

For the consumer to actually receive every message, they would instead need to do this:

```typescript
// issue network calls in parallel
let serverStreamingCall1 = service.serverStreamingMethod1(foo);
let serverStreamingCall2 = service.serverStreamingMethod2(bar);
let unaryCall1 = service.unaryMethod1(baz);
let unaryCall2 = service.unaryMethod2(quz);

// synchronously grab the iterators so buffering starts immediately
let streamingBuffer1 = serverStreamingCall1.responses[Symbol.asyncIterator]();
let streamingBuffer2 = serverStreamingCall2.responses[Symbol.asyncIterator]();
let streamingIterator1 = { [Symbol.asyncIterator]() { return streamingBuffer1 } };
let streamingIterator2 = { [Symbol.asyncIterator]() { return streamingBuffer2 } };

// consume them one at a time
const unaryResult1 = await unaryCall1.response;
const unaryResult2 = await unaryCall2.response;
for await (const streamingResult1 of streamingIterator1) {
  // ...
}
for await (const streamingResult2 of streamingIterator2) {
  // ...
}
```
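A small hypothetical helper (the name `eager` is mine; it is not part of protobuf-ts) makes this pattern less error-prone: it grabs the `AsyncIterator` synchronously and re-wraps it as an `AsyncIterable` for later `for await` consumption:

```typescript
// Hypothetical helper, not part of protobuf-ts: obtain the AsyncIterator right
// away (so the stream buffers into it from the start) and return an
// AsyncIterable that hands back that same iterator when consumed later.
function eager<T>(iterable: AsyncIterable<T>): AsyncIterable<T> {
  const iterator = iterable[Symbol.asyncIterator]();
  return { [Symbol.asyncIterator]: () => iterator };
}

// Usage with the calls above:
// const stream1 = eager(serverStreamingCall1.responses);
// const stream2 = eager(serverStreamingCall2.responses);
// ...later...
// for await (const message of stream1) { /* ... */ }
```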

I assume the existing behavior might exist to avoid extra memory/GC overhead on the off chance that a consumer invoked the streaming call and then simply did nothing with it, or only cared about the headers, but that seems like an odd thing to optimize for.