hyperium / hyper

An HTTP library for Rust
https://hyper.rs
MIT License
14.42k stars 1.59k forks source link

Silent failure on HTTP2 server reading stream with RST_STREAM(CANCEL/NO_ERROR) frame #3681

Open sirahd opened 3 months ago

sirahd commented 3 months ago

Version

hyper = { version = "1.1.0", features = ["full"] }
tokio = { version = "1", features = ["full"] }
http-body-util = "0.1"
hyper-util = { version = "0.1", features = ["full"] }
futures-channel = { version = "0.3", features = ["sink"] }
futures-util = { version = "0.3", default-features = false, features = ["alloc", "sink"] }
bytes = "1.6.0"
h2 = "0.4"
http = "1.1.0"

Platform

Darwin Kernel Version 23.4.0: Fri Mar 15 00:12:49 PDT 2024; root:xnu-10063.101.17~1/RELEASE_ARM64_T6020 arm64

Description Silent failure on HTTP2 server reading request body when client sends RST_STREAM(CANCEL) on the stream containing request body.

In the example repro, the client is sending request body containing payload, followed by RST_STREAM(CANCEL) frame. Since the server is reading canceled stream, request.body_mut().collect().await.unwrap().to_bytes() is expected to trigger a panic via unwrap(). However, the function call returns the payload successfully without any indication of error.

After digging through the PR history, the changes to ignore CANCEL and NO_ERROR code on RST_STREAM was introduced in https://github.com/hyperium/hyper/pull/3275 in response to issue https://github.com/hyperium/hyper/issues/2872. While this may be an expected behavior, is there a way to read stream more gracefully on the server such that the error on RST_STREAM is propagated?

Repro https://github.com/sirahd/hyper-http2-rst-stream-cancel-repro

server code snippet:

use std::convert::Infallible;
use std::future::Future;
use std::net::SocketAddr;

use http_body_util::BodyExt;
use http_body_util::Full;
use hyper::{Method, Request, Response};
use hyper::body::Bytes;
use hyper::rt::Executor;
use hyper::server::conn::http2;
use hyper::service::service_fn;
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;

async fn hello(mut request: Request<hyper::body::Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
    println!("receiving request {:?}", request);
    match *request.method() {
        Method::POST => {
            let data = request.body_mut().collect().await.unwrap().to_bytes();
            let transfer_bytes = data.len();

            println!("{:?}", transfer_bytes);
            Ok(Response::new(Full::new(Bytes::from(transfer_bytes.to_string()))))
        }
        _ => Ok(Response::new(Full::new(Bytes::new())))
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let addr = SocketAddr::from(([127, 0, 0, 1], 4034));

    let listener = TcpListener::bind(addr).await?;

    loop {
        let (stream, _) = listener.accept().await?;
        let io = TokioIo::new(stream);

        tokio::task::spawn(async move {
            if let Err(err) = http2::Builder::new(TokioExecutor)
                .serve_connection(io, service_fn(hello))
                .await
            {
                eprintln!("Error serving connection: {:?}", err);
            }
        });
    }
}

#[derive(Clone)]
struct TokioExecutor;

impl<F> Executor<F> for TokioExecutor
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
{
    fn execute(&self, future: F) {
        tokio::spawn(future);
    }
}

client code snippet:

use std::error::Error;
use std::time::Duration;

use bytes::Bytes;
use h2::{client, Reason};
use http::{Method, Request};
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let tcp = TcpStream::connect("127.0.0.1:4034").await?;
    let (h2, connection) = client::handshake(tcp).await?;
    tokio::spawn(async move {
        connection.await.unwrap();
    });
    let mut send_request = h2.ready().await?;

    let request = Request::get("http://127.0.0.1:4034")
        .method(Method::POST)
        .body(())
        .unwrap();

    let (response, mut send_stream) = send_request
        .send_request(request, false).unwrap();

    send_stream.send_data(Bytes::from(vec![5; 1000]), false)?;
    tokio::time::sleep(Duration::from_secs(5)).await;
    // sending RST_STREAM(CANCEL) here
    send_stream.send_reset(Reason::CANCEL);
    let response = response.await?;
    println!("{:?}", response);
    Ok(())
}