connectrpc / connect-go

The Go implementation of Connect: Protobuf RPC that works.
https://connectrpc.com
Apache License 2.0

Support context.Context in BidiStream's Receive() #735

Closed: abourget closed this issue 1 month ago

abourget commented 1 month ago

I'm using a BidiStream. I'm trying to use errgroup to read messages in one goroutine while doing other work in another. However, I can't pass a context.Context to the Receive() call to interrupt it when the context is cancelled, so I can't get out of the function passed to errgroup.Group.Go(), and I'm pretty stuck.

I'm decorating a context to add more cancellation conditions.

It seems context.Context is only half-way there in the BidiStream.

Describe the solution you'd like

I'm not sure, but passing a context.Context to the Receive() method, and having it cancel with the rest, would be useful.

Describe alternatives you've considered

The alternatives involve complex cancellation flows with channels and so on, just to end up returning a value from the streaming call. Not so pretty.

Perhaps there's a reason, or better ways to work around my issue?

Thanks!

jhump commented 1 month ago

@abourget, the context currently controls the lifetime of the HTTP operation as a whole, and not individual read and write calls. This very much mirrors the standard net/http libraries that Connect is built on: you can provide a context for the HTTP request, but not for individual calls to write to the request body or read from the response body.
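The net/http behaviour described above can be sketched with only the standard library. In this minimal example the helper name `readAfterCancel` and the test server are assumptions made for illustration. The context is attached to the request, the body's Read takes no context, and cancelling the request context is what interrupts the read.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// readAfterCancel (hypothetical helper) issues a request, cancels its context
// while the response body is still open, and returns the error that the
// subsequent body Read reports.
func readAfterCancel() error {
	// Test server: send the headers, then hold the body open until the
	// client goes away.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		if f, ok := w.(http.Flusher); ok {
			f.Flush()
		}
		<-r.Context().Done()
	}))
	defer srv.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// The context is bound to the request as a whole.
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, srv.URL, nil)
	if err != nil {
		return err
	}
	resp, err := srv.Client().Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// There is no per-read cancellation hook; cancelling ends the whole request.
	cancel()

	buf := make([]byte, 1)
	_, err = resp.Body.Read(buf) // Read takes no context argument
	return err
}

func main() {
	fmt.Println("body read after cancel:", readAfterCancel())
}
```

The body read fails because the whole HTTP operation was cancelled, not because that individual read was given a deadline.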

The main difficulty with supporting a per-receive (or per-send) context is this: what is the state of the underlying RPC stream when it is cancelled? Would you expect that the entire operation is cancelled? If not, it could be in a corrupted state if a goroutine has partially read (or written) a message and then was interrupted via cancellation. Or it could have been fully read/written, but the caller has no knowledge of that because the cancellation occurred before the read value could be returned (or the successful write acknowledged). This puts the application in a strange state where it's unclear what it has received or sent on the stream. For many bidirectional RPCs, like those that use a client->server-then-server->client series of alternating messages, this could potentially lead to a deadlock.
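To make the ambiguity concrete, here is a rough sketch of what a naive per-receive context might look like if bolted on from the outside. `fakeStream`, `receiveWithContext`, and `lostAfterCancel` are hypothetical names and not part of connect-go. The blocking Receive is abandoned rather than stopped, so it still consumes the next message after the caller has given up.

```go
package main

import (
	"context"
	"fmt"
	"io"
)

// fakeStream is a hypothetical stand-in for a stream whose Receive has no
// context parameter. It is not a connect-go type.
type fakeStream struct {
	msgs chan string
}

// Receive blocks until a message arrives or the stream is closed.
func (s *fakeStream) Receive() (string, error) {
	msg, ok := <-s.msgs
	if !ok {
		return "", io.EOF
	}
	return msg, nil
}

type result struct {
	msg string
	err error
}

// receiveWithContext is a naive, hypothetical wrapper: it stops waiting when
// ctx is done, but the underlying Receive keeps running in the background.
// On cancellation it returns the channel the abandoned call will report on.
func receiveWithContext(ctx context.Context, s *fakeStream) (string, <-chan result, error) {
	pending := make(chan result, 1)
	go func() {
		msg, err := s.Receive()
		pending <- result{msg: msg, err: err}
	}()
	select {
	case r := <-pending:
		return r.msg, nil, r.err
	case <-ctx.Done():
		return "", pending, ctx.Err()
	}
}

// lostAfterCancel returns the message consumed by the abandoned Receive and
// the error the caller saw.
func lostAfterCancel() (string, error) {
	s := &fakeStream{msgs: make(chan string)}
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // the caller gives up before any message arrives

	_, pending, err := receiveWithContext(ctx, s)

	s.msgs <- "hello" // the peer sends; the abandoned Receive reads it
	r := <-pending
	return r.msg, err
}

func main() {
	swallowed, err := lostAfterCancel()
	fmt.Printf("caller saw: %v; abandoned Receive consumed: %q\n", err, swallowed)
}
```

The caller only sees a cancellation error, yet a message has been taken off the stream. This is the "unclear what it has received" state described above.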

You can handle this in your own application code, for the errgroup case, by simply cancelling the entire RPC context when the errgroup is cancelled. By cancelling the entire thing, the uncertainty above is not an issue -- the whole bidi conversation will be torn down on any error. You can do this by using context.AfterFunc:

rpcCtx, rpcCancel := context.WithCancel(ctx)
stream := stub.SomeBidiRPC(rpcCtx)

grp, grpCtx := errgroup.WithContext(rpcCtx)
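context.AfterFunc(grpCtx, func() {
    // grpCtx is also cancelled when Wait returns successfully, so
    // grpCtx.Err() is always non-nil here. To preserve the stream when
    // the whole group succeeds, check the cancellation cause instead:
    // it is context.Canceled on success and the first error otherwise
    // (assumes an errgroup version that records the cause; a goroutine
    // that returns a bare context.Canceled looks like success here).
    if cause := context.Cause(grpCtx); cause != context.Canceled {
        rpcCancel()
    }
})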
// launch the group activities
grp.Go(func() error {
    // ....
})
if err := grp.Wait(); err != nil {
    return err
}

So if one of the errgroup's goroutines is stuck, waiting to read or write a message, and the group gets cancelled, this will cause that read or write to also cancel (since the entire RPC will be cancelled).
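The effect can be reproduced with only the standard library. In this sketch `fakeStream`, `errWorker`, and `demo` are hypothetical names. errgroup is swapped for a sync.WaitGroup plus an explicit rpcCancel call so the example has no external dependency. It also assumes, as described above, that the stream's Receive returns once the RPC context is cancelled.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// fakeStream is a hypothetical stand-in for a bidi stream bound to an RPC
// context. Receive takes no context; it unblocks only when a message arrives
// or the whole RPC context ends.
type fakeStream struct {
	rpcCtx context.Context
	msgs   chan string
}

func (s *fakeStream) Receive() (string, error) {
	select {
	case m := <-s.msgs:
		return m, nil
	case <-s.rpcCtx.Done():
		return "", s.rpcCtx.Err()
	}
}

var errWorker = errors.New("worker failed")

// demo runs a reader that blocks in Receive and a worker that fails. The
// worker's failure cancels the whole RPC context, which unblocks the reader.
func demo() (readerErr, workerErr error) {
	rpcCtx, rpcCancel := context.WithCancel(context.Background())
	defer rpcCancel()
	stream := &fakeStream{rpcCtx: rpcCtx, msgs: make(chan string)}

	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		_, readerErr = stream.Receive() // nobody sends, so this waits
	}()
	go func() {
		defer wg.Done()
		workerErr = errWorker
		rpcCancel() // tear down the whole conversation on error
	}()
	wg.Wait()
	return readerErr, workerErr
}

func main() {
	readerErr, workerErr := demo()
	fmt.Println("reader:", readerErr)
	fmt.Println("worker:", workerErr)
}
```

Because the whole conversation is torn down, there is no question of what was partially read or written on the stream.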

abourget commented 1 month ago

Ok, thanks for the detailed input! Have a good day!