aws / s2n-quic

An implementation of the IETF QUIC protocol
https://crates.io/crates/s2n-quic
Apache License 2.0

Traffic stops after many streams on Windows #2220

Open XeCycle opened 1 month ago

XeCycle commented 1 month ago

Problem:

An echo server, paired with a simple client that repeatedly opens streams concurrently, sends data, and reads it back, always hangs after some time on Windows but works fine on Linux.

The server:

use std::{net::SocketAddr, path::PathBuf};

use futures::prelude::*;
use s2n_quic::provider::limits::Limits;

fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(amain())
}

async fn amain() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let args: Vec<_> = std::env::args().skip(1).collect();
    let cp: PathBuf = args[0].parse()?;
    let kp: PathBuf = args[1].parse()?;
    let sa: SocketAddr = args[2].parse()?;
    let tls = s2n_quic::provider::tls::default::Server::builder()
        .with_application_protocols([b"abcde"].into_iter())?
        .with_certificate(&*cp, &*kp)?
        .build()?;
    let io = s2n_quic::provider::io::Default::builder()
        // .with_gso(enable_gso)?
        // .with_gro(enable_gro)?
        .with_receive_address(sa.into())?
        .build()?;
    let lim = Limits::new()
        .with_max_open_local_bidirectional_streams(1 << 30)?
        .with_max_open_remote_bidirectional_streams(1 << 30)?;
    let server = s2n_quic::Server::builder()
        .with_tls(tls)?
        .with_limits(lim)?
        .with_io(io)?
        .start()?;
    server
        .flat_map_unordered(None, |conn| {
            eprintln!("conn from {:?}", conn.remote_addr().unwrap());
            let (_h, sa) = conn.split();
            sa
        })
        .try_for_each_concurrent(None, |mut s| async move {
            while let Ok(Some(chunk)) = s.receive().await {
                s.send(chunk).await.ok();
            }
            s.finish().ok();
            Ok(())
        })
        .await?;
    Ok(())
}

The client:

use std::net::SocketAddr;

use futures::prelude::*;
use s2n_quic::provider::{limits::Limits, tls::rustls::rustls};

fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(amain())
}

async fn amain() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let sn = std::env::args().nth(1).unwrap();
    let sa: SocketAddr = std::env::args().nth(2).unwrap().parse().unwrap();
    let crt = std::env::args().nth(3);
    let client = {
        let io = s2n_quic::provider::io::tokio::Provider::builder()
            .with_gro(false)?
            .with_gso(false)?
            .with_receive_address((std::net::Ipv6Addr::UNSPECIFIED, 0).into())?
            .build()?;
        let tls = if let Some(crt) = crt {
            #[allow(deprecated)]
            s2n_quic::provider::tls::default::Client::builder()
                .with_certificate(std::path::Path::new(&crt))?
                .with_application_protocols([b"abcde"].into_iter())?
                .build()?
        } else {
            let mut certstore = rustls::RootCertStore::empty();
            {
                let nc = rustls_native_certs::load_native_certs()?;
                certstore.add_parsable_certificates(&nc);
            }
            let mut rustlsconfig = rustls::ClientConfig::builder()
                .with_safe_defaults()
                .with_root_certificates(certstore)
                .with_no_client_auth();
            rustlsconfig
                .alpn_protocols
                .splice(.., [b"abcde"[..].to_owned()].into_iter());
            #[allow(deprecated)]
            s2n_quic::provider::tls::default::Client::new(rustlsconfig)
        };
        let lim = Limits::new()
            .with_max_open_local_bidirectional_streams(1 << 30)?
            .with_max_open_remote_bidirectional_streams(1 << 30)?
            .with_max_idle_timeout(std::time::Duration::from_secs(30))?;
        s2n_quic::Client::builder()
            .with_tls(tls)?
            .with_limits(lim)?
            .with_io(io)?
            .start()?
    };

    let mut conns = vec![];
    for _ in 0..10 {
        let connspec = s2n_quic::client::Connect::new(sa).with_server_name(sn.clone());
        conns.push(client.connect(connspec).await?);
    }
    eprintln!("{} connects done", conns.len());

    let body = &*Vec::leak(b"hello".repeat(400));

    loop {
        let start = std::time::Instant::now();
        stream::iter(&conns)
            .flat_map(|conn| stream::iter(0..100).map(|_| conn.handle()))
            .for_each_concurrent(None, |mut h| async move {
                let st = h.open_bidirectional_stream().await.unwrap();
                let (mut sr, mut sw) = st.split();
                let r = async move {
                    let mut o = vec![];
                    while let Some(chunk) = sr.receive().await.unwrap() {
                        o.extend_from_slice(&*chunk);
                    }
                    assert!(o == body);
                };
                let w = async move {
                    sw.send(bytes::Bytes::from_static(body)).await.unwrap();
                    sw.close().await.unwrap();
                };
                future::join(r, w).await;
            })
            .await;
        let dur = start.elapsed();
        println!("{dur:?}");
    }
}

Run the pair, and the client stops printing timings after about 10 minutes.
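Because the hang only shows up after several minutes, it can help to detect it automatically rather than watch the output. The following is a minimal sketch using only the standard library, not part of the original reproducer. It assumes the client loop is changed to send each round's `dur` over a `std::sync::mpsc` channel; the names `Watch` and `watch_rounds` are hypothetical.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Outcome of watching the reproducer (hypothetical type for this sketch).
#[derive(Debug, PartialEq, Eq)]
pub enum Watch {
    /// The sender was dropped, i.e. the client loop ended on its own.
    Finished { rounds: usize },
    /// No round completed within the limit, i.e. the traffic likely hung.
    Stalled { rounds: usize },
}

/// Counts completed rounds until either the sender goes away or no new
/// round timing arrives within `limit`.
pub fn watch_rounds(rounds: &Receiver<Duration>, limit: Duration) -> Watch {
    let mut seen = 0;
    loop {
        match rounds.recv_timeout(limit) {
            // A round finished; its duration is ignored here.
            Ok(_dur) => seen += 1,
            Err(RecvTimeoutError::Timeout) => return Watch::Stalled { rounds: seen },
            Err(RecvTimeoutError::Disconnected) => return Watch::Finished { rounds: seen },
        }
    }
}
```

One way to use it would be to call `watch_rounds` on a separate OS thread, so that it keeps running even when the single-threaded tokio runtime makes no progress, with a limit comfortably above a normal round time.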

I have tried these combinations:

Versions:

While doing these experiments, I also had a pair of these programs running on Linux, and it has been running fine so far, at least up to the time of writing this issue.

Before running this pair of simple reproducers, I had another, more complex program. I added some logs to that one, and when it hangs, sum(client TxStreamProgress.bytes) > sum(server RxStreamProgress.bytes). The client then ends with IdleTimerExpired after 30s. Hung streams on the server may have received 0 bytes, the full 2000 bytes but no EOF, or a partial amount such as 1472 bytes.
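The byte accounting described above can be sketched as follows. This is an illustrative, standard-library-only example, not the logging code from the original program. `StreamCounters`, `StreamState`, `classify`, and `total_gap` are hypothetical names, and the counters are assumed to have been accumulated per stream from the progress events on each side.

```rust
/// Per-stream counters (hypothetical; assumed to be collected from logs).
#[derive(Debug, Clone, Copy)]
pub struct StreamCounters {
    pub tx_bytes: u64, // bytes the client reports as sent
    pub rx_bytes: u64, // bytes the server reports as received
    pub rx_eof: bool,  // whether the server saw the end of the stream
}

/// How far a stream got on the receiving side.
#[derive(Debug, PartialEq, Eq)]
pub enum StreamState {
    Complete,
    NothingReceived,
    Partial { missing: u64 },
    MissingEof,
}

/// Classifies one stream into the cases reported in the issue.
pub fn classify(c: StreamCounters) -> StreamState {
    if c.rx_bytes == 0 && c.tx_bytes > 0 {
        StreamState::NothingReceived
    } else if c.rx_bytes < c.tx_bytes {
        StreamState::Partial { missing: c.tx_bytes - c.rx_bytes }
    } else if !c.rx_eof {
        StreamState::MissingEof
    } else {
        StreamState::Complete
    }
}

/// sum(tx) - sum(rx) over all streams; nonzero means data went missing.
pub fn total_gap(streams: &[StreamCounters]) -> u64 {
    let tx: u64 = streams.iter().map(|c| c.tx_bytes).sum();
    let rx: u64 = streams.iter().map(|c| c.rx_bytes).sum();
    tx.saturating_sub(rx)
}
```

With the 2000-byte body used by the reproducer, a stream that stopped at 1472 bytes would be classified as `Partial { missing: 528 }`.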

camshaft commented 1 month ago

Sorry for the delay. We will try to get this prioritized for investigation.