quinn-rs / quinn

Async-friendly QUIC implementation in Rust
Apache License 2.0
3.76k stars 380 forks source link

Dropping `Endpoint` can cause unnecessary "Incoming dropped without passing to..." warnings #1906

Closed gretchenfrage closed 3 months ago

gretchenfrage commented 3 months ago

If a quinn::Endpoint is dropped while it internally contains pending Incoming connections, those proto::Incoming values will be dropped without dismissing their improper-drop warner. This will trigger the "WARN: quinn_proto::endpoint:1223: quinn_proto::Incoming dropped without passing to Endpoint::accept/refuse/retry/ignore (may cause memory leak and eventual inability to accept new connections)" warning. The warning is not indicative of an actual problem in this case, however, because the endpoint itself is being dropped too.

Repro script ```rs use anyhow::Error; use quinn::*; use std::{net::ToSocketAddrs as _, sync::Arc}; use tracing::*; use tracing_subscriber::prelude::*; #[tokio::main] async fn main() { let t0 = std::time::Instant::now(); // init logging let log_fmt = tracing_subscriber::fmt::format() .compact() //.json() //.with_span_list(true) //.with_current_span(false) .with_timer(tracing_subscriber::fmt::time::uptime()) .with_line_number(true); let stdout_log = tracing_subscriber::fmt::layer() //.fmt_fields(tracing_subscriber::fmt::format::JsonFields::new()) .event_format(log_fmt); let log_filter = tracing_subscriber::EnvFilter::new( std::env::var(tracing_subscriber::EnvFilter::DEFAULT_ENV).unwrap_or("info".into()), ); let log_subscriber = tracing_subscriber::Registry::default() .with(log_filter) .with(stdout_log); tracing::subscriber::set_global_default(log_subscriber).expect("unable to install logger"); // generate keys (new version of Rustls) let rcgen_cert = rcgen::generate_simple_self_signed(vec!["localhost".into()]).unwrap(); let key = rustls::pki_types::PrivatePkcs8KeyDer::from(rcgen_cert.key_pair.serialize_der()); let cert = rustls::pki_types::CertificateDer::from(rcgen_cert.cert); let mut roots = rustls::RootCertStore::empty(); roots.add(cert.clone()).unwrap(); let certs = vec![cert]; /* // generate keys (old version of Rustls) let rcgen_cert = rcgen::generate_simple_self_signed(vec!["localhost".into()]).unwrap(); let key = rustls::PrivateKey(rcgen_cert.serialize_private_key_der()); let cert = rustls::Certificate(rcgen_cert.serialize_der().unwrap()); let mut roots = rustls::RootCertStore::empty(); roots.add(&cert).unwrap(); let certs = vec![cert]; */ let mut tasks = tokio::task::JoinSet::new(); // start server fn spawn_server( certs: &Vec>, key: &rustls::pki_types::PrivatePkcs8KeyDer, ) -> (tokio::sync::oneshot::Sender<()>, impl std::future::Future>) { let (send_stop_server, mut recv_stop_server) = tokio::sync::oneshot::channel(); let certs = certs.clone(); let key = 
key.clone_key(); let server = tokio::spawn(log_err(async move { let mut server_crypto = rustls::ServerConfig::builder() .with_no_client_auth() .with_single_cert(certs, key.into())?; /* let mut server_crypto = rustls::ServerConfig::builder() .with_safe_defaults() .with_no_client_auth() .with_single_cert(certs, key)?; */ // make sure to configure this: server_crypto.max_early_data_size = u32::MAX; let server_crypto = quinn::crypto::rustls::QuicServerConfig::try_from(Arc::new(server_crypto))?; let server_config = ServerConfig::with_crypto(Arc::new(server_crypto)); info!("constructing server endpoint"); let endpoint = Endpoint::server( server_config, "127.0.0.1:4433".to_socket_addrs().unwrap().next().unwrap(), )?; loop { let incoming = tokio::select! { option = endpoint.accept() => match option { Some(incoming) => incoming, None => break }, result = &mut recv_stop_server => if result.is_ok() { break } else { continue }, }; //debug!("incoming.remote_address_validated = {}, incoming.may_retry = {}", incoming.remote_address_validated(), incoming.may_retry()); if false && !incoming.remote_address_validated() { info!("not validated, responding with retry"); incoming.retry().unwrap(); continue; } /*if false && incoming.may_retry() { info!("responding with retry even thoguh it's validated"); incoming.retry().unwrap(); continue; }*/ // spawn subtask for connection tokio::spawn(log_err(async move { // attempt to accept 0-RTT data let conn = match incoming.accept()?.into_0rtt() { Ok((conn, _)) => conn, Err(connecting) => connecting.await?, }; loop { let mut stream = match conn.accept_uni().await { Ok(stream) => stream, Err(ConnectionError::ApplicationClosed(_)) => break, Err(e) => Err(e)?, }; // spawn subtask for stream tokio::spawn(log_err(async move { let msg = stream.read_to_end(1 << 30).await?; info!(msg=%String::from_utf8_lossy(&msg), "received message"); Ok(()) }).instrument(info_span!("server_stream"))); } Ok(()) }).instrument(info_span!("server_conn"))); } // shut down 
server endpoint cleanly endpoint.wait_idle().await; Ok(()) }).instrument(info_span!("server"))); (send_stop_server, server) } let (mut send_stop_server, mut server) = spawn_server(&certs, &key); // start client async fn send_request(conn: &Connection, msg: &str) -> Result<(), Error> { let mut stream = conn.open_uni().await?; info!(%msg, "beginning write_all and finish calls"); stream.write_all(msg.as_bytes()).await?; stream.finish()?; info!(%msg, "returned write_all and finish calls, beginning stopped call"); stream.stopped().await?; info!(%msg, "returned stopped call"); Ok(()) } tasks.spawn(log_err(async move { let mut client_crypto = rustls::ClientConfig::builder() .with_root_certificates(roots) .with_no_client_auth(); /* let mut client_crypto = rustls::ClientConfig::builder() .with_safe_defaults() .with_root_certificates(roots) .with_no_client_auth(); */ // make sure to configure this: client_crypto.enable_early_data = true; let mut endpoint = Endpoint::client( "0.0.0.0:0".to_socket_addrs().unwrap().next().unwrap() )?; let client_crypto = quinn::crypto::rustls::QuicClientConfig::try_from(Arc::new(client_crypto))?; endpoint.set_default_client_config(ClientConfig::new(Arc::new(client_crypto))); // twice, so as to allow 0-rtt to work on the second time for i in 0..2 { info!(%i, "client iteration"); let connecting = endpoint.connect( "127.0.0.1:4433".to_socket_addrs().unwrap().next().unwrap(), "localhost", )?; // attempt to transmit 0-RTT data match connecting.into_0rtt() { Ok((conn, zero_rtt_accepted)) => { info!("attempting 0-rtt request"); let send_request_0rtt = send_request(&conn, "0-rtt hello world"); /*tokio::spawn(async move { let outcome = zero_rtt_accepted.await; info!(?outcome, "zero_rtt_accepted future resolved"); }); send_request_0rtt.await?;*/ /* let mut send_request_0rtt_pinned = std::pin::pin!(send_request_0rtt); tokio::select! 
{ result = &mut send_request_0rtt_pinned => result?, accepted = zero_rtt_accepted => { if accepted { info!("0-rtt accepted"); send_request_0rtt_pinned.await?; } else { info!("0-rtt rejected"); send_request(&conn, "1-rtt hello world (0-rtt was attempted)").await?; } } }*/ } Err(connecting) => { info!("not attempting 0-rtt request"); let conn = connecting.await?; send_request(&conn, "1-rtt hello world (0-rtt not attempted)").await?; } } /*if true && i == 0 { let _ = send_stop_server.send(()); let _ = server.await; let (send_stop_server2, server2) = spawn_server(&certs, &key); send_stop_server = send_stop_server2; server = server2; }*/ if i == 0 { tokio::time::sleep_until(tokio::time::Instant::from(t0) + std::time::Duration::from_secs(1)).await; } println!(); } // tell the server to shut down so this process doesn't idle forever let _ = send_stop_server.send(()); let _ = server.await; Ok(()) }).instrument(info_span!("client"))); while tasks.join_next().await.is_some() {} } async fn log_err>>(task: F) { if let Err(e) = task.await { error!("{}", e); } } ```

I can make a PR to fix this.