containerd / ttrpc-rust

Rust implementation of ttrpc (GRPC for low-memory environments)
Apache License 2.0
195 stars 45 forks source link

Detect streaming client disconnects #202

Open powturns opened 1 year ago

powturns commented 1 year ago

Description of problem

The async server does not detect if a client disconnects ungracefully from a streaming API. For example, let's say a client wishes to subscribe to updates streamed from the server. The connection is established, and the server starts streaming updates to the client. If the client dies, the server does not immediately realize this and continues to send events.

For example, adapting the async-stream-server file to only read the first message from the client, then echo back an increasing sequence number forever:

    async fn echo_stream(
        &self,
        _ctx: &::ttrpc::r#async::TtrpcContext,
        mut s: ::ttrpc::r#async::ServerStream<streaming::EchoPayload, streaming::EchoPayload>,
    ) -> ::ttrpc::Result<()> {
        let mut e = s.recv().await?.unwrap();
        loop {
            e.seq += 1;

            println!("Sleep 1 second to simulate waiting for an event");
            sleep(std::time::Duration::from_secs(1)).await;

            println!("sending: {}", e.seq);
            let result = s.send(&e).await;

            if result.is_err() {
                println!("error sending item!");
            }
        }
    }

With this client implementation:

async fn echo_stream(cli: streaming_ttrpc::StreamingClient) {
    let mut stream = cli.echo_stream(default_ctx()).await.unwrap();

    let echo = streaming::EchoPayload {
        seq: 0,
        msg: "Echo in a stream".to_string(),
        ..Default::default()
    };
    stream.send(&echo).await.unwrap();

    while let Ok(resp) = stream.recv().await {
        println!("{}", resp);
    }
}

Expected result

I'm not sure what would be best; some options are:

  1. Cancel the async future the service method is executing in (like non-streaming async does if the client dies)
  2. Provide some kind of token that can be selected on (like #180)
  3. Propagate the write error back to the service method (so errors can be detected when calling s.send(&e).await;). While I think this makes sense to do regardless, it's a lot more convenient if the client disconnect can be detected before trying to send something to it.

Actual result

Client output:

seq: 5 msg: "Echo in a stream"
seq: 6 msg: "Echo in a stream"
^C  <-- client killed

Server output:

... snip ....
Sleep 1 second to simulate waiting for an event
sending: 6
Sleep 1 second to simulate waiting for an event
sending: 7
Sleep 1 second to simulate waiting for an event
[00:00:19.969] (7ffb7073b640) ERROR  write_message got error: Socket("Broken pipe (os error 32)")
sending: 8
Sleep 1 second to simulate waiting for an event
[00:00:20.970] (7ffb7073b640) ERROR  write_message got error: Socket("Broken pipe (os error 32)")