hyperium / h2

HTTP 2.0 client & server implementation for Rust.
MIT License
1.34k stars 269 forks source link

Error while making a request hyper::Error(Http2, Error { kind: Reset(StreamId(1), PROTOCOL_ERROR, Remote) }) #647

Closed ETKNeil closed 1 year ago

ETKNeil commented 1 year ago

Hello, This works fine in http1 but in http2 it fails strangely.

Output

Writing from Server               @     
Flushing from Server
Writing from Client PRI * HTTP/2.0

SM

Polling read from Server
Running task 4971529d-2955-4b11-86b1-416298604676
Building request
Polling read from Client
Gonna send the request
Writing from Client                     @         O 
Flushing from Client
Running task cf8264ff-7013-4d99-b621-f376dd15db46
Polling read from Client
Writing from Client      �A�IP��
Flushing from Client
Polling read from Client
Reading               @     
Polling read from Server
Reading PRI * HTTP/2.0

SM

Polling read from Client
Writing from Client        
Flushing from Client
Polling read from Server
Writing from Server          
Flushing from Server
Polling read from Server
Polling read from Client
Reading                     @         O 
Reading          
Polling read from Server
Polling read from Client
Writing from Server        
Flushing from Client
Flushing from Server
Polling read from Client
Reading        
Polling read from Server
Reading      �A�IP��
Polling read from Client
Flushing from Client
Polling read from Server
Writing from Server          
Flushing from Server
Polling read from Server
Flushing from Server
Polling read from Server
Reading        
Polling read from Server
Polling read from Client
Flushing from Server
Reading          
Polling read from Client
Flushing from Client
Got output for task cf8264ff-7013-4d99-b621-f376dd15db46 : ()
Error while making a request hyper::Error(Http2, Error { kind: Reset(StreamId(1), PROTOCOL_ERROR, Remote) })
Polling read from Client
Flushing from Client
Writing from Client               
Flushing from Client
Shutdown from Client
Got output for task 4971529d-2955-4b11-86b1-416298604676 : ()
Polling read from Server
Reading               
Polling read from Server
Flushing from Server
Writing from Server              
Could not send, the underlying channel might be dropped (disconnected:true), got error sending on a closed channel
Flushing from Server
Shutdown from Server

Cargo.toml

[package]
name = "hyper-test"
version = "0.1.0"
edition = "2021"

[dependencies]
tokio = { version = "1.21.2", features = ["rt-multi-thread","macros","time"] }
hyper = { version = "1.0.0-rc.1", features = ["client", "http2","server","http1"] }
bytes = "1.2.1"
http-body-util = "0.1.0-rc.1"
flume = "0.10.14"
uuid = {version="1.2.2",features=["v4"]}

main.rs

use bytes::Bytes;
use http_body_util::{BodyExt, Empty, Full};
use std::fmt::Debug;
use std::future::Future;
use std::io::Error;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};

pub(crate) struct TokioSleep {
    pub(crate) inner: Pin<Box<tokio::time::Sleep>>,
}

impl Default for TokioSleep {
    fn default() -> Self {
        Self {
            inner: Box::pin(tokio::time::sleep(Duration::default())),
        }
    }
}

impl Future for TokioSleep {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.inner.as_mut().poll(cx)
    }
}

impl hyper::rt::Sleep for TokioSleep {}

struct IO {
    name: String,
    rx: flume::Receiver<Vec<u8>>,
    tx: flume::Sender<Vec<u8>>,
    sleep: TokioSleep,
}

impl IO {
    pub fn new(name: String, rx: flume::Receiver<Vec<u8>>, tx: flume::Sender<Vec<u8>>) -> Self {
        let sleep = TokioSleep::default();
        Self {
            name,
            rx,
            tx,
            sleep,
        }
    }
}

use tokio::time::Instant;
use uuid::Uuid;

impl AsyncRead for IO {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let this = self.get_mut();
        println!("Polling read from {}", this.name);
        let mut sleep = Pin::new(&mut this.sleep);
        match sleep.as_mut().poll(cx) {
            Poll::Ready(_) => {
                sleep
                    .inner
                    .as_mut()
                    .reset(Instant::now() + Duration::from_millis(25));
                match this.rx.try_recv() {
                    Ok(msg) => {
                        buf.put_slice(msg.as_slice());
                        println!("Reading {}", String::from_utf8_lossy(buf.filled()));
                        Poll::Ready(Ok(()))
                    }
                    Err(_) => Poll::Pending,
                }
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

impl AsyncWrite for IO {
    fn poll_write(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, Error>> {
        let len = buf.len();
        println!(
            "Writing from {} {}",
            &self.name,
            String::from_utf8_lossy(buf)
        );
        match self.tx.send(buf.to_vec()) {
            Err(err) => {
                eprintln!("Could not send, the underlying channel might be dropped (disconnected:{}), got error {}", self.tx.is_disconnected(), err);
            }
            _ => {}
        }
        Poll::Ready(Ok(len))
    }

    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
        println!("Flushing from {}", &self.name);
        Poll::Ready(Ok(()))
    }

    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
        println!("Shutdown from {}", &self.name);
        Poll::Ready(Ok(()))
    }
}

#[derive(Clone, Debug)]
struct LocalExec {
    handle: tokio::runtime::Handle,
}

impl<F> hyper::rt::Executor<F> for LocalExec
where
    F: Future + 'static + Send,
    <F as Future>::Output: Debug,
    <F as Future>::Output: Send,
{
    fn execute(&self, fut: F) {
        let uuid = Uuid::new_v4();
        println!("Running task {}", uuid);
        self.handle.spawn(async move {
            let r = fut.await;
            println!("Got output for task {} : {:?}", uuid, r)
        });
    }
}

#[tokio::main(worker_threads = 2)]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let (server_tx, client_rx) = flume::unbounded();
    let (client_tx, server_rx) = flume::unbounded();
    let exec = LocalExec {
        handle: tokio::runtime::Handle::current().clone(),
    };

    let service =
        hyper::service::service_fn(|req: hyper::Request<hyper::body::Incoming>| async move {
            dbg!("called {:?}", req);
            let body: Full<&[u8]> = http_body_util::Full::from("hello".as_bytes());
            hyper::Response::builder().body(body)
        });

    let server_conn = hyper::server::conn::http2::Builder::new(exec.clone()).serve_connection(
        IO::new(String::from("Server"), server_rx, server_tx),
        service,
    );

    tokio::task::spawn(async move {
        if let Err(err) = server_conn.await {
            println!("Connection failed: {:?}", err);
        }
    });

    let (mut sender, client_conn) = hyper::client::conn::http2::Builder::new()
        .executor(exec.clone())
        .handshake(IO::new(String::from("Client"), client_rx, client_tx))
        .await
        .unwrap();

    tokio::task::spawn(async move {
        if let Err(err) = client_conn.await {
            println!("Connection failed: {:?}", err);
        }
    });

    let req = hyper::Request::builder()
        .uri("test")
        .body(Empty::<Bytes>::new())?;

    println!("Building request");
    tokio::task::spawn(async move {
        println!("Gonna send the request");
        match sender.send_request(req).await {
            Ok(mut res) => {
                println!("Response: {}", res.status());
                println!("Headers: {:#?}\n", res.headers());

                // Stream the body, writing each chunk to stdout as we get it
                // (instead of buffering and printing at the end).
                while let Some(next) = res.frame().await {
                    let frame = next.unwrap();
                    if let Some(chunk) = frame.data_ref() {
                        dbg!(chunk);
                    }
                }

                println!("\n\nDone!");
            }
            Err(err) => {
                eprintln!("Error while making a request {:?}", err);
            }
        }
    });
    tokio::time::sleep(Duration::from_secs(50)).await;
    Ok(())
}
ETKNeil commented 1 year ago

I got the same error with h2:

Cargo.toml

[package]
name = "temp"
version = "0.1.0"
edition = "2021"

[dependencies]
tokio = { version = "1.21.2", features = ["rt-multi-thread", "macros", "time"] }
hyper = { version = "1.0.0-rc.1", features = ["client", "http2", "server", "http1"] }
bytes = "1.2.1"
http-body-util = "0.1.0-rc.1"
flume = "0.10.14"
uuid = { version = "1.2.2", features = ["v4"] }
h2 = { version = "0.3.15" }
env_logger = "0.9.3"

H2

use bytes::Bytes;
use hyper::{http, HeaderMap, Request};
use std::io::Error;

use std::fmt::Debug;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::time::Instant;
use uuid::Uuid;

#[derive(Clone, Debug)]
pub struct LocalExec {
    handle: tokio::runtime::Handle,
}

impl LocalExec {
    pub fn new(handle: tokio::runtime::Handle) -> Self {
        Self { handle }
    }
}

impl<F> hyper::rt::Executor<F> for LocalExec
    where
        F: Future + 'static + Send,
        <F as Future>::Output: Debug,
        <F as Future>::Output: Send,
{
    fn execute(&self, fut: F) {
        let uuid = Uuid::new_v4();
        println!("Running task {}", uuid);
        self.handle.spawn(async move {
            let r = fut.await;
            println!("Got output for task {} : {:?}", uuid, r)
        });
    }
}

pub(crate) struct TokioSleep {
    pub(crate) inner: Pin<Box<tokio::time::Sleep>>,
}

impl Default for TokioSleep {
    fn default() -> Self {
        Self {
            inner: Box::pin(tokio::time::sleep(Duration::default())),
        }
    }
}

impl Future for TokioSleep {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.inner.as_mut().poll(cx)
    }
}

impl hyper::rt::Sleep for TokioSleep {}

pub struct IO {
    name: String,
    rx: flume::Receiver<Vec<u8>>,
    tx: flume::Sender<Vec<u8>>,
    sleep: TokioSleep,
}

impl IO {
    pub fn new(name: String, rx: flume::Receiver<Vec<u8>>, tx: flume::Sender<Vec<u8>>) -> Self {
        let sleep = TokioSleep::default();
        Self {
            name,
            rx,
            tx,
            sleep,
        }
    }
}

impl AsyncRead for IO {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let this = self.get_mut();
        println!("Polling read from {}", this.name);
        let mut sleep = Pin::new(&mut this.sleep);
        match sleep.as_mut().poll(cx) {
            Poll::Ready(_) => {
                sleep
                    .inner
                    .as_mut()
                    .reset(Instant::now() + Duration::from_millis(25));
                match this.rx.try_recv() {
                    Ok(msg) => {
                        buf.put_slice(msg.as_slice());
                        println!("Reading {}", String::from_utf8_lossy(buf.filled()));
                        Poll::Ready(Ok(()))
                    }
                    Err(_) => Poll::Pending,
                }
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

impl AsyncWrite for IO {
    fn poll_write(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, Error>> {
        let len = buf.len();
        println!(
            "Writing from {} {}",
            &self.name,
            String::from_utf8_lossy(buf)
        );
        match self.tx.send(buf.to_vec()) {
            Err(err) => {
                eprintln!("Could not send, the underlying channel might be dropped (disconnected:{}), got error {}", self.tx.is_disconnected(), err);
            }
            _ => {}
        }
        Poll::Ready(Ok(len))
    }

    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
        println!("Flushing from {}", &self.name);
        Poll::Ready(Ok(()))
    }

    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
        println!("Shutdown from {}", &self.name);
        Poll::Ready(Ok(()))
    }
}

async fn handle_request(
    mut request: Request<h2::RecvStream>,
    mut respond: h2::server::SendResponse<Bytes>,
) -> Result<(), h2::Error> {
    println!("GOT request: {:?}", request);

    let body = request.body_mut();
    while let Some(data) = body.data().await {
        let data = data?;
        println!("<<<< recv {:?}", data);
        let _ = body.flow_control().release_capacity(data.len());
    }

    let response = http::Response::new(());
    let mut send = respond.send_response(response, false)?;
    println!(">>>> send");
    send.send_data(Bytes::from_static(b"hello "), false)?;
    send.send_data(Bytes::from_static(b"world\n"), true)?;

    Ok(())
}

#[tokio::main(worker_threads = 2)]
pub async fn main() -> Result<(), h2::Error> {
    let (server_tx, client_rx) = flume::unbounded();
    let (client_tx, server_rx) = flume::unbounded();

    let _ = env_logger::try_init();

    let io = IO::new(String::from("Server"), server_rx, server_tx);

    tokio::task::spawn(async move {
        let mut connection = h2::server::handshake(io).await.unwrap();
        while let Some(result) = connection.accept().await {
            let (request, respond) = result.unwrap();
            tokio::spawn(async move {
                if let Err(e) = handle_request(request, respond).await {
                    println!("error while handling request: {:?}", e);
                }
            });
        }
    });

    let io = IO::new(String::from("Client"), client_rx, client_tx);
    let (mut client, h2) = h2::client::handshake(io).await.unwrap();

    println!("sending request");

    let request = Request::builder().uri("test").body(()).unwrap();

    let mut trailers = HeaderMap::new();
    trailers.insert("zomg", "hello".parse().unwrap());

    let (response, mut stream) = client.send_request(request, false).unwrap();

    // send trailers
    stream.send_trailers(trailers).unwrap();

    // Spawn a task to run the conn...
    tokio::spawn(async move {
        if let Err(e) = h2.await {
            println!("GOT ERR={:?}", e);
        }
    });

    let response = response.await?;
    println!("GOT RESPONSE: {:?}", response);

    // Get the body
    let mut body = response.into_body();

    while let Some(chunk) = body.data().await {
        println!("GOT CHUNK = {:?}", chunk?);
    }

    if let Some(trailers) = body.trailers().await? {
        println!("GOT TRAILERS: {:?}", trailers);
    }

    Ok(())
}
ETKNeil commented 1 year ago

Ok figured it out, apparently an URI must be valid as per http2 recommendation and can not be just anything. I switched in both my example with a valid url instead of "test" and it works...