tower-rs / tower-grpc

A gRPC client & server implementation.
MIT License

There's no error when streaming and the client disconnects #55

Open bbigras opened 6 years ago

bbigras commented 6 years ago

In the following example, if I disconnect the client before sleep() finishes, the server still tries to send "test" without reporting any error.

Also, for long streams with infrequent messages, it would be nice to get an error as soon as the connection is closed (even if nothing is sent on the wire).

#[derive(Clone, Debug)]
struct Greet;

impl server::TestCase for Greet {
    type SubStream = Box<Stream<Item = Notification, Error = tower_grpc::Error>>;
    type SubFuture = future::FutureResult<Response<Self::SubStream>, tower_grpc::Error>;

    fn sub(&mut self, request: Request<Empty>) -> Self::SubFuture {
        let (tx, rx) = mpsc::channel(4);

        // Simulate an infrequent stream: wait 10 seconds, then push one item.
        thread::spawn(move || {
            thread::sleep(time::Duration::from_secs(10));

            tx.send(Notification {
                value: "test".to_string(),
            }).wait()
                .unwrap();
        });

        // Channel errors are not expected here, hence the unimplemented!().
        let rx = rx.map_err(|_| unimplemented!());
        future::ok(Response::new(Box::new(rx)))
    }
}
olix0r commented 6 years ago

@bbigras thanks for reporting this!

If I understand correctly, rx should be dropped when the client disconnects, so that subsequent calls to tx.send fail (though I'm not sure this is guaranteed by mpsc::Sender as Sink).

The easiest way to test this would be to wrap your rx with a newtype that implements Stream and Drop, where the drop() implementation logs something.

Also, you might consider changing your tx.send to something like if let Err(e) = tx.try_send(..) { .. } -- I would expect this to fail if the receiver has been dropped. You can check TrySendError::is_disconnected() to verify that this is the reason for the failure.
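For reference, the disconnect semantics described above can be sketched with std::sync::mpsc from the standard library (a hypothetical stand-in for the futures 0.1 bounded channel used in the issue; in futures 0.1 the analogous check is TrySendError::is_disconnected()). Once the receiver is dropped, try_send on a bounded channel fails with a Disconnected error:

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

// Stand-in for the futures-0.1 channel in the issue: std's sync_channel
// exhibits the same semantics -- try_send fails with Disconnected once
// the receiving half has been dropped.
fn send_after_receiver_drop() -> bool {
    let (tx, rx) = sync_channel::<String>(4);
    drop(rx); // simulate the client disconnecting and rx being dropped

    match tx.try_send("test".to_string()) {
        Err(TrySendError::Disconnected(_)) => true, // drop was detected
        _ => false,
    }
}

fn main() {
    if send_after_receiver_drop() {
        println!("disconnected");
    }
}
```

The open question in the thread is whether rx is actually dropped at disconnect time, which is what the Drop-logging wrapper below tests.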

bbigras commented 6 years ago

Thanks for the quick reply.

The send doesn't fail; the drop seems to happen only afterward.

output

connected
sending
sending, done
Dropping!

code

struct MyType {
    rx: futures::sync::mpsc::Receiver<hello_world::Notification>,
}

impl Stream for MyType {
    type Item = hello_world::Notification;
    type Error = ();

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        self.rx.poll()
    }
}

impl Drop for MyType {
    fn drop(&mut self) {
        println!("Dropping!");
    }
}

#[derive(Clone, Debug)]
struct Greet;

impl server::TestCase for Greet {
    type SubStream = Box<Stream<Item = Notification, Error = tower_grpc::Error>>;
    type SubFuture = future::FutureResult<Response<Self::SubStream>, tower_grpc::Error>;

    fn sub(&mut self, request: Request<Empty>) -> Self::SubFuture {
        println!("connected");

        let (mut tx, rx) = mpsc::channel(4);

        thread::spawn(move || {
            thread::sleep(time::Duration::from_secs(10));

            println!("sending");
            if let Err(e) = tx.try_send(Notification {
                value: "test".to_string(),
            }) {
                println!("error: {}", e);
            }

            println!("sending, done");
        });

        let m = MyType { rx }.map_err(|_| unimplemented!());

        future::ok(Response::new(Box::new(m)))
    }
}
olix0r commented 6 years ago

OK, I think I see what's going on, and I think this should be fixed...

Currently, the rx stream isn't dropped until the grpc server tries to write data on the closed stream. So at least one item has to be sent on tx for the server to detect the disconnect.
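This "detected only on write" behavior can be illustrated with a small std::sync::mpsc sketch (again a hypothetical stand-in for the gRPC stream, not tower-grpc's actual machinery): the sender learns of the disconnect only on its next send, so frequent keep-alive items bound the detection latency, while an idle stream never notices.

```rust
use std::sync::mpsc::channel;
use std::thread;
use std::time::Duration;

// The receiver is dropped mid-stream (the "client disconnect"); the
// sending side only notices on the next send() call, not at the moment
// of the drop itself.
fn sends_before_detection() -> usize {
    let (tx, rx) = channel::<usize>();

    thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        drop(rx); // client goes away
    });

    let mut sent = 0;
    loop {
        // Frequent sends act as keep-alives: the first send *after* the
        // drop fails, so detection lags by at most one send interval.
        if tx.send(sent).is_err() {
            break;
        }
        sent += 1;
        thread::sleep(Duration::from_millis(10));
    }
    sent
}

fn main() {
    println!("sends before detection: {}", sends_before_detection());
}
```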

@carllerche and @seanmonstar seem to have some thoughts on how to fix this. The basic gist is that h2 has this information, but it isn't wired through the various body/stream types yet.

carllerche commented 6 years ago

@olix0r I vaguely remember discussing this but have lost the context. Is there an h2 issue or anything that has more of the context? (I probably should have written one up immediately, but alas I did not).

olix0r commented 6 years ago

@carllerche Here's what you said when we chatted about it:

basically, h2 needs to expose is_reset and then tower-grpc needs to use it: it would check is_reset first and eagerly drop the stream

...

in theory, you would call poll_capacity first and that could error on reset so you wouldn’t need a separate poll_reset

...

but, we aren’t using poll_capacity so, we got dat infinite buffering ftw

This refers to the fact that Flush doesn't call poll_capacity yet.