hyperium / h2

HTTP 2.0 client & server implementation for Rust.
MIT License
1.38k stars 278 forks source link

Client connection is not closed after dropping SendRequest #502

Closed eaufavor closed 3 years ago

eaufavor commented 3 years ago

I noticed that the client connection is not closed after SendRequest is dropped after other references

Code to reproduce:

The code below mostly are copied from h2 docs: a server and a client.

If I removed delay_for(Duration::from_millis(0)).await;, the connection is closed.

use h2::server;
use http::{Response, StatusCode};
use tokio::net::TcpListener;

use h2::client;

use http::{Request, Method};
use std::error::Error;
use tokio::net::TcpStream;
use tokio::time::{delay_for, Duration};

async fn server() {
    let mut listener = TcpListener::bind("127.0.0.1:8888").await.unwrap();

    loop {
        if let Ok((socket, _peer_addr)) = listener.accept().await {
            tokio::spawn(async {
                let mut h2 = server::handshake(socket).await.unwrap();
                while let Some(request) = h2.accept().await {
                    let (_request, mut respond) = request.unwrap();

                    let response = Response::builder()
                        .status(StatusCode::OK)
                        .body(())
                        .unwrap();

                    respond.send_response(response, true)
                        .unwrap();
                }

            });
        }
    }
}

async fn client() -> Result<(), Box<dyn Error>> {
    let tcp = TcpStream::connect("127.0.0.1:8888").await?;
    let (h2, connection) = client::handshake(tcp).await?;

    tokio::spawn(async move {
        connection.await.unwrap();
        println!("Connection done");
    });

    let h2 = {
        let mut h2 = h2.ready().await?;
        let request = Request::builder()
                        .method(Method::GET)
                        .uri("https://www.example.com/")
                        .body(())
                        .unwrap();

        let (response, _) = h2.send_request(request, true).unwrap();

        let (_head, mut body) = response.await?.into_parts();
        let mut flow_control = body.flow_control().clone();

        while let Some(chunk) = body.data().await {
            let chunk = chunk?;
            let _ = flow_control.release_capacity(chunk.len());
        }
        h2
    };

    delay_for(Duration::from_millis(0)).await;

    Ok(())
}

#[tokio::main]
pub async fn main() {
    tokio::spawn(async {
        client().await.unwrap();
    });
    server().await;
}

Analysis

After some debug I noticed that the reason this happens is that the connection is not woken up when SendRequest is dropped. It is only woken up when OpaqueStreamRef is dropped:

https://github.com/hyperium/h2/blob/dc3079ab89ca9fa7b79e014f5b2a835f30f4916b/src/proto/streams/streams.rs#L1291-L1295

Without the sleep, SendRequest is dropped at the same time other references are dropped. A woken connection at a later time correctly closes the connections because all references are gone. With the sleep, when the connection is woken, there is still a reference to it, so it is not closed. Because nothing else will wake the connection, the connection is "leaked" and will be idle forever.

If I put similar wake code into the drop function of Streams the problem is fixed:

https://github.com/hyperium/h2/blob/dc3079ab89ca9fa7b79e014f5b2a835f30f4916b/src/proto/streams/streams.rs#L877-L884

impl<B, P> Drop for Streams<B, P>
where
    P: Peer,
{
    fn drop(&mut self) {
        let _ = self.inner.lock().map(|mut inner| {
            inner.refs -= 1;
            if inner.refs == 1 {
                if let Some(task) = inner.actions.task.take() {
                    task.wake();
                }
            }
        });
    }
}

I would like to hear from others whether this is the right fix or I'm just scratching the surface.

Thanks.

eaufavor commented 3 years ago

Thanks @nox !

nox commented 3 years ago

You're welcome, thanks for your report and for the candidate fix!