hyperium / h2

HTTP 2.0 client & server implementation for Rust.
MIT License
1.38k stars 278 forks source link

Push_promise seems to be stuck #811

Open DjordjeNikolasevic opened 2 weeks ago

DjordjeNikolasevic commented 2 weeks ago

I am trying to implement a basic server push scenario where the server makes multiple push requests. The client is first expected to process pushed requests and only then await for the main response. What I noticed is that when I simulate a delay on server side between sending push requests and the main response, push_promise() seems to be stuck on client side after receiving all of the promises. It looks like receiving the main response does not wake up the waiter.

However, when I remove the delay or make the client firstly await for the main response, and only then call push_promise(), everything works fine.

Firstly, my question is - Is server push in h2 expected to be used this way? I was not able to find any documentation about this or at least a basic usage example. If yes, is this a bug or I am misusing the library somewhere?

Code example:

Client:

#[tokio::main]
pub async fn main() -> Result<(), Box<dyn Error>> {
    // Establish TCP connection to the server.
    let tcp = TcpStream::connect("127.0.0.1:5928").await?;
    let (h2, connection) = client::handshake(tcp).await?;
    tokio::spawn(async move {
        connection.await.unwrap();
    });

    let mut h2 = h2.ready().await?;
    // Prepare the HTTP request to send to the server.
    let request = Request::builder()
                    .method(Method::GET)
                    .header("test_key", "test_value")
                    .uri("https://www.example.com/")
                    .body(())
                    .expect("Request error");

    let (mut response, _) = h2.send_request(request.clone(), true).expect("Send request error");

    // Awaiting for response consumes the object, so we need to get the stream of promises first.
    let mut pushes = response.push_promises();

    while let Some(push) = pushes.push_promise().await {
        println!("Push {:?}", push);

        let push_promise = push.unwrap();
        let (request, pushed_response_future) = push_promise.into_parts();
        println!("Pushed request: {:?}", request);

        let (head_pushed, ref mut body_pushed) = pushed_response_future.await.unwrap().into_parts();
        println!("Pushed data_head: {:?}", head_pushed);

        while let Some(chunk) = body_pushed.data().await {
            println!("Pushed data {:?}", chunk);
        }
    }

    // We never get here, above while loop is stuck after the last promise was processed.
    println!("Finished receiving pushed promises!");

    let (head, mut body) = response.await?.into_parts();

    println!("Received response: {:?}", head);

    while let Some(chunk) = body.data().await {
        println!("Main response data {:?}", chunk);
    }

    Ok(())
}

Server:

#[tokio::main]
pub async fn main() -> Result<(), Box<dyn Error>> {
    let mut listener = TcpListener::bind("127.0.0.1:5928").await?;

    println!("Listening on {:?}", listener.local_addr());

    loop {
        if let Ok((socket, _peer_addr)) = listener.accept().await {
            socket.set_nodelay(true)?; // Disable Nagle's algorithm
            tokio::spawn(async move {
                if let Err(e) = handle(socket).await {
                    println!("  -> err={:?}", e);
                }
            });
        }
    }
}

async fn handle(socket: TcpStream) -> Result<(), Box<dyn Error>> {
    let mut connection = server::handshake(socket).await?;
    println!("H2 connection bound");

    while let Some(result) = connection.accept().await {
        tokio::spawn(async move {
            let (request, mut respond) = result.unwrap();
            println!("GOT request: {:?}", request);

            for i in 0..50 {
                send_push(i, &mut respond);
            }

            // Simulate a delay.
            tokio::time::sleep(Duration::from_secs(5)).await;

            println!("Sending response");
            let response = Response::builder().status(StatusCode::OK).body(()).unwrap();
            let mut send = respond.send_response(response, false).unwrap();
            send.send_data("hello world".into(), true).unwrap();
            println!("Sending response complete");
        });
    }

    println!("~~~~~~~~~~~~~~~~~~~~~~~~~~~ H2 connection CLOSE !!!!!! ~~~~~~~~~~~");

    Ok(())
}

fn send_push<B>(id: usize, send_response: &mut SendResponse<B>)
where
    B: bytes::Buf + From<Bytes> 
{
    let uri = format!("http://www.example.com/{:?}", id);
    let pushed_req = Request::builder()
        .uri(uri)
        .body(())
        .unwrap();

    let pushed_rsp = http::Response::builder().status(200).body(()).unwrap();

    let mut send_pushed = send_response
        .push_request(pushed_req)
        .unwrap()
        .send_response(pushed_rsp, false)
        .unwrap();

    send_pushed.send_data(Bytes::from("a").into(), false).unwrap();
    send_pushed.send_data(Bytes::from("b").into(), false).unwrap();
    send_pushed.send_data(Bytes::from("c").into(), true).unwrap();
}
seanmonstar commented 2 weeks ago

If you're using h2 directly, you need to handle flow control explicitly. https://docs.rs/h2/latest/h2/#flow-control

DjordjeNikolasevic commented 2 weeks ago

@seanmonstar Is this really a problem with control flow? I am sending just few bytes of data.

I managed to solve the problem by adding stream.notify_push() to Recv::recv_headers, just below stream.notify_recv(). It looks like the wake up for the poll_push_promise is missing from some paths.

Though I am not sure if this is supposed to be the wanted behavior, or the caller should explicitly wait for the main response before iterating through push promises.

seanmonstar commented 2 weeks ago

Oh, in this specific case, yea that could be.

(The flow control comment is still true, but perhaps not what you run into yet.)