grpc / grpc-node

gRPC for Node.js
https://grpc.io
Apache License 2.0

Server-side of a ServerStream-Request hangs with "rstCode 8" if the client suddenly vanishes. #2385

Open mrx8 opened 1 year ago

mrx8 commented 1 year ago

Problem description

I have a ServerStream service which generates a readable stream of about 100,000 messages and sets it in ctx.res. The client consumes only the first message and then calls process.exit(). This triggers a cancel event on ctx.call on the server side, but the stream in ctx.res hangs forever.

Reproduction steps

Create a simple message with one item. Create an async generator which yields a never-ending number of these messages. Make a stream from it with stream.Readable.from() and put that in ctx.res. On the client side, make the call, consume the resulting stream with for await, and call process.exit() after you have successfully received the first message.
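
For illustration, the server-side part of these steps might look roughly like the sketch below. It is not the reporter's code: makeEndlessStream and the message shape are made up for the example, and the ctx.res assignment is left out. The counter only makes it visible that, with no consumer, the generator stops being pulled once the stream's internal buffer is full.

```javascript
const { Readable } = require('node:stream');

// Hypothetical helper: wraps a never-ending async generator in a Readable.
function makeEndlessStream() {
  let produced = 0;

  async function* endlessMessages() {
    while (true) {
      produced += 1;
      yield { item: 'hello' }; // a simple message with one item
    }
  }

  return {
    stream: Readable.from(endlessMessages()),
    // Number of messages the generator has been asked to produce so far.
    producedCount: () => produced,
  };
}
```

In the setup described above, the returned stream would be the value assigned to ctx.res.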

Environment

Additional context

Log on server side:

D 2023-03-07T17:05:31.859Z | index | Loading @grpc/grpc-js version 1.8.10
D 2023-03-07T17:05:32.195Z | server | (1) Server constructed
D 2023-03-07T17:05:32.196Z | dns_resolver | Resolver constructed for target dns:0.0.0.0:10011
D 2023-03-07T17:05:32.196Z | dns_resolver | Returning IP address for target dns:0.0.0.0:10011
D 2023-03-07T17:05:32.198Z | server | (1) Attempting to bind 0.0.0.0:10011
D 2023-03-07T17:05:32.200Z | server | (1) Successfully bound 0.0.0.0:10011
D 2023-03-07T17:05:34.190Z | server | (1) Received call to method /xxx at address null
D 2023-03-07T17:05:34.191Z | server_call | Request to /xxx received headers {"authorization":["Bearer abc"],"grpc-accept-encoding":["identity,deflate,gzip"],"accept-encoding":["identity"],"user-agent":["grpc-node-js/1.8.7"],"content-type":["application/grpc"],"te":["trailers"]}
D 2023-03-07T17:05:34.712Z | server_call | Request to method /xxx stream closed with rstCode 8

Log on client side:

D 2023-03-07T17:05:34.070Z | index | Loading @grpc/grpc-js version 1.8.7
D 2023-03-07T17:05:34.173Z | resolving_load_balancer | dns:localhost:10011 IDLE -> IDLE
D 2023-03-07T17:05:34.173Z | connectivity_state | (1) dns:localhost:10011 IDLE -> IDLE
D 2023-03-07T17:05:34.173Z | dns_resolver | Resolver constructed for target dns:localhost:10011
D 2023-03-07T17:05:34.174Z | channel | (1) dns:localhost:10011 Channel constructed with options {
  "grpc.service_config": "{\"loadBalancingConfig\":[{\"round_robin\":{}}]}"
}
D 2023-03-07T17:05:34.174Z | channel_stacktrace | (1) Channel constructed 
    at new InternalChannel (/xyz/node_modules/@grpc/grpc-js/build/src/internal-channel.js:196:23)
    at new ChannelImplementation (/xyz/node_modules/@grpc/grpc-js/build/src/channel.js:35:32)
    at new Client (/xyz/node_modules/@grpc/grpc-js/build/src/client.js:65:36)
    at new ServiceClientImpl (/xyz/node_modules/@grpc/grpc-js/build/src/make-client.js:58:5)
    at new GrpcClient (/xyz/node_modules/@vwd/grpc-client/src/index.js:109:19)
    at main (/xyz/test-streaming.js:15:28)
    at Object.<anonymous> (/xyz/test-streaming.js:108:1)
    at Module._compile (node:internal/modules/cjs/loader:1226:14)
    at Module._extensions..js (node:internal/modules/cjs/loader:1280:10)
    at Module.load (node:internal/modules/cjs/loader:1089:32)
    at Module._load (node:internal/modules/cjs/loader:930:12)
    at Function.executeUserEntryPoint [as runMain] (node:internal/modules/run_main:81:12)
    at node:internal/main/run_main_module:23:47
D 2023-03-07T17:05:34.175Z | channel | (1) dns:localhost:10011 createResolvingCall [0] method="/xxx", deadline=Infinity
D 2023-03-07T17:05:34.176Z | resolving_call | [0] Created
D 2023-03-07T17:05:34.176Z | resolving_call | [0] Deadline: Infinity
D 2023-03-07T17:05:34.176Z | resolving_call | [0] start called
D 2023-03-07T17:05:34.176Z | dns_resolver | Looking up DNS hostname localhost
D 2023-03-07T17:05:34.177Z | resolving_load_balancer | dns:localhost:10011 IDLE -> CONNECTING
D 2023-03-07T17:05:34.177Z | connectivity_state | (1) dns:localhost:10011 IDLE -> CONNECTING
D 2023-03-07T17:05:34.177Z | channel | (1) dns:localhost:10011 callRefTimer.ref | configSelectionQueue.length=1 pickQueue.length=0
D 2023-03-07T17:05:34.179Z | resolving_call | [0] write() called with message of length 401
D 2023-03-07T17:05:34.179Z | resolving_call | [0] halfClose called
D 2023-03-07T17:05:34.181Z | resolving_call | [0] startRead called
D 2023-03-07T17:05:34.182Z | dns_resolver | Resolved addresses for target dns:localhost:10011: [::1:10011,127.0.0.1:10011]
D 2023-03-07T17:05:34.182Z | round_robin | Connect to address list ::1:10011,127.0.0.1:10011
D 2023-03-07T17:05:34.182Z | subchannel | (2) ::1:10011 Subchannel constructed with options {
  "grpc.service_config": "{\"loadBalancingConfig\":[{\"round_robin\":{}}]}"
}
D 2023-03-07T17:05:34.182Z | subchannel_refcount | (2) ::1:10011 refcount 0 -> 1
D 2023-03-07T17:05:34.183Z | subchannel | (3) 127.0.0.1:10011 Subchannel constructed with options {
  "grpc.service_config": "{\"loadBalancingConfig\":[{\"round_robin\":{}}]}"
}
D 2023-03-07T17:05:34.183Z | subchannel_refcount | (3) 127.0.0.1:10011 refcount 0 -> 1
D 2023-03-07T17:05:34.183Z | subchannel_refcount | (2) ::1:10011 refcount 1 -> 2
D 2023-03-07T17:05:34.183Z | subchannel | (2) ::1:10011 IDLE -> CONNECTING
D 2023-03-07T17:05:34.183Z | round_robin | IDLE -> CONNECTING
D 2023-03-07T17:05:34.183Z | resolving_load_balancer | dns:localhost:10011 CONNECTING -> CONNECTING
D 2023-03-07T17:05:34.183Z | channel | (1) dns:localhost:10011 callRefTimer.unref | configSelectionQueue.length=1 pickQueue.length=0
D 2023-03-07T17:05:34.183Z | connectivity_state | (1) dns:localhost:10011 CONNECTING -> CONNECTING
D 2023-03-07T17:05:34.183Z | subchannel_refcount | (3) 127.0.0.1:10011 refcount 1 -> 2
D 2023-03-07T17:05:34.183Z | subchannel | (3) 127.0.0.1:10011 IDLE -> CONNECTING
D 2023-03-07T17:05:34.183Z | round_robin | CONNECTING -> CONNECTING
D 2023-03-07T17:05:34.183Z | resolving_load_balancer | dns:localhost:10011 CONNECTING -> CONNECTING
D 2023-03-07T17:05:34.183Z | connectivity_state | (1) dns:localhost:10011 CONNECTING -> CONNECTING
D 2023-03-07T17:05:34.183Z | round_robin | CONNECTING -> CONNECTING
D 2023-03-07T17:05:34.183Z | resolving_load_balancer | dns:localhost:10011 CONNECTING -> CONNECTING
D 2023-03-07T17:05:34.183Z | connectivity_state | (1) dns:localhost:10011 CONNECTING -> CONNECTING
D 2023-03-07T17:05:34.185Z | channel | (1) dns:localhost:10011 createRetryingCall [1] method="/xxx"
D 2023-03-07T17:05:34.185Z | retrying_call | [1] start called
D 2023-03-07T17:05:34.185Z | channel | (1) dns:localhost:10011 createLoadBalancingCall [2] method="/xxx"
D 2023-03-07T17:05:34.185Z | retrying_call | [1] Created child call [2] for attempt 1
D 2023-03-07T17:05:34.185Z | load_balancing_call | [2] start called
D 2023-03-07T17:05:34.185Z | load_balancing_call | [2] Pick result: QUEUE subchannel: null status: undefined undefined
D 2023-03-07T17:05:34.185Z | channel | (1) dns:localhost:10011 callRefTimer.ref | configSelectionQueue.length=0 pickQueue.length=1
D 2023-03-07T17:05:34.185Z | retrying_call | [1] startRead called
D 2023-03-07T17:05:34.185Z | load_balancing_call | [2] startRead called
D 2023-03-07T17:05:34.186Z | retrying_call | [1] write() called with message of length 406
D 2023-03-07T17:05:34.186Z | load_balancing_call | [2] write() called with message of length 406
D 2023-03-07T17:05:34.186Z | retrying_call | [1] halfClose called
D 2023-03-07T17:05:34.187Z | subchannel | (2) ::1:10011 CONNECTING -> TRANSIENT_FAILURE
D 2023-03-07T17:05:34.187Z | round_robin | CONNECTING -> CONNECTING
D 2023-03-07T17:05:34.187Z | resolving_load_balancer | dns:localhost:10011 CONNECTING -> CONNECTING
D 2023-03-07T17:05:34.187Z | channel | (1) dns:localhost:10011 callRefTimer.unref | configSelectionQueue.length=0 pickQueue.length=0
D 2023-03-07T17:05:34.187Z | load_balancing_call | [2] Pick result: QUEUE subchannel: null status: undefined undefined
D 2023-03-07T17:05:34.187Z | channel | (1) dns:localhost:10011 callRefTimer.ref | configSelectionQueue.length=0 pickQueue.length=1
D 2023-03-07T17:05:34.187Z | connectivity_state | (1) dns:localhost:10011 CONNECTING -> CONNECTING
D 2023-03-07T17:05:34.188Z | subchannel | (3) 127.0.0.1:10011 CONNECTING -> READY
D 2023-03-07T17:05:34.188Z | round_robin | CONNECTING -> READY
D 2023-03-07T17:05:34.188Z | resolving_load_balancer | dns:localhost:10011 CONNECTING -> READY
D 2023-03-07T17:05:34.188Z | channel | (1) dns:localhost:10011 callRefTimer.unref | configSelectionQueue.length=0 pickQueue.length=0
D 2023-03-07T17:05:34.188Z | load_balancing_call | [2] Pick result: COMPLETE subchannel: (3) 127.0.0.1:10011 status: undefined undefined
D 2023-03-07T17:05:34.188Z | connectivity_state | (1) dns:localhost:10011 CONNECTING -> READY
D 2023-03-07T17:05:34.189Z | transport_flowctrl | (4) 127.0.0.1:10011 local window size: 65535 remote window size: 65535
D 2023-03-07T17:05:34.189Z | transport_internals | (4) 127.0.0.1:10011 session.closed=false session.destroyed=false session.socket.destroyed=false
D 2023-03-07T17:05:34.189Z | load_balancing_call | [2] Created child call [3]
D 2023-03-07T17:05:34.189Z | subchannel_call | [3] write() called with message of length 406
D 2023-03-07T17:05:34.189Z | subchannel_call | [3] sending data chunk of length 406
D 2023-03-07T17:05:34.190Z | load_balancing_call | [2] halfClose called
D 2023-03-07T17:05:34.190Z | subchannel_call | [3] end() called
D 2023-03-07T17:05:34.190Z | subchannel_call | [3] calling end() on HTTP/2 stream
D 2023-03-07T17:05:34.191Z | transport | (4) 127.0.0.1:10011 local settings acknowledged by remote: {"headerTableSize":4096,"enablePush":true,"initialWindowSize":65535,"maxFrameSize":16384,"maxConcurrentStreams":4294967295,"maxHeaderListSize":4294967295,"maxHeaderSize":4294967295,"enableConnectProtocol":false}
D 2023-03-07T17:05:34.659Z | subchannel_call | [3] Received server headers:
        :status: 200
        content-type: application/grpc+proto
        grpc-accept-encoding: identity,deflate,gzip
        grpc-encoding: identity
        date: Tue, 07 Mar 2023 17:05:34 GMT

D 2023-03-07T17:05:34.659Z | load_balancing_call | [2] Received metadata
D 2023-03-07T17:05:34.659Z | retrying_call | [1] Received metadata from child [2]
D 2023-03-07T17:05:34.659Z | retrying_call | [1] Committing call [2] at index 0
D 2023-03-07T17:05:34.660Z | subchannel_call | [3] receive HTTP/2 data frame of length 14745
D 2023-03-07T17:05:34.660Z | subchannel_call | [3] parsed message of length 14745
D 2023-03-07T17:05:34.660Z | subchannel_call | [3] pushing to reader message of length 14745
D 2023-03-07T17:05:34.660Z | load_balancing_call | [2] Received message
D 2023-03-07T17:05:34.660Z | retrying_call | [1] Received message from child [2]
D 2023-03-07T17:05:34.667Z | resolving_call | [0] startRead called
D 2023-03-07T17:05:34.667Z | retrying_call | [1] startRead called
D 2023-03-07T17:05:34.667Z | load_balancing_call | [2] startRead called

murgatroid99 commented 1 year ago

Can you please share your test code?

mrx8 commented 1 year ago

My code is at: https://github.com/mrx8/grpc-node-issue-2385 (run npm install, then start the server with node server.js, then run node call-stream-hello.js). What the code does: the server produces a lot of messages. The client consumes only the first 10 messages and then terminates prematurely. My expectation: the server should end the stream. It receives a "cancelled" event, but the Readable stream on the server side keeps pushing messages until the buffer is full and then hangs...

I expect that the following line is executed: https://github.com/mrx8/grpc-node-issue-2385/blob/main/server.js#L35 This line is executed if all goes well (the client consumes all messages without an error).

murgatroid99 commented 1 year ago

I ran your test code. I can run the client script repeatedly and each one will get a successful response. Since the server is still responsive to requests, I would not say that it "hangs".

The "cancelled" event is there to inform the server application that it should stop processing that request. I suggest stopping the stream that is producing responses when you see that event.
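
The suggestion above could be wired up roughly as follows. This is a sketch and not part of the grpc-js API: destroyOnCancel is a hypothetical helper, and call is assumed to be whatever object emits the "cancelled" event (ctx.call in this thread).

```javascript
const { Readable } = require('node:stream');

// Hypothetical helper: destroys `source` as soon as `call` emits 'cancelled'.
function destroyOnCancel(call, source) {
  const onCancelled = () => source.destroy();
  call.once('cancelled', onCancelled);

  // Once the source has closed (for any reason), the listener is no longer needed.
  source.once('close', () => call.removeListener('cancelled', onCancelled));

  return source;
}

// Possible usage, assuming the ctx.call / ctx.res setup described in this thread:
//   ctx.res = destroyOnCancel(ctx.call, Readable.from(generateMessages()));
```

Destroying the source, rather than only ignoring further data, is what lets the producer stop and release its resources.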

mrx8 commented 1 year ago

By "hang" I meant that the stream hangs. It will never finish successfully. However, I noticed that at least it is apparently not a memory leak (I tested it for an hour with an endless loop).

If this is clearly how it is intended to work, then I will have to wrap every readable stream with a cancel mechanism before I use it with grpc-js, and even then I cannot get my generators to execute code after their for-await loop.

It would be far easier for developers if these client/server/bidi streams were individual streams with their inherent semantics, so that we could react to the usual stream events. But anyhow, if it works as designed, then I have to find a solution myself.
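
On the point about code after the loop: when a generator is terminated early, statements after its loop are skipped, but finally blocks still run. In current Node.js versions, destroying a stream created with stream.Readable.from() should call the generator's return(), which triggers those blocks. A minimal sketch, where produce and onCleanup are names made up for the example:

```javascript
const { Readable } = require('node:stream');

// Hypothetical producer: `onCleanup` stands in for the code that was
// meant to run after the loop.
async function* produce(onCleanup) {
  let i = 0;
  try {
    while (true) {
      yield { item: i++ };
    }
  } finally {
    // Runs on normal completion and when the wrapping stream is destroyed.
    onCleanup();
  }
}

// Possible usage:
//   const stream = Readable.from(produce(() => console.log('stream finished or aborted')));
//   stream.destroy(); // later, e.g. when the call is cancelled
```

One caveat: a generator that has never been started has not entered its try block yet, so destroying the stream before the first read does not run the finally block.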

mrx8 commented 1 year ago

I noticed that when breaking or returning from a for await loop over a gRPC stream on the client side, the server does not get notified. I can add stream.cancel() myself, OK, but I have customers over whom I have no control. If they forget to call cancel() in the right places, this would hamper the server. I personally thought it was implemented as a stream over a stream, the latter one with "autoDestroy: false". This handling of premature-end scenarios feels a bit alien to me :)
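
One way to avoid relying on every consumer remembering cancel() is to hand out a wrapped iterator instead of the raw stream. The sketch below assumes only that the stream is async-iterable and has a cancel() method, as grpc-js client streams do; cancelOnEarlyExit is a hypothetical name, and client.sayHello in the usage comment is a placeholder.

```javascript
// Hypothetical wrapper: iterates `stream` and calls stream.cancel() when the
// consumer stops early (break, return, or a thrown error).
async function* cancelOnEarlyExit(stream) {
  let finished = false;
  try {
    for await (const message of stream) {
      yield message;
    }
    finished = true; // the stream ended on its own
  } finally {
    if (!finished) {
      stream.cancel();
    }
  }
}

// Possible usage (placeholder client and method):
//   for await (const message of cancelOnEarlyExit(client.sayHello(request))) {
//     if (enough(message)) break; // triggers stream.cancel()
//   }
```

This only covers consumers that go through the wrapper and exit the loop normally or via an exception; a client that calls process.exit() never reaches the finally block.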

murgatroid99 commented 1 year ago

The intention in the gRPC API is that cancellation is for exceptional situations. In a server streaming call, the initial request should indicate what messages the client wants the server to send. If you need the client to be able to decide what messages it wants after it has started processing the stream, the right way to do that is with a bidirectional stream.

With that being said, I may be able to improve the client-side behavior here. If you can share the code that demonstrates "when breaking/return a for await-loop over a grpc-stream on client-side, the server does not get notified", then I can take a look.