hyperium / hyper

An HTTP library for Rust
https://hyper.rs
MIT License

Server: Provide a closed connection notification #707

Closed gyscos closed 8 years ago

gyscos commented 8 years ago

Similar to what golang provides with CloseNotifier (https://golang.org/pkg/net/http/#CloseNotifier), when a request takes a while to process (or is just waiting/long-polling), it is often convenient to detect when the client has closed the connection. I did not find a way to get this information with hyper (maybe I just missed it?).

seanmonstar commented 8 years ago

As long as hyper is built on std::net, it uses blocking IO. Without select, the only way to know a connection was closed is to try to write to it (or read, which will block until keep-alive notices it's dead).

So you can only either read or write on it to determine if it's closed.
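
To illustrate how a blocking read reports a closed peer, here is a minimal sketch using only std::net, independent of hyper. The function name and the loopback setup are illustrative assumptions.

```rust
use std::io::{self, Read};
use std::net::{TcpListener, TcpStream};

/// Accepts one connection on an ephemeral local port, lets the client
/// disconnect, and returns what a blocking read on the server side reports.
fn read_after_peer_close() -> io::Result<usize> {
    // Port 0 asks the OS for any free port (illustrative setup only).
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;

    // The "client": connect, then hang up without sending anything.
    let client = TcpStream::connect(addr)?;
    let (mut server_side, _peer) = listener.accept()?;
    drop(client);

    // Blocks until data arrives or the peer closes; Ok(0) means EOF.
    let mut buf = [0u8; 1];
    server_side.read(&mut buf)
}
```

If the client stayed connected and silent, the same read would simply block, which is the limitation described above.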

gyscos commented 8 years ago

I see. That's actually the way golang implements it, by spawning a thread (well, a goroutine) to try to read from the body, and notifying when the call completes.

I guess channels in rust are not as widespread as in golang, making an idiomatic API for this trickier (though perhaps using std::sync::Condvar?...), and it'd also probably add the need for an Arc in there... :-/
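
One possible shape for the Condvar-plus-Arc idea, as a rough sketch only. `CloseNotifier`, `watch` and `wait_closed` are hypothetical names, not hyper API, and the sketch assumes the client sends nothing more on the connection.

```rust
use std::io::Read;
use std::net::TcpStream;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical close notifier: a flag guarded by a mutex, plus a condvar.
#[derive(Clone)]
struct CloseNotifier {
    state: Arc<(Mutex<bool>, Condvar)>,
}

impl CloseNotifier {
    /// Spawns a thread that blocks on `read` and sets the flag on EOF or error.
    fn watch(mut stream: TcpStream) -> CloseNotifier {
        let notifier = CloseNotifier {
            state: Arc::new((Mutex::new(false), Condvar::new())),
        };
        let shared = notifier.clone();
        thread::spawn(move || {
            let mut buf = [0u8; 1];
            loop {
                match stream.read(&mut buf) {
                    Ok(n) if n > 0 => continue, // discard unexpected data
                    _ => break,                 // Ok(0) is EOF; errors end the watch too
                }
            }
            let (closed, condvar) = &*shared.state;
            *closed.lock().unwrap() = true;
            condvar.notify_all();
        });
        notifier
    }

    /// Waits up to `timeout` for the notification; returns true if closed.
    fn wait_closed(&self, timeout: Duration) -> bool {
        let (closed, condvar) = &*self.state;
        let guard = closed.lock().unwrap();
        let (guard, _timed_out) = condvar
            .wait_timeout_while(guard, timeout, |is_closed| !*is_closed)
            .unwrap();
        *guard
    }
}
```

A handler could call `wait_closed` with a short timeout between units of work, at the cost of one extra thread per connection.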

gyscos commented 8 years ago

Also, I tried this simple program:

extern crate hyper;

use std::io::Read;

use hyper::Server;
use hyper::server::Request;
use hyper::server::Response;

fn main() {
    Server::http("127.0.0.1:3000").unwrap().handle(|mut req: Request, _: Response| {
        let mut b = [0];
        // Attempt to block until the client disconnects
        println!("{}", req.read(&mut b).unwrap_or(42));
        println!("{:?}", b);
    });
}

And then I simply run curl http://localhost:3000

Unfortunately, the read command returns immediately here (it returns 0 and writes nothing to b), and does not wait for the client to cancel the connection.

Trying to write is not always an option, so it would be really nice to have this working (and golang also uses a read attempt, so it should be possible?). Maybe hyper does something on the connection?

Edit: indeed, a Request.body is a SizedReader or an EmptyReader: request.rs. And those don't forward the read to the underlying connection most of the time: h1.rs.

seanmonstar commented 8 years ago

If the request has no body, then a read will return EOF.
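
The effect can be reproduced without hyper. In the sketch below, `std::io::Take` with a limit of 0 is used as a stand-in for a length-limited body reader (an assumption for illustration; it is not hyper's SizedReader), and `CountingReader` is a hypothetical type that records whether the inner "connection" was ever touched.

```rust
use std::io::{self, Read};

/// Stand-in for the connection: any read that reaches it is counted.
struct CountingReader {
    reads: usize,
}

impl Read for CountingReader {
    fn read(&mut self, _buf: &mut [u8]) -> io::Result<usize> {
        self.reads += 1;
        Ok(0)
    }
}

/// Reads through a length-limited wrapper with no bytes remaining and returns
/// (bytes read, number of reads that reached the inner reader).
fn read_empty_body() -> io::Result<(usize, usize)> {
    let mut conn = CountingReader { reads: 0 };
    let mut buf = [0u8; 1];
    // `take(0)` models a body whose declared length is zero.
    let n = conn.by_ref().take(0).read(&mut buf)?;
    Ok((n, conn.reads))
}
```

Both numbers come back as zero: the wrapper answers EOF on its own, so the read never waits on the socket.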

gyscos commented 8 years ago

Yes, so in the current condition it doesn't look possible to detect cancellation of a simple GET request? The fact that it works in golang makes me believe it would work if reading on the underlying socket, but hyper does not expose that.

seanmonstar commented 8 years ago

That presents a different problem though, since std::net uses blocking IO. If hyper let you try to read on the socket after having read all the declared request body, the read would block until 1) the client sent more data, or 2) the client disconnects. If it's a GET request, the client does not expect to send any more data, so you would block yourself until the client timed out. No further code would execute in that thread.

You might be able to get something if you duplicated the socket, put it in another thread, and let it block on a read there...

Either way, the move to async IO should help, since epoll gives notifications when the client hangs up.

gyscos commented 8 years ago

Exactly, the blocking read in another thread is what golang's standard library does to detect client disconnection. It also uses blocking IO, just like here.

Async IO is another solution to this problem, but I'm not sure it's ready just yet.

seanmonstar commented 8 years ago

The problem is you would need to be sure that no other data would be coming from the connection. Otherwise, the other thread would get it, instead of your main thread.
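
A small std-only sketch of that hazard, with illustrative names: a handle obtained via `try_clone` refers to the same socket, so bytes read through it are gone for the original handle.

```rust
use std::io::{self, Read, Write};
use std::net::{TcpListener, TcpStream};

/// Sends two bytes to a server-side socket that has been duplicated with
/// `try_clone`, then reads one byte from each handle.
fn read_from_both_handles() -> io::Result<(u8, u8)> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let mut client = TcpStream::connect(listener.local_addr()?)?;
    let (mut original, _peer) = listener.accept()?;
    // Both handles refer to the same underlying socket.
    let mut watcher = original.try_clone()?;

    client.write_all(b"ab")?;

    let mut first = [0u8; 1];
    let mut second = [0u8; 1];
    // The watcher handle consumes the first byte...
    watcher.read_exact(&mut first)?;
    // ...so the original handle only ever sees the second one.
    original.read_exact(&mut second)?;
    Ok((first[0], second[0]))
}
```

In a server this would mean a watcher thread could swallow the start of a pipelined or keep-alive request.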

You could do this yourself, for now, anyways:

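use std::io::Read;
use std::sync::mpsc::{self, TryRecvError};
use std::thread;

// Duplicate the socket handle so another thread can block on it.
let mut tcp = req.downcast_ref::<hyper::net::HttpStream>().unwrap().0.try_clone().unwrap();
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
    // Blocks until the client sends data or disconnects.
    let mut buf = [0u8; 1];
    let _ = tx.send(tcp.read(&mut buf));
});

// elsewhere
match rx.try_recv() {
    Ok(Ok(0)) => eof(),
    Ok(Ok(_)) => unexpected_data(), // placeholder: the client sent more bytes
    Ok(Err(io_error)) => error(),
    Err(TryRecvError::Disconnected) => thread_is_dead(),
    Err(TryRecvError::Empty) => still_connected(),
}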
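
For reference, a self-contained variant of the same idea that compiles against plain std::net rather than hyper. `CloseWatcher` is a hypothetical helper name, and the sketch assumes the client sends no further data.

```rust
use std::io::{self, Read};
use std::net::TcpStream;
use std::sync::mpsc::{self, Receiver, TryRecvError};
use std::thread;

/// Hypothetical helper: watches a duplicated socket from a background thread.
struct CloseWatcher {
    rx: Receiver<io::Result<usize>>,
    closed: bool,
}

impl CloseWatcher {
    /// Duplicates the socket and blocks on a read in a new thread.
    fn spawn(stream: &TcpStream) -> io::Result<CloseWatcher> {
        let mut tcp = stream.try_clone()?;
        let (tx, rx) = mpsc::channel();
        thread::spawn(move || {
            let mut buf = [0u8; 1];
            // A send error only means the watcher was dropped; ignore it.
            let _ = tx.send(tcp.read(&mut buf));
        });
        Ok(CloseWatcher { rx, closed: false })
    }

    /// Non-blocking check, meant to be polled from the handler.
    fn is_closed(&mut self) -> bool {
        if !self.closed {
            // Anything other than "no result yet" means the read returned
            // (EOF, error, or unexpected data) or the thread is gone.
            self.closed = !matches!(self.rx.try_recv(), Err(TryRecvError::Empty));
        }
        self.closed
    }
}
```

The result is sticky: once `is_closed` has returned true it keeps returning true, since the watcher thread only performs a single read.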

gyscos commented 8 years ago

Golang implements it by actually copying from the original socket to the one visible to the user (they use an io.Pipe for that), which also means they keep reading until EOF is found. That way, the user still has access to the data in the body, and the library can detect when the socket is closed.
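
A rough Rust analogue of that pipe approach, as a sketch only: an mpsc channel stands in for io.Pipe, and `PipeEvent` and `forward_until_closed` are hypothetical names.

```rust
use std::io::Read;
use std::net::TcpStream;
use std::sync::mpsc::{self, Receiver};
use std::thread;

/// Events delivered to the handler side of the hypothetical pipe.
#[derive(Debug, PartialEq)]
enum PipeEvent {
    Data(Vec<u8>),
    Closed,
}

/// Spawns a thread that owns the read side of the socket and forwards
/// everything it reads, followed by a final `Closed` event on EOF or error.
fn forward_until_closed(mut stream: TcpStream) -> Receiver<PipeEvent> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let mut buf = [0u8; 4096];
        loop {
            match stream.read(&mut buf) {
                Ok(0) | Err(_) => break,
                Ok(n) => {
                    if tx.send(PipeEvent::Data(buf[..n].to_vec())).is_err() {
                        return; // the receiver was dropped; stop forwarding
                    }
                }
            }
        }
        let _ = tx.send(PipeEvent::Closed);
    });
    rx
}
```

Because the forwarding thread is the only reader, no body data is lost, and the `Closed` event doubles as the disconnect notification.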

It should be doable using downcast_ref indeed, thanks. Now onto exposing this from iron...

nwtgck commented 4 years ago

@seanmonstar Hi! Can we detect a client close on the server side in the current version, 0.13.x?

DCjanus commented 3 years ago

Ping. It's been 6 years. With tokio::select and async/await available, has anything changed? If someone wants to help, some guidance would be welcome.

sharksforarms commented 1 year ago

Older issue, but found a solution which works for me so figured I'd share.

This solution involves implementing Accept ourselves, so that we can return our own ClientConnection, which we implement Drop on. The Drop implementation contains the signaling (over a CancellationToken in this case) that notifies the spawned task when the client connection is dropped.

tokio = { version = "1.28.2", features = ["full"] }
hyper = { version = "0.14.26", features = ["full"] }
tokio-util = "0.7.8"
futures-util = "0.3.28"

use hyper::{
    body::Bytes,
    server::accept::Accept,
    service::{make_service_fn, service_fn},
    Body, Request, Response, StatusCode,
};
use std::{
    convert::Infallible,
    net::SocketAddr,
    pin::Pin,
    task::{Context, Poll},
    time::Duration,
};
use tokio::{
    io::{AsyncRead, AsyncWrite, ReadBuf},
    net::{TcpListener, TcpStream},
};
use tokio_util::sync::CancellationToken;

async fn handle(
    _req: Request<Body>,
    client_connection_cancel: CancellationToken,
) -> Result<Response<Body>, hyper::http::Error> {
    let (mut tx, rx) = Body::channel();

    // spawn background task, end when client connection is dropped
    tokio::spawn(async move {
        let mut counter = 0;
        loop {
            tokio::select! {
                _ = client_connection_cancel.cancelled() => {
                    println!("client connection is dropped, exiting loop");
                    break;
                },
                _ = tokio::time::sleep(Duration::from_secs(1)) => {
                    // send_data fails once the response body has been dropped
                    if tx.send_data(Bytes::from(format!("{counter}\n"))).await.is_err() {
                        break;
                    }
                    counter += 1;
                }
            }
        }
    });

    Response::builder().status(StatusCode::OK).body(rx)
}

struct ServerListener(TcpListener);

struct ClientConnection {
    conn: TcpStream,
    cancel: CancellationToken,
}

impl Drop for ClientConnection {
    fn drop(&mut self) {
        self.cancel.cancel()
    }
}

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 3000)))
        .await
        .unwrap();

    let make_service = make_service_fn(|conn: &ClientConnection| {
        let client_connection_cancel = conn.cancel.clone();
        async move {
            Ok::<_, Infallible>(service_fn(move |req| {
                handle(req, client_connection_cancel.clone())
            }))
        }
    });

    let server = hyper::server::Server::builder(ServerListener(listener)).serve(make_service);

    if let Err(e) = server.await {
        eprintln!("server error: {}", e);
    }
}

impl AsyncRead for ClientConnection {
    fn poll_read(
        self: Pin<&mut Self>,
        context: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<tokio::io::Result<()>> {
        Pin::new(&mut Pin::into_inner(self).conn).poll_read(context, buf)
    }
}

impl AsyncWrite for ClientConnection {
    fn poll_write(
        self: Pin<&mut Self>,
        context: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, tokio::io::Error>> {
        Pin::new(&mut Pin::into_inner(self).conn).poll_write(context, buf)
    }

    fn poll_flush(
        self: Pin<&mut Self>,
        context: &mut Context<'_>,
    ) -> Poll<Result<(), tokio::io::Error>> {
        Pin::new(&mut Pin::into_inner(self).conn).poll_flush(context)
    }

    fn poll_shutdown(
        self: Pin<&mut Self>,
        context: &mut Context<'_>,
    ) -> Poll<Result<(), tokio::io::Error>> {
        Pin::new(&mut Pin::into_inner(self).conn).poll_shutdown(context)
    }
}

impl Accept for ServerListener {
    type Conn = ClientConnection;

    type Error = std::io::Error;

    fn poll_accept(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
        let (conn, _addr) = futures_util::ready!(self.0.poll_accept(cx))?;
        Poll::Ready(Some(Ok(ClientConnection {
            conn,
            cancel: CancellationToken::new(),
        })))
    }
}
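
The Drop-based signaling can also be looked at in isolation. The following std-only sketch swaps the CancellationToken for a shared AtomicBool (a deliberate simplification); `DropSignal` is a hypothetical stand-in for ClientConnection and wraps an arbitrary value instead of a TcpStream.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for `ClientConnection`: wraps some connection value
/// and flips a shared flag when it is dropped.
struct DropSignal<T> {
    _conn: T,
    closed: Arc<AtomicBool>,
}

impl<T> DropSignal<T> {
    /// Returns the wrapper plus a handle that observers can poll.
    fn new(conn: T) -> (Self, Arc<AtomicBool>) {
        let closed = Arc::new(AtomicBool::new(false));
        let wrapper = DropSignal {
            _conn: conn,
            closed: Arc::clone(&closed),
        };
        (wrapper, closed)
    }
}

impl<T> Drop for DropSignal<T> {
    fn drop(&mut self) {
        // Runs when the owner of the connection lets go of it.
        self.closed.store(true, Ordering::SeqCst);
    }
}
```

A flag has to be polled, whereas the CancellationToken used above can be awaited inside tokio::select!, which is why the token fits the async handler better.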

nwtgck commented 1 year ago

@sharksforarms Thank you so much for sharing! I'll try it.

edit: It worked!!:

https://github.com/hyperium/hyper/assets/10933561/d6a50a92-e862-45d8-ad17-7c125e5c7843