tokio-rs / tokio

A runtime for writing reliable asynchronous applications with Rust. Provides I/O, networking, scheduling, timers, ...
https://tokio.rs

Waiting for HUP on TCP socket #3467

Open benesch opened 3 years ago

benesch commented 3 years ago

This is a resubmission of #483 for the Tokio v1.0 era. The question is thus: "how do I wait for a TCP socket to close without reading from or writing to that socket?"

First, some quick history! #483 was resolved with this suggestion:

You can ask to get notified on HUP by calling poll_read_ready and passing Ready::hup() as the interest.

But poll_read_ready was later removed, so 6d8cc4e was filed asking how to wait for HUP without poll_read_ready. The proposed solution was to restore poll_read_ready. That method is now restored (#2968), but it's no longer clear to me how one uses it to wait for just HUP: in Tokio v1.0, there is no Ready::hup().

Here's the comment I posted on #2968, which contains my current hacky attempt to wait for HUP, and my ideal API:

I managed to get something working here, but it's suboptimal: MaterializeInc/materialize#5235

The tl;dr is that what you currently have to write is something like this, which waits on Interest::READABLE and then inspects whether the resulting Ready reports is_read_closed:

use std::time::Duration;
use futures::{Stream, StreamExt}; // or tokio_stream
use tokio::io::{AsyncWriteExt, Interest};
use tokio::net::TcpStream;
use tokio::time;

async fn write_stream(
    mut conn: TcpStream,
    mut stream: impl Stream<Item = Vec<u8>> + Unpin,
) -> Result<(), &'static str> {
    loop {
        match time::timeout(Duration::from_secs(1), stream.next()).await {
            Ok(None) => return Ok(()),
            Ok(Some(data)) => conn.write_all(&data).await.map_err(|_| "conn broke")?,
            Err(_) => {
                // No new data for a second; check whether the peer hung up.
                let ready = conn
                    .ready(Interest::READABLE)
                    .await
                    .map_err(|_| "conn broke")?;
                if ready.is_read_closed() {
                    return Err("conn broke");
                }
            }
        }
    }
}

But ideally you'd be able to block on the READ_CLOSED event directly, or something:

async fn write_stream(
    mut conn: TcpStream,
    mut stream: impl Stream<Item = Vec<u8>> + Unpin,
) -> Result<(), &'static str> {
    loop {
        tokio::select! {
            msg = stream.next() => match msg {
                None => return Ok(()),
                Some(data) => {
                    if conn.write_all(&data).await.is_err() {
                        return Err("conn broke");
                    }
                }
            },
            // Hypothetical: Interest::READ_CLOSED does not exist today.
            _ = conn.ready(Interest::READ_CLOSED) => return Err("conn broke"),
        }
    }
}
Darksonn commented 3 years ago

The Tokio type internally uses mio::Interest, and it doesn't seem to have a READ_CLOSED option either, so this would have to be added to mio first.

@Thomasdezeeuw Do the various OSes even provide such an interest?

vi commented 3 years ago

This related discussion shows that it seems to be impossible to build a HUP-propagating TCP forwarder with just Tokio (though a lower-level, less portable approach using nix is possible instead).
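
Not from the original comment, but to sketch what that lower-level approach looks like (shown here with the libc crate rather than nix; Linux-specific, blocking, and untested): poll(2) with POLLRDHUP reports a peer hang-up without consuming any buffered data.

use std::os::unix::io::AsRawFd;

// Returns Ok(true) once the peer has hung up or shut down its write half.
fn saw_rdhup(conn: &std::net::TcpStream) -> std::io::Result<bool> {
    let mut pfd = libc::pollfd {
        fd: conn.as_raw_fd(),
        events: libc::POLLRDHUP,
        revents: 0,
    };
    // Wait up to one second; POLLHUP/POLLERR are always reported even though
    // we did not ask for them.
    let n = unsafe { libc::poll(&mut pfd, 1, 1000) };
    if n < 0 {
        return Err(std::io::Error::last_os_error());
    }
    Ok((pfd.revents & (libc::POLLRDHUP | libc::POLLHUP | libc::POLLERR)) != 0)
}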

Thomasdezeeuw commented 3 years ago

The Tokio type internally uses mio::Interest, and it doesn't seem to have a READ_CLOSED option either, so this would have to be added to mio first.

@Thomasdezeeuw Do the various OSes even provide such an interest?

Sorry I never received this notification, or I missed the email.

As far as Interest goes: error, read-closed, and write-closed are always tracked, if the OS supports it of course; see Event::is_read_closed and related methods.

I've also answered here: https://github.com/tokio-rs/mio/issues/1476. Tokio should already be receiving Event::is_read_closed/Event::is_write_closed events from mio; I don't know how, or whether, Tokio exposes them.
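
To make that concrete (this is not from the original comment; the address and buffer sizes are placeholders): at the mio level you register for READABLE, since there is no separate read-closed interest, and then check Event::is_read_closed on whatever events come back, roughly like this:

use std::io;

use mio::net::TcpStream;
use mio::{Events, Interest, Poll, Token};

fn main() -> io::Result<()> {
    // Placeholder address, purely for illustration.
    let addr = "127.0.0.1:5432".parse().unwrap();
    let mut stream = TcpStream::connect(addr)?;

    let mut poll = Poll::new()?;
    let mut events = Events::with_capacity(8);
    // There is no read-closed interest to register; read-closed, write-closed,
    // and error are reported on top of the readiness you did register for.
    poll.registry()
        .register(&mut stream, Token(0), Interest::READABLE)?;

    loop {
        poll.poll(&mut events, None)?;
        for event in events.iter() {
            if event.is_read_closed() {
                // The peer hung up (or shut down its write half).
                println!("peer closed the connection");
                return Ok(());
            }
            // Otherwise the socket is merely readable; this sketch
            // deliberately does not read from it.
        }
    }
}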

Thomasdezeeuw commented 3 years ago

@benesch looking at your example I have a question: do you read from the connection? Because if so, the easiest way to detect a broken/closed connection is by reading and getting zero bytes. This indicates the connection is closed, or that the other side called shutdown (which has the same intent).

benesch commented 3 years ago

@benesch looking at your example I have a question: do you read from the connection? Because if so, the easiest way to detect a broken/closed connection is by reading and getting zero bytes. This indicates the connection is closed, or that the other side called shutdown (which has the same intent).

Indeed, it'd be swell if we could read from the connection to determine if the client had gone away. But the protocol we're implementing (the PostgreSQL COPY TO protocol) doesn't really permit that. The server streams data to the client until it runs out of data; only at that point does the server read from the client to figure out what to do next. But the client is welcome to tell the server what it wants the server to do next before the COPY command completes. So if we read from the connection to see if the client has gone away while streaming COPY results to the client, we might consume bytes that we didn't mean to. In theory we could stash those bytes in a buffer somewhere, but it gets really hairy with Tokio codecs.

benesch commented 3 years ago

I'm not sure how much sense that last comment made. Let me try again with code. You might hope to be able to write:

async fn write_stream(
    mut conn: TcpStream,
    mut stream: impl Stream<Item = Vec<u8>> + Unpin,
) -> Result<(), &'static str> {
    // Also needs tokio::io::AsyncReadExt for `read`.
    let mut buf = [0u8; 1];
    loop {
        tokio::select! {
            msg = stream.next() => match msg {
                None => return Ok(()),
                Some(data) => {
                    if conn.write_all(&data).await.is_err() {
                        return Err("conn broke");
                    }
                }
            },
            res = conn.read(&mut buf) => match res {
                Ok(1) => {
                    // Gotta stick this byte somewhere... plus if we saw one
                    // byte we're probably going to see a lot more, but that
                    // doesn't mean we can stop sending the data from `stream`
                    // to the client.
                }
                Ok(0) => {
                    // Client went away. Give up.
                    return Ok(());
                }
                Ok(_) | Err(_) => return Err("conn broke"),
            },
        }
    }
}

The problem, as described in the comment within, is what happens when you read and get actual bytes back. If you get a zero-byte read you're golden, because you know the client has gone away and you can give up. But if you see bytes... well, you've got to buffer those bytes somewhere. And ideally you'd leave them in the kernel buffer for later, because that way you eventually apply some backpressure on a client that's writing a lot of bytes you don't want to see yet.

vi commented 3 years ago

the easiest way to detect a broken/closed connection is by reading and getting zero bytes

  1. Clients may shutdown(2) the request-sending half of the socket and still wait for the response to arrive. Receiving proof that there will be no more requests on that socket is therefore not proof that it is useless to prepare a response for that client;
  2. We may want to avoid recv(2)-ing anything from that client for now, to provide backpressure.
vi commented 3 years ago

If you get a zero-byte read you're golden, because you know the client has gone away and you can give up.

@benesch, the socket may be half-closed: the client has finished writing to it and called shutdown(SHUT_WR), but is still waiting for your reply. You can still write to that socket, but you can't read anything more from it.
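
Not from the original comment, but for concreteness, here is a minimal sketch of a client that ends up in exactly that state, using Tokio's AsyncWriteExt::shutdown (which only shuts down the write direction); the address is made up:

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Placeholder address, for illustration only.
    let mut conn = TcpStream::connect("127.0.0.1:5432").await?;

    // Send the whole request...
    conn.write_all(b"the whole request").await?;
    // ...then half-close: AsyncWriteExt::shutdown shuts down only the write
    // direction (shutdown(SHUT_WR)), so the server sees EOF when it reads.
    conn.shutdown().await?;

    // The read direction is still open, so we can wait for the reply.
    let mut reply = Vec::new();
    conn.read_to_end(&mut reply).await?;
    println!("got {} reply bytes", reply.len());
    Ok(())
}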

benesch commented 3 years ago

If you get a zero-byte read you're golden, because you know the client has gone away and you can give up.

@benesch, the socket may be half-closed: the client has finished writing to it and called shutdown(SHUT_WR), but is still waiting for your reply. You can still write to that socket, but you can't read anything more from it.

That's not a thing that our clients do! But I agree that it would be useful to support for other folks who do use half-closed sockets.

Darksonn commented 3 years ago

Aha. Well if it is possible to detect it, it would be good to provide a way to do so.

Millione commented 6 months ago

Any progress here?

Darksonn commented 6 months ago

We added something called the error interest, which you can wait for. I'm not sure whether it is helpful in this case.
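
A sketch of what using it might look like, assuming Interest::ERROR and Ready::is_error() are available in your Tokio version; whether a plain hang-up is reported as an error rather than as read-closed is platform-dependent, so treat this as untested:

use tokio::io::Interest;
use tokio::net::TcpStream;

// Wait until the peer hangs up or the socket reports an error, without
// reading any data. Combining ERROR with READABLE means ordinary incoming
// data also wakes this loop, which is the same caveat as the hack above.
async fn wait_for_hup(conn: &TcpStream) -> std::io::Result<()> {
    loop {
        let ready = conn.ready(Interest::ERROR | Interest::READABLE).await?;
        if ready.is_error() || ready.is_read_closed() {
            return Ok(());
        }
    }
}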

vi commented 6 months ago

Can that error interest be used to implement something like tokio::net::TcpStream::into_split, but which would additionally return a special error-monitor handle that does not participate in byte I/O yet still receives resets?

andrewhavck commented 5 months ago

I’ve found waiting on priority interest works if you’re using Linux.
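
For what that might look like (again a sketch, not from the comment above): Interest::PRIORITY is only available on Linux/Android, and relying on a hang-up waking a PRIORITY-only wait is observed behaviour rather than a documented guarantee:

#[cfg(any(target_os = "linux", target_os = "android"))]
async fn wait_for_hup(conn: &tokio::net::TcpStream) -> std::io::Result<()> {
    use tokio::io::Interest;
    loop {
        // Registering only PRIORITY interest means ordinary readable data
        // does not wake us; the hang-up is reported alongside whatever
        // readiness we get back.
        let ready = conn.ready(Interest::PRIORITY).await?;
        if ready.is_read_closed() || ready.is_error() {
            return Ok(());
        }
        // Anything else was genuine priority (out-of-band) data; keep waiting.
    }
}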