connectrpc / connect-go

The Go implementation of Connect: Protobuf RPC that works.
https://connectrpc.com
Apache License 2.0

ServerStreamForClient.Close hangs forever #789

Open amenzhinsky opened 1 week ago

amenzhinsky commented 1 week ago

Describe the bug

A client service needs to forcibly stop a server stream, but the Close method hangs indefinitely, preventing the connection-handling function from ever returning.

goroutine 82 [sync.Cond.Wait, 42 minutes]:
sync.runtime_notifyListWait(0xc046ba84c8, 0x65)
    runtime/sema.go:569 +0x159
sync.(*Cond).Wait(0x1a?)
    sync/cond.go:70 +0x85
golang.org/x/net/http2.(*pipe).Read(0xc046ba84b0, {0xc047066000, 0x2000, 0x2000})
    golang.org/x/net@v0.29.0/http2/pipe.go:76 +0xdf
golang.org/x/net/http2.transportResponseBody.Read({0x1?}, {0xc047066000?, 0x1?, 0x0?})
    golang.org/x/net@v0.29.0/http2/transport.go:2637 +0x65
io.(*LimitedReader).Read(0xc0435c5d40, {0xc047066000?, 0x10?, 0x1?})
    io/io.go:479 +0x43
io.discard.ReadFrom({}, {0x12b2d80, 0xc0435c5d40})
    io/io.go:666 +0x6d
io.copyBuffer({0x12b4620, 0x1b89ae0}, {0x12b2d80, 0xc0435c5d40}, {0x0, 0x0, 0x0})
    io/io.go:415 +0x151
io.Copy(...)
    io/io.go:388
connectrpc.com/connect.discard({0x7d06ab099df0, 0xc046ba8480})
    connectrpc.com/connect@v1.16.2/protocol.go:296 +0xc5
connectrpc.com/connect.(*duplexHTTPCall).CloseRead(0xc0435c0d20)
    connectrpc.com/connect@v1.16.2/duplex_http_call.go:241 +0x6d
connectrpc.com/connect.(*connectStreamingClientConn).CloseResponse(0xc046f9a770?)
    connectrpc.com/connect@v1.16.2/protocol_connect.go:647 +0x17
connectrpc.com/connect.(*errorTranslatingClientConn).CloseResponse(0xc0435c4c18)
    connectrpc.com/connect@v1.16.2/protocol.go:218 +0x23
connectrpc.com/otelconnect.(*streamingClientInterceptor).CloseResponse(0xc0439fff50)
    connectrpc.com/otelconnect@v0.7.1/payloadinterceptor.go:38 +0x23
connectrpc.com/connect.(*ServerStreamForClient[...]).Close(...)
    connectrpc.com/connect@v1.16.2/client_stream.go:170
[CUT]

To Reproduce

And in example_test.go:

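package bugreport

import (
    "testing"

    "connectrpc.com/connect"
)

// client and ctx are assumed to be set up elsewhere in the test package.
func TestThatReproducesBug(t *testing.T) {
    stream, err := client.ServerStreamMethod(ctx, connect.NewRequest(...))
    if err != nil {
        t.Fatal(err)
    }
    // Close tries to consume the rest of the response, so it blocks
    // for as long as the server keeps the stream open.
    defer stream.Close()

    for stream.Receive() {
        // Return after the first message to trigger the deferred Close.
        return
    }
}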

Environment (please complete the following information):

connectrpc.com/connect v1.16.2

jhump commented 6 days ago

There are two improvements we could make here:

  1. The documentation could make clearer that the Close method of a server stream (and the CloseResponse method of a bidi stream) is a graceful operation. It does not abort the RPC; instead, it attempts to consume the remainder of the server's response before returning. These methods are generally used to release any remaining resources associated with an RPC after the server's response has been fully consumed. To abort the RPC before the server's response has been fully received, cancel the context.Context that was used to create the RPC. Cancelling an RPC this way will trigger a cancellation error in the server.
  2. We could improve the Close method to have better time and network I/O bounds. So if it takes longer than N duration or consumes more than X bytes, it would skip the graceful close and cancel the RPC. We'd need to agree on reasonable values of N and X and also decide whether to expose client options to customize these values.
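The stack trace above shows Close blocked while draining the HTTP response body. As a rough illustration of the first point, here is a minimal sketch that uses plain net/http rather than connect-go (endlessHandler, abortAfterFirstLine, and runDemo are hypothetical names). The server streams forever, the client reads one line, and only cancelling the request context lets the drain return. The same shape should carry over to a connect client: start the RPC with a context from context.WithCancel and call cancel before stream.Close.

```go
package main

import (
	"bufio"
	"context"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"time"
)

// endlessHandler models a server stream that never finishes on its own:
// it writes a line every 10ms until the request context ends.
func endlessHandler(w http.ResponseWriter, r *http.Request) {
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming unsupported", http.StatusInternalServerError)
		return
	}
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-r.Context().Done():
			return // the client went away or cancelled
		case <-ticker.C:
			if _, err := io.WriteString(w, "tick\n"); err != nil {
				return
			}
			flusher.Flush()
		}
	}
}

// abortAfterFirstLine reads one line, cancels the request context, and then
// drains the body. It returns the first line and the error that ended the drain.
func abortAfterFirstLine(url string) (string, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // always release the context, even on the success path

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return "", err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	br := bufio.NewReader(resp.Body)
	first, err := br.ReadString('\n')
	if err != nil {
		return first, err
	}

	cancel() // abort; without this the drain below would block indefinitely
	_, err = io.Copy(io.Discard, br)
	return first, err
}

// runDemo starts a local test server and runs the abort sequence against it.
func runDemo() (string, error) {
	srv := httptest.NewServer(http.HandlerFunc(endlessHandler))
	defer srv.Close()
	return abortAfterFirstLine(srv.URL)
}

func main() {
	first, err := runDemo()
	fmt.Printf("first=%q drain ended with error: %v\n", first, err)
}
```

The drain ends with a non-nil error because the stream was aborted rather than finished, which matches the note above that cancellation surfaces as an error.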

anaxita commented 6 days ago

@jhump Hello, thank you for the fast answer! Could this be implemented by creating a new cancellable context inside the RPC method call and cancelling it in stream.Close?

jhump commented 6 days ago

Could this be implemented by creating a new cancellable context inside the RPC method call and cancelling it in stream.Close?

Yes.
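As a sketch of what that could look like from the caller's side, the wrapper below pairs a stream with the cancel function of the context the RPC was started with, and cancels before delegating to Close. The names cancelOnClose, blockingStream, openStream, and demo are hypothetical, and blockingStream is only a stand-in for a real stream whose graceful Close would otherwise wait on the server. The only thing assumed about the real stream type is that it has a Close() error method.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// closer is the minimal surface this sketch needs from a stream.
type closer interface {
	Close() error
}

// cancelOnClose (hypothetical) ties a stream to the cancel function of the
// context that the RPC was created with.
type cancelOnClose struct {
	stream closer
	cancel context.CancelFunc
}

// Close cancels the RPC first, so the underlying graceful Close has nothing
// left to wait for, and then releases the stream's resources.
func (c *cancelOnClose) Close() error {
	c.cancel()
	return c.stream.Close()
}

// blockingStream is a test double: its Close blocks until the context is
// done, mimicking a graceful close against a server that never finishes.
type blockingStream struct {
	ctx context.Context
}

func (b *blockingStream) Close() error {
	<-b.ctx.Done()
	return b.ctx.Err()
}

// openStream stands in for the RPC call: it derives a cancellable context
// and returns the wrapped stream.
func openStream(parent context.Context) *cancelOnClose {
	ctx, cancel := context.WithCancel(parent)
	return &cancelOnClose{stream: &blockingStream{ctx: ctx}, cancel: cancel}
}

// demo reports whether Close returned promptly with a cancellation error.
func demo() bool {
	s := openStream(context.Background())
	return errors.Is(s.Close(), context.Canceled)
}

func main() {
	fmt.Println(demo()) // prints "true"
}
```

One caveat with this shape: the context is only released if the caller actually calls Close, so a deferred Close right after opening the stream remains important.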

anaxita commented 6 days ago

Could this be implemented by creating a new cancellable context inside the RPC method call and cancelling it in stream.Close?

Yes.

To be sure: I was talking about implementing that on the connect library side. Were you?

jhump commented 6 days ago

It is the same either way. But on the connect library side:

  1. Care must be taken to ensure that the cancel function is always called at the end of the RPC (even when successful), to avoid any resource leaks.
  2. The lib should first attempt to gracefully close by consuming the response up to some time limit and up to some limit of bytes-read, and only cancel the RPC as a last resort to avoid hanging.
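The two points above could be sketched roughly as follows. This is not connect-go code: boundedClose, stuckBody, and demo are hypothetical names, the limits are arbitrary, and the sketch assumes that calling cancel unblocks a pending Read on the body (as cancelling a request context does for net/http response bodies).

```go
package main

import (
	"context"
	"fmt"
	"io"
	"strings"
	"time"
)

// boundedClose (hypothetical) drains body gracefully, but gives up once
// maxWait elapses or more than maxBytes have been read. It always calls
// cancel before returning. The bool reports whether the close was graceful.
func boundedClose(body io.ReadCloser, cancel context.CancelFunc, maxBytes int64, maxWait time.Duration) (bool, error) {
	type result struct {
		n   int64
		err error
	}
	done := make(chan result, 1)
	go func() {
		// Read at most maxBytes+1 so that exceeding the limit is detectable.
		n, err := io.Copy(io.Discard, io.LimitReader(body, maxBytes+1))
		done <- result{n, err}
	}()

	timer := time.NewTimer(maxWait)
	defer timer.Stop()

	graceful := false
	select {
	case r := <-done:
		graceful = r.err == nil && r.n <= maxBytes
	case <-timer.C:
		cancel() // last resort: abort the RPC to unblock the pending Read
		<-done   // wait so the drain goroutine does not outlive this call
	}

	cancel() // always release the context, even after a graceful drain
	return graceful, body.Close()
}

// stuckBody is a test double whose Read blocks until the context is done.
type stuckBody struct {
	ctx context.Context
}

func (s stuckBody) Read([]byte) (int, error) {
	<-s.ctx.Done()
	return 0, s.ctx.Err()
}

func (s stuckBody) Close() error { return nil }

// demo runs three cases: a short finite body, a body that never finishes,
// and a body larger than the byte limit.
func demo() (finite, stuck, oversized bool) {
	_, cancel1 := context.WithCancel(context.Background())
	finite, _ = boundedClose(io.NopCloser(strings.NewReader("trailing data")), cancel1, 1024, time.Second)

	ctx2, cancel2 := context.WithCancel(context.Background())
	stuck, _ = boundedClose(stuckBody{ctx: ctx2}, cancel2, 1024, 20*time.Millisecond)

	_, cancel3 := context.WithCancel(context.Background())
	oversized, _ = boundedClose(io.NopCloser(strings.NewReader(strings.Repeat("x", 100))), cancel3, 10, time.Second)
	return finite, stuck, oversized
}

func main() {
	fmt.Println(demo()) // prints "true false false"
}
```

Reading maxBytes+1 through io.LimitReader is one way to tell "finished within the limit" apart from "hit the limit", and waiting on the drain goroutine after cancelling keeps it from leaking.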