connectrpc / connect-go

The Go implementation of Connect: Protobuf RPC that works.
https://connectrpc.com
Apache License 2.0
2.76k stars 91 forks source link

How to detect if client closed the stream for server streaming method? #668

Closed raymondsze closed 5 months ago

raymondsze commented 6 months ago

I have a method like this

syntax = "proto3";

package api.v1;

service ApiService {
  rpc PublishMessage(PublishMessageRequest) returns (stream PublishMessageResponse) {};
}

message PublishMessageRequest {
  google.protobuf.Any request = 1;
}

message PublishMessageResponse {
  google.protobuf.Any response = 1;
}

client.go

    ctx := context.Background()
    client := apiconnect.NewApiServiceClient(http.DefaultClient, "http://localhost:50051")
    stream, _ := client.PublishMessage(ctx, connect.NewRequest(&api.PublishMessageRequest{Request: request}))
    stream.Close()

server.go

func (s *ApiService) PublishMessage(ctx context.Context, in *connect.Request[api.PublishMessageRequest], stream *connect.ServerStream[api.PublishMessageResponse]) error {
    s.replyChan = make(chan *anypb.Any)
    HandleMessage(s.replyChan, in)
    for {
       select {
           case reply : <- s.replyChan
               stream.Send(&api.PublishMessageResponse{Response: reply.Response})
           // The following didn't trigger if client call stream.Close()....
           case <-ctx.Done():
           default:
                return nil
       }
    }
}

How to detect if client closed the stream for server streaming method?

emcfarlane commented 6 months ago

Hey @raymondsze, you will need to use context cancellation to propagate the cancellation. For your example change the background context:

    ctx, cancel := context.WithCancel(context.Background())
    client := apiconnect.NewApiServiceClient(http.DefaultClient, "http://localhost:50051")
    stream, _ := client.PublishMessage(ctx, connect.NewRequest(&api.PublishMessageRequest{Request: request}))
    cancel()

In this example stream.Close will try to gracefully close the connection and will wait on the handler closing. Will look at implementing better documentation for this behaviour.

jhump commented 6 months ago

How to detect if client closed the stream for server streaming method?

There are two ways. 1) The context provided to the handler will be cancelled if the client cancels or disconnects. 2) The call to stream.Send will return a non-nil error if the message cannot be written because the client has cancelled/disconnected.

The following didn't trigger if client call stream.Close()....

Calling Close will actually consume all messages in order to get to the final status of the RPC. In server-stream methods, the status/error comes at the end of the stream, which allows the server to indicate if the messages it sent were partial due to a failure that occurred mid-stream.

If you want to abort the call from the client, cancel the context like shown in @emcfarlane's reply.