grpc / grpc-node

gRPC for Node.js
https://grpc.io
Apache License 2.0
4.47k stars 647 forks source link

grpc-js bidirectional client hangs open when server ends stream directly following async iterator #1839

Open sehrgut opened 3 years ago

sehrgut commented 3 years ago

Problem description

A server consuming a bidirectional (Duplex) stream using an async iterator and calling call.end() after the iterator loop completes leaves the client hanging open. The only workaround is to add an end event handler on the server stream, and call call.end() from there.

Reproduction steps

  1. Compile the following biditest.proto to ./proto/generated:
// Minimal schema for the bidirectional-streaming reproduction.
syntax = "proto3";
package biditest;

// The single message type sent in both directions on the stream.
message Block {
    // Unsigned 32-bit value; the only field of the message (field number 1).
    uint32  seq     = 1;
}

// Service exposing one RPC that streams Block messages both ways.
service BidiTest {
    // Bidirectional streaming: request and response are both `stream Block`.
    rpc Do (stream Block) returns (stream Block);
}
  2. Run the following example script:
var grpc            = require('@grpc/grpc-js'),
    services        = require('./proto/generated/biditest_grpc_pb'),
    messages        = require('./proto/generated/biditest_pb');

/**
 * Compute an absolute deadline relative to the current time.
 *
 * @param {number} millis - Offset from now, in milliseconds.
 * @returns {number} `Date.now()` plus `millis` (epoch milliseconds).
 */
function getDeadline(millis) {
    const now = Date.now();
    return now + millis;
}

/**
 * Wait for a fixed delay.
 *
 * @param {number} interval - Delay in milliseconds, handed to setTimeout.
 * @returns {Promise<undefined>} Resolves with no value once the timer fires.
 */
async function sleep(interval) {
    const onTimer = (wake) => {
        setTimeout(() => {
            wake();
        }, interval);
    };
    return new Promise(onTimer);
}

/**
 * Sleep for a randomly chosen whole number of milliseconds.
 *
 * The delay is `Math.floor(Math.random() * maxMillis)`, so for a positive
 * integer `maxMillis` it falls in the range 0 .. maxMillis - 1.
 *
 * @param {number} maxMillis - Exclusive upper bound for the delay, in milliseconds.
 * @returns {Promise<void>} Settles after the chosen delay has elapsed.
 */
async function randomSleep(maxMillis) {
    const delay = Math.floor(maxMillis * Math.random());
    await sleep(delay);
}

/**
 * Bidirectional handler that echoes every request and ends its own side
 * of the stream from an 'end' event listener (the workaround variant).
 *
 * The listener is attached before iteration begins. Each incoming Block's
 * seq is copied into a new Block and written back, followed by a random
 * pause of up to 100 ms.
 *
 * @param {object} call - Server-side duplex stream for the `Do` RPC.
 * @returns {Promise<void>} Settles when the async iterator over `call` finishes.
 */
async function handleDoWithEndEvent(call) {
    const finishOnEnd = () => {
        console.log("[Server1] Got end event");
        console.log("[Server1] Ending");
        call.end();
    };
    call.on('end', finishOnEnd);

    for await (const request of call) {
        const seq = request.getSeq();
        console.log("[Server1] Received %d", seq);

        const reply = new messages.Block();
        reply.setSeq(seq);

        console.log("[Server1] Sending %d", seq);
        call.write(reply);
        await randomSleep(100);
    }

    console.log('[Server1] Async iterator ended');
}

/**
 * Bidirectional handler that echoes every request and calls `call.end()`
 * directly after the async iterator completes (the pure-async variant).
 *
 * Each incoming Block's seq is copied into a new Block and written back,
 * followed by a random pause of up to 100 ms.
 *
 * @param {object} call - Server-side duplex stream for the `Do` RPC.
 * @returns {Promise<void>} Settles after `call.end()` has been invoked.
 */
async function handleDoPureAsync(call) {
    for await (const request of call) {
        const seq = request.getSeq();
        console.log("[Server2] Received %d", seq);

        const reply = new messages.Block();
        reply.setSeq(seq);

        console.log("[Server2] Sending %d", seq);
        call.write(reply);
        await randomSleep(100);
    }

    // The readable side is exhausted; now close the writable side.
    console.log('[Server2] Async iterator ended');
    console.log('[Server2] Ending');
    call.end();
}

/**
 * Open a bidirectional `Do` call against localhost:50051, send `blocks`
 * sequentially numbered Block messages, then end the client side.
 *
 * Listeners for 'data', 'error' and 'end' only log. The call is opened
 * with a 10-second deadline, and a random pause of up to 100 ms follows
 * each write.
 *
 * @param {number} blocks - Number of Block messages to send (seq 0 .. blocks - 1).
 * @returns {Promise<void>} Settles once `call.end()` has been invoked; it does
 *     not wait for the server to finish the stream.
 */
async function doClient(blocks) {
    const client = new services.BidiTestClient(
        'localhost:50051',
        grpc.credentials.createInsecure()
    );

    const call = client.do({ deadline: getDeadline(10000) });

    const onData = (res) => {
        console.log("[Client] Received %d", res.getSeq());
    };
    const onError = (err) => {
        console.error("[Client][ERROR] %s", err);
    };
    const onEnd = () => {
        console.log("[Client] Got End");
    };

    call.on('data', onData);
    call.on('error', onError);
    call.on('end', onEnd);

    for (let seq = 0; seq < blocks; seq += 1) {
        console.log("[Client] Sending %d", seq);
        const block = new messages.Block();
        block.setSeq(seq);
        call.write(block);
        await randomSleep(100);
    }

    console.log('[Client] Ending');
    call.end();
}

/**
 * Scenario 1: serve `Do` with the end-event workaround handler, run the
 * client for 10 blocks, then shut the server down and chain into main2.
 *
 * If binding the port fails, the error is logged and the scenario stops;
 * the server is not started and the client is not run against it.
 */
function main1() {
    const server = new grpc.Server();
    server.addService(services.BidiTestService, {
        "do": handleDoWithEndEvent
    });
    server.bindAsync("127.0.0.1:50051",
        grpc.ServerCredentials.createInsecure(),
        (err) => {
            // bindAsync reports failures (e.g. port in use) through the
            // first callback argument; surface them instead of starting.
            if (err) {
                console.error("[Server1][ERROR] Failed to bind 127.0.0.1:50051: %s", err);
                return;
            }
            server.start();
            console.log("async-bidi-test:handleDoWithEndEvent service started on 127.0.0.1:50051");
            doClient(10).then(() => server.tryShutdown(() => main2()));
        });
}

/**
 * Scenario 2: serve `Do` with the pure async-iterator handler, run the
 * client for 10 blocks, then shut the server down.
 *
 * If binding the port fails, the error is logged and the scenario stops;
 * the server is not started and the client is not run against it.
 */
function main2() {
    const server = new grpc.Server();
    server.addService(services.BidiTestService, {
        "do": handleDoPureAsync
    });
    server.bindAsync("127.0.0.1:50051",
        grpc.ServerCredentials.createInsecure(),
        (err) => {
            // bindAsync reports failures (e.g. port in use) through the
            // first callback argument; surface them instead of starting.
            if (err) {
                console.error("[Server2][ERROR] Failed to bind 127.0.0.1:50051: %s", err);
                return;
            }
            server.start();
            console.log("async-bidi-test:handleDoPureAsync service started on 127.0.0.1:50051");
            doClient(10).then(() => server.tryShutdown(() => {}));
        });
}

// Entry point: run the end-event scenario first; main1 starts main2 from
// its tryShutdown callback, so the two scenarios run back to back.
main1();
  3. You will observe output similar to the following:
async-bidi-test:handleDoWithEndEvent service started on 127.0.0.1:50051
[Client] Sending 0
[Server1] Received 0
[Server1] Sending 0
[Client] Received 0
[Client] Sending 1
[Server1] Received 1
[Server1] Sending 1
[Client] Received 1
[Client] Sending 2
[Server1] Received 2
[Server1] Sending 2
[Client] Received 2
[Client] Sending 3
[Server1] Received 3
[Server1] Sending 3
[Client] Received 3
[Client] Sending 4
[Server1] Received 4
[Server1] Sending 4
[Client] Received 4
[Client] Sending 5
[Server1] Received 5
[Server1] Sending 5
[Client] Received 5
[Client] Sending 6
[Server1] Received 6
[Server1] Sending 6
[Client] Received 6
[Client] Sending 7
[Server1] Received 7
[Server1] Sending 7
[Client] Sending 8
[Client] Received 7
[Server1] Received 8
[Server1] Sending 8
[Client] Received 8
[Client] Sending 9
[Client] Ending
[Server1] Received 9
[Server1] Sending 9
[Server1] Got end event
[Server1] Ending
[Client] Received 9
[Client] Got End
async-bidi-test:handleDoPureAsync service started on 127.0.0.1:50051
[Client] Sending 0
[Server2] Received 0
[Server2] Sending 0
[Client] Received 0
[Client] Sending 1
[Client] Sending 2
[Server1] Async iterator ended
[Server2] Received 1
[Server2] Sending 1
[Client] Received 1
[Client] Sending 3
[Server2] Received 2
[Server2] Sending 2
[Client] Received 2
[Client] Sending 4
[Server2] Received 3
[Server2] Sending 3
[Client] Received 3
[Client] Sending 5
[Server2] Received 4
[Server2] Sending 4
[Client] Received 4
[Server2] Received 5
[Server2] Sending 5
[Client] Sending 6
[Client] Received 5
[Client] Sending 7
[Server2] Received 6
[Server2] Sending 6
[Client] Received 6
[Server2] Received 7
[Server2] Sending 7
[Client] Sending 8
[Client] Received 7
[Server2] Received 8
[Server2] Sending 8
[Client] Received 8
[Client] Sending 9
[Client] Ending
[Server2] Received 9
[Server2] Sending 9
[Client] Received 9
[Server2] Async iterator ended
[Server2] Ending
[Client][ERROR] Error: 4 DEADLINE_EXCEEDED: Deadline exceeded
    at Object.callErrorFromStatus (/Users/keithbeckman/Documents/coursework/udemy-grpc/async-bidi-test/node_modules/@grpc/grpc-js/build/src/call.js:31:26)
    at Object.onReceiveStatus (/Users/keithbeckman/Documents/coursework/udemy-grpc/async-bidi-test/node_modules/@grpc/grpc-js/build/src/client.js:390:49)
    at Object.onReceiveStatus (/Users/keithbeckman/Documents/coursework/udemy-grpc/async-bidi-test/node_modules/@grpc/grpc-js/build/src/client-interceptors.js:299:181)
    at /Users/keithbeckman/Documents/coursework/udemy-grpc/async-bidi-test/node_modules/@grpc/grpc-js/build/src/call-stream.js:145:78
    at processTicksAndRejections (internal/process/task_queues.js:77:11) {
  code: 4,
  details: 'Deadline exceeded',
  metadata: [Metadata]
}
[Client] Got End

Environment

Additional context

The Server1 instance uses the workaround, checking for the end of the source stream by listening to the end event, and ending the sink stream at that point. This successfully ends the stream for the client. The Server2 instance uses a pure async iterator implementation, which I would argue is a "least astonishment" implementation. After await returns from the async iterator, implying the source stream has ended, it calls call.end(), ending the sink stream. This ought to end the stream for the client, but instead the client does not end until much later, with DEADLINE_EXCEEDED.

The completion of an async iterator on a Readable stream should indicate the source stream has ended. At that point, if the Readable was a Duplex, calling the end() method should end the sink stream. Using the stream event interface alongside the async iterator should not be necessary.

As an aside, note that when pipelined like this the ending of Server1's async iterator occurs significantly AFTER Server2 has started, which happens within the server1 tryShutdown callback. Since the async iterator is occurring inside the async service method, it seems likely that tryShutdown is completing before it ought to; since this handler should have to complete before tryShutdown ends the server and calls its callback. I haven't investigated this in-depth, since I only discovered it while creating this repro; but perhaps it ought to be another separate bug report. I'm mentioning it here on the chance that it points to a flaw in async service method handling that could be causing the main bug.

murgatroid99 commented 3 years ago

It looks to me like the problem is that the end event does not cause the async iteration to end. It's hard to tell from that log, but my guess is that it is waiting for the close event, which wouldn't be emitted on a Duplex stream until some time after both sides have called the end method.

Regarding tryShutdown, the purpose of that method is to prevent new requests from starting and to wait for existing open requests to end. At the point when you call it, that request has ended, so it's OK for tryShutdown to finish quickly. The fact that there may also be some local asynchronous cleanup work doesn't really matter.

ttessarolo commented 1 year ago

stream.on("end") only signals the end of the readable part of the duplex stream, while the writable stream is paused (not the expected behaviour). To force the writable stream to close as well, you need to call stream.end() inside the stream.on("end") handler. It's really ugly, but it works.