hyperium / h2

HTTP 2.0 client & server implementation for Rust.
MIT License
1.34k stars 269 forks source link

Server hangs when there is delay between headers and body #662

Closed sanjams2 closed 1 year ago

sanjams2 commented 1 year ago

I'm not sure this is a bug, but the behavior feels buggy, so I thought I would open an issue.

Here is the code in question:

use std::net::SocketAddr;
use std::time::Duration;
use bytes::Bytes;
use h2::{client, server};
use http::{Method, Request, Response, StatusCode};
use tokio::net::{TcpListener, TcpStream};
use tokio::time::sleep;

/// Reproduction server: binds a TCP listener on `addr`, performs the h2
/// handshake on every accepted socket (one task per connection) and then
/// handles each request *inline* inside the connection's accept loop.
///
/// NOTE: this is the version that hangs. The request body is read to
/// completion before `h2.accept()` is awaited again, so the connection is not
/// being polled while the task waits on `body.data()`. The connection must be
/// polled for its streams to make progress, so a body that arrives after the
/// headers is never delivered to the waiting `body.data()` call.
async fn run_server(addr: SocketAddr) {
    tracing::info!("[server] starting server on {}", addr);
    let listener = TcpListener::bind(addr).await.unwrap();
    // One iteration per incoming TCP connection; the loop ends on the first accept error.
    while let Ok((socket, remote_addr)) = listener.accept().await {
        tracing::debug!("[server] received connection from {}", remote_addr);
        // Each connection is driven by its own task.
        tokio::task::spawn(async move {
            let mut h2 = server::handshake(socket).await.unwrap();
            tracing::debug!("[server] h2 stream established with {}", remote_addr);
            // `h2.accept()` is the only place in this task where the connection is polled.
            while let Some(request) = h2.accept().await {
                let (mut request, mut respond) = request.unwrap();
                tracing::debug!("[server] received h2 request from {}: {:?} {} {}", remote_addr, request.version(), request.method(), request.uri());
                let body = request.body_mut();
                tracing::trace!("[server] (flow control) avail cap: {}, used cap: {}", body.flow_control().available_capacity(), body.flow_control().used_capacity());
                // Awaiting the body here keeps the task from getting back to `h2.accept()`.
                while let Some(body_chunk) = body.data().await { // <-- server hangs here
                    tracing::debug!("[server] received body chunk from {}", remote_addr);
                    match body_chunk {
                        Err(e) => {
                            tracing::error!("[server] error getting body chunk: {:?}", e);
                            break
                        },
                        Ok(body_bytes) => {
                            let req_body = String::from_utf8_lossy(&body_bytes);
                            tracing::debug!("[server] request body chunk:\n{}", req_body);
                            // Hand the consumed bytes back to flow control; the result is ignored.
                            let _ = body.flow_control().release_capacity(body_bytes.len());
                        }
                    }
                }
                tracing::debug!("[server] sending response to {}", remote_addr);
                // Empty 200 response; `true` is passed as the second argument of `send_response`.
                let response = Response::builder().status(StatusCode::OK).body(()).unwrap();
                respond.send_response(response, true).unwrap();
                tracing::info!("[server] successfully responded to {}", remote_addr);
            }
            tracing::debug!("[server] connection with {} ended", remote_addr);
        });
    }
    tracing::info!("[server] shutting down");
}

/// Reproduction driver: starts `run_server` on 127.0.0.1:3000 in a background
/// task, then acts as an h2 client that sends a POST request's headers, waits
/// briefly, sends the body, and finally awaits the response.
#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();

    // Start server
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
    tokio::spawn(async move {
        run_server(addr).await
    });

    // Let server spin up
    sleep(Duration::from_secs(2)).await;

    // Client
    // Establish connection
    tracing::debug!("[client] starting up...");
    // Note: `addr` is shadowed here and now holds the connected `TcpStream`.
    let addr = TcpStream::connect(addr).await.unwrap();
    let (h2, connection) = client::handshake(addr).await.unwrap();
    // The client connection is awaited on its own task; errors are only logged.
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            tracing::error!("[client] error on connection: {:?}", e);
        }
    });
    let mut h2 = h2.ready().await.unwrap();
    // Start request
    tracing::info!("[client] sending request");
    let request = Request::builder()
        .method(Method::POST)
        .uri("http://127.0.0.1:3000")
        .body(())
        .unwrap();
    // `false` is passed here and `true` to `send_data` below, so the body
    // follows the headers as a separate step.
    let (response, mut stream) = h2.send_request(request, false).unwrap();
    // Simulate delay
    sleep(Duration::from_millis(2)).await;
    // Send body
    tracing::debug!("[client] sending body");
    // "hello, server " (14 bytes) repeated 10 times: a 140-byte body.
    let req_body = Bytes::from("hello, server ".to_string().repeat(10));
    stream.reserve_capacity(req_body.len());
    tracing::trace!("[client] (flow control) stream capacity: {}", stream.capacity());
    stream.send_data(req_body, true).unwrap();
    // Process response
    tracing::debug!("[client] waiting for response...");
    let response = response.await.unwrap(); // response is never received because the server is hanging
    let (head, _body) = response.into_parts();
    tracing::info!("[client] received response: {:?}", head);
}

Here is what the code prints out:

$ RUST_LOG=example=trace cargo run --bin example
   Compiling h2-example v0.1.0 (/tmp/h2-example)
    Finished dev [unoptimized + debuginfo] target(s) in 1.64s
     Running `target/debug/example`
2023-02-21T20:55:52.516642Z  INFO example: [server] starting server on 127.0.0.1:3000
2023-02-21T20:55:54.518027Z DEBUG example: [client] starting up...
2023-02-21T20:55:54.518442Z DEBUG example: [server] received connection from 127.0.0.1:54441
2023-02-21T20:55:54.518634Z  INFO example: [client] sending request
2023-02-21T20:55:54.518704Z DEBUG example: [server] h2 stream established with 127.0.0.1:54441
2023-02-21T20:55:54.519564Z DEBUG example: [server] received h2 request from 127.0.0.1:54441: HTTP/2.0 POST http://127.0.0.1:3000/
2023-02-21T20:55:54.519599Z TRACE example: [server] (flow control) avail cap: 65535, used cap: 0
2023-02-21T20:55:54.529842Z DEBUG example: [client] sending body
2023-02-21T20:55:54.529906Z TRACE example: [client] (flow control) stream capacity: 140
2023-02-21T20:55:54.529951Z DEBUG example: [client] waiting for response...

and the process just hangs here

What I notice is that when there is any sort of delay between the send_request and send_data calls, the server will hang attempting to read the body. This is simulated in the above code via a sleep (see // Simulate delay). When the sleep is removed, the server is able to read the body of the request and continue on. The piece of the server code that hangs is the body.data().await (see // <-- server hangs here)

To ensure that the server was actually receiving the request data, I also performed a packet capture using tcpdump:

$ sudo tcpdump -i lo0 -nnn port 3000
tcpdump: verbose output suppressed, use -v or -vv for full protocol decode
listening on lo0, link-type NULL (BSD loopback), capture size 262144 bytes
12:55:54.518167 IP 127.0.0.1.54441 > 127.0.0.1.3000: Flags [S], seq 308336552, win 65535, options [mss 16344,nop,wscale 6,nop,nop,TS val 3421395431 ecr 0,sackOK,eol], length 0
12:55:54.518270 IP 127.0.0.1.3000 > 127.0.0.1.54441: Flags [S.], seq 3293632737, ack 308336553, win 65535, options [mss 16344,nop,wscale 6,nop,nop,TS val 1756816442 ecr 3421395431,sackOK,eol], length 0
12:55:54.518284 IP 127.0.0.1.54441 > 127.0.0.1.3000: Flags [.], ack 1, win 6379, options [nop,nop,TS val 3421395431 ecr 1756816442], length 0
12:55:54.518293 IP 127.0.0.1.3000 > 127.0.0.1.54441: Flags [.], ack 1, win 6379, options [nop,nop,TS val 1756816442 ecr 3421395431], length 0
12:55:54.518372 IP 127.0.0.1.54441 > 127.0.0.1.3000: Flags [P.], seq 1:25, ack 1, win 6379, options [nop,nop,TS val 3421395431 ecr 1756816442], length 24
12:55:54.518403 IP 127.0.0.1.3000 > 127.0.0.1.54441: Flags [.], ack 25, win 6379, options [nop,nop,TS val 1756816442 ecr 3421395431], length 0
12:55:54.518652 IP 127.0.0.1.3000 > 127.0.0.1.54441: Flags [P.], seq 1:10, ack 25, win 6379, options [nop,nop,TS val 1756816442 ecr 3421395431], length 9
12:55:54.518678 IP 127.0.0.1.54441 > 127.0.0.1.3000: Flags [.], ack 10, win 6379, options [nop,nop,TS val 3421395431 ecr 1756816442], length 0
12:55:54.519084 IP 127.0.0.1.54441 > 127.0.0.1.3000: Flags [P.], seq 25:67, ack 10, win 6379, options [nop,nop,TS val 3421395432 ecr 1756816442], length 42
12:55:54.519108 IP 127.0.0.1.3000 > 127.0.0.1.54441: Flags [.], ack 67, win 6378, options [nop,nop,TS val 1756816443 ecr 3421395432], length 0
12:55:54.519515 IP 127.0.0.1.3000 > 127.0.0.1.54441: Flags [P.], seq 10:19, ack 67, win 6378, options [nop,nop,TS val 1756816443 ecr 3421395432], length 9
12:55:54.519538 IP 127.0.0.1.54441 > 127.0.0.1.3000: Flags [.], ack 19, win 6379, options [nop,nop,TS val 3421395432 ecr 1756816443], length 0
12:55:54.530072 IP 127.0.0.1.54441 > 127.0.0.1.3000: Flags [P.], seq 67:216, ack 19, win 6379, options [nop,nop,TS val 3421395443 ecr 1756816443], length 149
12:55:54.530096 IP 127.0.0.1.3000 > 127.0.0.1.54441: Flags [.], ack 216, win 6376, options [nop,nop,TS val 1756816454 ecr 3421395443], length 0

14 packets captured
120 packets received by filter
0 packets dropped by kernel

Perhaps this is how the server is supposed to behave according to the HTTP/2 RFC (maybe because it received a HEADERS frame with no data?), but it feels a bit strange. There could be a delay between the send_request and send_data calls for any number of reasons, so it seems like a case the server should be able to handle.

I can include more trace logs, but have left it at this for brevity.

seanmonstar commented 1 year ago

The connection needs to be polled constantly so that all connection-level work and all streams can make progress. It's best to spawn a new task for each request; otherwise, while your task is awaiting body data, it isn't awaiting the connection (the h2.accept().await line).

sanjams2 commented 1 year ago

Thanks for the quick response @seanmonstar. That does seem to work. Thanks for the pointer.

For posterity, the updated code is:

use std::net::SocketAddr;
use std::time::Duration;
use bytes::Bytes;
use h2::{client, server};
use http::{Method, Request, Response, StatusCode};
use tokio::net::{TcpListener, TcpStream};
use tokio::time::sleep;

async fn run_server(addr: SocketAddr) {
    tracing::info!("[server] starting server on {}", addr);
    let listener = TcpListener::bind(addr).await.unwrap();
    while let Ok((socket, remote_addr)) = listener.accept().await {
        tracing::debug!("[server] received connection from {}", remote_addr);
        tokio::task::spawn(async move {
            let mut h2 = server::handshake(socket).await.unwrap();
            tracing::debug!("[server] h2 stream established with {}", remote_addr);
            while let Some(request) = h2.accept().await {
                tokio::spawn(async move {
                    let (mut request, mut respond) = request.unwrap();
                    tracing::debug!("[server] received h2 request from {}: {:?} {} {}", remote_addr, request.version(), request.method(), request.uri());
                    let body = request.body_mut();
                    tracing::trace!("[server] (flow control) avail cap: {}, used cap: {}", body.flow_control().available_capacity(), body.flow_control().used_capacity());
                    while let Some(body_chunk) = body.data().await {
                        tracing::debug!("[server] received body chunk from {}", remote_addr);
                        match body_chunk {
                            Err(e) => {
                                tracing::error!("[server] error getting body chunk: {:?}", e);
                                break
                            },
                            Ok(body_bytes) => {
                                let req_body = String::from_utf8_lossy(&body_bytes);
                                tracing::debug!("[server] request body chunk:\n{}", req_body);
                                let _ = body.flow_control().release_capacity(body_bytes.len());
                            }
                        }
                    }
                    tracing::debug!("[server] sending response to {}", remote_addr);
                    let response = Response::builder().status(StatusCode::OK).body(()).unwrap();
                    respond.send_response(response, true).unwrap();
                    tracing::info!("[server] successfully responded to {}", remote_addr);
                });
            }
            tracing::debug!("[server] connection with {} ended", remote_addr);
        });
    }
    tracing::info!("[server] shutting down");
}

/// Starts the server on 127.0.0.1:3000 in a background task and then drives a
/// single h2 client request against it: headers first, a short pause, then the
/// body, and finally the response head is logged.
#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();

    // Launch the server in the background on the loopback interface.
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
    tokio::spawn(run_server(addr));

    // Give the listener time to come up before connecting.
    sleep(Duration::from_secs(2)).await;

    // Client side: open the TCP connection and perform the h2 handshake.
    tracing::debug!("[client] starting up...");
    let tcp = TcpStream::connect(addr).await.unwrap();
    let (sender, connection) = client::handshake(tcp).await.unwrap();

    // The connection is awaited on a separate task; a failure is only logged.
    tokio::spawn(async move {
        match connection.await {
            Ok(_) => {}
            Err(e) => {
                tracing::error!("[client] error on connection: {:?}", e);
            }
        }
    });
    let mut sender = sender.ready().await.unwrap();

    // Send the request head only; the body follows after the pause below.
    tracing::info!("[client] sending request");
    let builder = Request::builder()
        .method(Method::POST)
        .uri("http://127.0.0.1:3000");
    let request = builder.body(()).unwrap();
    let (response, mut stream) = sender.send_request(request, false).unwrap();

    // Pause between headers and body (the scenario this example exercises).
    sleep(Duration::from_millis(2)).await;

    // Send the body and finish the request stream.
    tracing::debug!("[client] sending body");
    let payload = Bytes::from("hello, server ".repeat(10));
    stream.reserve_capacity(payload.len());
    tracing::trace!("[client] (flow control) stream capacity: {}", stream.capacity());
    stream.send_data(payload, true).unwrap();

    // Wait for the response and log its head.
    tracing::debug!("[client] waiting for response...");
    let (head, _body) = response.await.unwrap().into_parts();
    tracing::info!("[client] received response: {:?}", head);
}

I will close this issue.