hyperium / h2

HTTP 2.0 client & server implementation for Rust.
MIT License
1.34k stars 269 forks source link

Server doesn't send flow control WINDOW_UPDATE frames #671

Closed nurmohammed840 closed 1 year ago

nurmohammed840 commented 1 year ago

I am trying to send 1 MB of data, but the server only receives 65535 bytes.

Here is my setup:

Cargo.toml:

[package]
name = "h2-demo"
version = "0.1.0"
edition = "2021"

[dependencies]
bytes = "1"
h2 = "0.3.17"
http = "0.2"
tokio = { version = "1", features = ["io-util", "net", "rt-multi-thread", "macros"] }
tokio-rustls = "0.24"
rustls-pemfile = "1"

Server

use std::{str, sync::Arc};
use tokio::{io::Result, net::TcpListener};
use tokio_rustls::{rustls, TlsAcceptor};

mod load {
    use std::{
        fs::File,
        io::{BufReader, Result},
    };
    use tokio_rustls::rustls;

    /// Opens the file at `path` and wraps it in a buffered reader.
    fn open(path: &str) -> Result<BufReader<File>> {
        File::open(path).map(BufReader::new)
    }

    /// Reads the PEM file at `path` and returns every item produced by
    /// `rustls_pemfile::certs`, each wrapped as a `rustls::Certificate`.
    ///
    /// # Errors
    ///
    /// Returns the I/O error if the file cannot be opened or if
    /// `rustls_pemfile::certs` fails.
    pub fn certs(path: &str) -> Result<Vec<rustls::Certificate>> {
        let mut pem = open(path)?;
        let parsed = rustls_pemfile::certs(&mut pem)?;

        Ok(parsed.into_iter().map(rustls::Certificate).collect())
    }

    /// Reads the PEM file at `path` and returns every item produced by
    /// `rustls_pemfile::pkcs8_private_keys`, each wrapped as a
    /// `rustls::PrivateKey`.
    ///
    /// # Errors
    ///
    /// Returns the I/O error if the file cannot be opened or if
    /// `rustls_pemfile::pkcs8_private_keys` fails.
    pub fn keys(path: &str) -> Result<Vec<rustls::PrivateKey>> {
        let mut pem = open(path)?;
        let parsed = rustls_pemfile::pkcs8_private_keys(&mut pem)?;

        Ok(parsed.into_iter().map(rustls::PrivateKey).collect())
    }
}

/// Runs the demo TLS + HTTP/2 server on `127.0.0.1:4433`.
///
/// Loads the certificate and key from `./cert/`, accepts TCP connections in a
/// loop, performs the TLS and h2 handshakes, and spawns one task per
/// connection. That task accepts requests and, for requests carrying a
/// `content-length` header, reads the body while printing flow-control
/// counters.
///
/// # Errors
///
/// Returns an error if loading the certificate/key files, binding the
/// listener, or accepting a TCP connection fails.
///
/// # Panics
///
/// Panics if the key file yields no keys (`keys.remove(0)`) or if
/// `with_single_cert` rejects the certificate/key pair (`unwrap`).
pub async fn run() -> Result<()> {
    let cert = load::certs("./cert/cert.pem")?;
    let mut keys = load::keys("./cert/key.pem")?;

    let mut tls_config = rustls::ServerConfig::builder()
        .with_safe_defaults()
        .with_no_client_auth()
        .with_single_cert(cert, keys.remove(0))
        .unwrap();

    // Advertise only "h2" via ALPN and install a `KeyLogFile` key logger.
    tls_config.alpn_protocols = vec![Vec::from("h2")];
    tls_config.key_log = Arc::new(rustls::KeyLogFile::new());

    let tcp_listener = TcpListener::bind("127.0.0.1:4433").await?;
    let tls_acceptor = TlsAcceptor::from(Arc::new(tls_config));

    println!("Listening: https://127.0.0.1:4433");

    loop {
        let (stream, _addr) = tcp_listener.accept().await?;
        // A failed TLS or h2 handshake drops this connection and moves on to
        // the next one. Both handshakes are awaited inside the accept loop.
        let Ok(tls_stream) = tls_acceptor.accept(stream).await else { continue };
        let Ok(mut conn) = h2::server::handshake(tls_stream).await else { continue };

        // One task per connection; it owns `conn` and is the only place
        // `conn.accept()` is polled.
        tokio::spawn(async move {
            // The loop ends on `None` or on the first `Err` from `accept()`.
            // `_res` (the second tuple element) is never used.
            'listener: while let Some(Ok((req, _res))) = conn.accept().await {
                let (head, mut body) = req.into_parts();
                println!("{} {:#?}", head.method, head.uri);

                // Requests without a `content-length` header are skipped
                // without reading their body.
                if let Some(size) = head.headers.get("content-length") {
                    // A non-UTF-8 or non-`u32` value stops handling this
                    // connection altogether.
                    let Ok(Ok(size)) = str::from_utf8(size.as_bytes()).map(str::parse::<u32>) else { break 'listener };

                    println!("content-length: {:?}", size);
                    let mut total_len = 0;

                    // NOTE(review): this inner loop runs in the same task that
                    // calls `conn.accept()`, so while it awaits `body.data()`
                    // the connection is not being polled. The maintainer reply
                    // in this thread identifies that as the reason the
                    // transfer stalls (65535 bytes in the posted output).
                    while let Some(Ok(bytes)) = body.data().await {
                        total_len += bytes.len();
                        println!("\nrecv: {:?} bytes, total = {}", bytes.len(), total_len);

                        // Hand the consumed bytes back to flow control; an
                        // error here abandons the whole connection loop.
                        let fc = body.flow_control();
                        if let Err(_) = fc.release_capacity(bytes.len()) {
                            println!("Breaked!");
                            break 'listener;
                        }
                        println!(
                            "Used capacity: {}, Available capacity: {:?}",
                            fc.used_capacity(),
                            fc.available_capacity()
                        );
                    }
                }
            }
        });
    }
}

/// Starts the demo server on a current-thread Tokio runtime and blocks on it.
///
/// `run` loops until it returns an error, so this test only finishes if the
/// server fails.
#[test]
fn test_name() -> std::io::Result<()> {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    runtime.block_on(run())
}

Client

<h1>Hello World!</h1>
<script>
    // Endless generator yielding "0, ", "1, ", "2, ", ... as strings.
    function* gen_nums() {
        let n = 0;
        while (true) {
            yield `${n}, `;
            n += 1;
        }
    }
    // Builds a string from gen_nums() output ("0, 1, 2, ...") and returns
    // its first `size` characters.
    function data(size) {
        const source = gen_nums();
        let text = "";
        // Always append at least one chunk, then keep going until the
        // accumulated length reaches `size`.
        do {
            text += source.next().value;
        } while (!(text.length >= size));
        return text.slice(0, size);
    }
    // POST a 1 MiB generated payload to the server, then log the response
    // body decoded as text; any failure is reported via console.error.
    const request = {
        method: "POST",
        body: data(1 * 1024 * 1024)
    };
    (async () => {
        try {
            const res = await fetch("https://127.0.0.1:4433/rpc", request);
            const buf = await res.arrayBuffer();
            const msg = new TextDecoder().decode(buf);
            console.log(msg);
        } catch (err) {
            console.error(err);
        }
    })();
</script>

Output

Listening: https://127.0.0.1:4433
POST https://127.0.0.1:4433/rpc
content-length: 1048576

recv: 16375 bytes, total = 16375
Used capacity: 49160, Available capacity: 16375

recv: 16375 bytes, total = 32750
Used capacity: 32785, Available capacity: 32750

recv: 16375 bytes, total = 49125
Used capacity: 16410, Available capacity: 49125

recv: 16375 bytes, total = 65500
Used capacity: 35, Available capacity: 65500

recv: 35 bytes, total = 65535
Used capacity: 0, Available capacity: 65535
test test_name has been running for over 60 seconds
^C

Screenshot from 2023-04-14 03-21-11

We can see that the server didn't send WINDOW_UPDATE frames. What am I doing wrong?

seanmonstar commented 1 year ago

The problem is that the connection is no longer being driven (polled), so connection frames won't be sent/received. You typically need to spawn a new task for each new request you accept.

nurmohammed840 commented 1 year ago

Ah! I see...

So the solution is to spawn a new task:

// Read the request body in its own task. `body` is moved into the task;
// presumably this runs inside the per-connection accept loop, leaving that
// loop free to keep calling `conn.accept()` — confirm against the full server.
tokio::spawn(async move {
    let mut total_len = 0;
    // Stops at end of stream (`None`) or on the first `Err` chunk.
    while let Some(Ok(bytes)) = body.data().await {
        total_len += bytes.len();
        println!("\nrecv: {:?} bytes, total = {}", bytes.len(), total_len);
        // Hand the consumed bytes back to flow control; an error from
        // `release_capacity` is deliberately ignored here.
        let _ = body.flow_control().release_capacity(bytes.len());
    }
});