hyperium / hyper

An HTTP library for Rust
https://hyper.rs
MIT License

Gracefully shutting down a TLS server sometimes leads to the client not receiving a response #3792

Open chinedufn opened 1 day ago

chinedufn commented 1 day ago
hyper = { version = "=1.5.1", features = ["client", "http1"] }
hyper-util = { version = "=0.1.10", features = ["http1", "tokio", "server"] }
MacBook Pro 16-inch, 2019
2.4 GHz 8-Core Intel Core i9

I'm observing errors while testing graceful shutdown of a hyper server.

When gracefully shutting down a TLS connection the client will sometimes get an IncompleteMessage error.

This only happens for TLS connections. The graceful shutdown process is always successful for non-TLS connections.

reqwest::Error { kind: Request, url: "https://127.0.0.1:62103/", source: hyper_util::client::legacy::Error(SendRequest, hyper::Error(IncompleteMessage)) }

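Given the following testing steps:

  1. Start the server.
  2. Send many concurrent requests to the server.
  3. Wait for any of the requests to receive a response. Since the request handler takes a random amount of time, some other requests should still be in progress at that point.
  4. Tell the server to shut down.
  5. Assert that we receive a 200 OK response for each request.

When the hyper server is not using TLS, the test passes. When the hyper server is using TLS, the test fails with an IncompleteMessage error.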

I've created a repository that reproduces the issue https://github.com/chinedufn/hyper-tls-graceful-shutdown-issue

git clone https://github.com/chinedufn/hyper-tls-graceful-shutdown-issue
cd hyper-tls-graceful-shutdown-issue

# This always passes
cargo test -- graceful_shutdown_without_tls

# This always fails
cargo test -- graceful_shutdown_with_tls

Here's a quick snippet of the graceful shutdown code:

    let conn = builder.serve_connection(stream, hyper_service);
    pin_mut!(conn);

    tokio::select! {
        result = conn.as_mut() => {
            if let Err(err) = result {
                dbg!(err);
            }
        }
        _ = should_shut_down_connection => {
            conn.as_mut().graceful_shutdown();
            let result = conn.as_mut().await;
            if let Err(err) = result {
                dbg!(err);
            }
        }
    };

Here is the full source code for convenience (also available in the linked repository)

Cargo.toml:

```toml
[package]
name = "hyper-graceful-shutdown-issue"
version = "0.1.0"
edition = "2021"
publish = false

# We specify exact versions of the dependencies to ensure that the issue is reproducible.
[dependencies]
hyper = { version = "=1.5.1", features = ["client", "http1"] }
hyper-util = { version = "=0.1.10", features = ["http1", "tokio", "server"] }
http-body-util = "=0.1.2"
futures-util = "=0.3.31"
rand = "=0.8.5"
reqwest = { version = "=0.12.9" }
rustls-pemfile = "2"
tokio = { version = "=1.41.1", features = ["macros", "net", "rt-multi-thread", "sync", "time"] }
tokio-rustls = "0.26"
```

Rust code:

```rust
use futures_util::pin_mut;
use http_body_util::Empty;
use hyper::body::Bytes;
use hyper::body::Incoming;
use hyper::{Request, Response, StatusCode};
use hyper_util::rt::{TokioExecutor, TokioIo};
use rand::Rng;
use rustls_pemfile::{certs, pkcs8_private_keys};
use std::io::{BufReader, Cursor};
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::watch::Sender;
use tokio::sync::{oneshot, watch};
use tokio_rustls::rustls::pki_types::PrivateKeyDer;
use tokio_rustls::rustls::server::Acceptor;
use tokio_rustls::rustls::ServerConfig;
use tokio_rustls::LazyConfigAcceptor;

#[derive(Copy, Clone)]
enum TlsConfig {
    Disabled,
    Enabled,
}

async fn run_server(
    tcp_listener: TcpListener,
    mut shutdown_receiver: oneshot::Receiver<()>,
    tls_config: TlsConfig,
) {
    let enable_graceful_shutdown = true;

    let (wait_for_requests_to_complete_tx, wait_for_request_to_complete_rx) =
        watch::channel::<()>(());
    let (shut_down_connections_tx, shut_down_connections_rx) = watch::channel::<()>(());

    loop {
        tokio::select! {
            _ = &mut shutdown_receiver => {
                drop(shut_down_connections_rx);
                break;
            }
            conn = tcp_listener.accept() => {
                tokio::spawn(
                    handle_tcp_conn(
                        conn,
                        wait_for_request_to_complete_rx.clone(),
                        shut_down_connections_tx.clone(),
                        tls_config
                    )
                );
            }
        }
    }

    drop(wait_for_request_to_complete_rx);

    if enable_graceful_shutdown {
        wait_for_requests_to_complete_tx.closed().await;
    }
}

async fn handle_tcp_conn(
    conn: tokio::io::Result<(TcpStream, SocketAddr)>,
    indicate_connection_has_closed: watch::Receiver<()>,
    should_shut_down_connection: watch::Sender<()>,
    tls_config: TlsConfig,
) {
    let tcp_stream = conn.unwrap().0;

    let builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());

    match tls_config {
        TlsConfig::Disabled => {
            let stream = TokioIo::new(tcp_stream);

            handle_tokio_io_conn(builder, stream, should_shut_down_connection).await
        }
        TlsConfig::Enabled => {
            let acceptor = LazyConfigAcceptor::new(Acceptor::default(), tcp_stream);
            tokio::pin!(acceptor);

            let start = acceptor.as_mut().await.unwrap();

            let config = rustls_server_config();
            let stream = start.into_stream(config).await.unwrap();
            let stream = TokioIo::new(stream);

            handle_tokio_io_conn(builder, stream, should_shut_down_connection).await
        }
    };

    drop(indicate_connection_has_closed);
}

fn rustls_server_config() -> Arc<ServerConfig> {
    let mut cert_reader = BufReader::new(Cursor::new(ssl_cert::TLS_CERTIFICATE_SELF_SIGNED));
    let mut key_reader = BufReader::new(Cursor::new(ssl_cert::TLS_PRIVATE_KEY_SELF_SIGNED));

    let key = pkcs8_private_keys(&mut key_reader)
        .into_iter()
        .map(|key| key.unwrap())
        .next()
        .unwrap();

    let certs = certs(&mut cert_reader)
        .into_iter()
        .map(|cert| cert.unwrap())
        .collect();

    let config = ServerConfig::builder()
        .with_no_client_auth()
        .with_single_cert(certs, PrivateKeyDer::Pkcs8(key))
        .unwrap();

    Arc::new(config)
}

async fn handle_tokio_io_conn(
    builder: hyper_util::server::conn::auto::Builder<TokioExecutor>,
    stream: TokioIo<impl AsyncRead + AsyncWrite + Unpin + 'static>,
    should_shut_down_connection: Sender<()>,
) {
    let should_shut_down_connection = should_shut_down_connection.closed();
    pin_mut!(should_shut_down_connection);

    let hyper_service = hyper::service::service_fn(move |_request: Request<Incoming>| async {
        let jitter_milliseconds = rand::thread_rng().gen_range(0..5);
        let sleep_time = Duration::from_millis(5) + Duration::from_millis(jitter_milliseconds);

        tokio::time::sleep(sleep_time).await;

        let response = Response::builder()
            .status(StatusCode::OK)
            .body(Empty::<Bytes>::new())
            .unwrap();
        Ok::<_, &'static str>(response)
    });

    let conn = builder.serve_connection(stream, hyper_service);
    pin_mut!(conn);

    tokio::select! {
        result = conn.as_mut() => {
            if let Err(err) = result {
                dbg!(err);
            }
        }
        _ = should_shut_down_connection => {
            conn.as_mut().graceful_shutdown();
            let result = conn.as_mut().await;
            if let Err(err) = result {
                dbg!(err);
            }
        }
    };
}

/// The key and certificate were generated using the following command:
/// ```sh
/// # via https://letsencrypt.org/docs/certificates-for-localhost/#making-and-trusting-your-own-certificates
/// openssl req -x509 -out local_testing.crt -keyout local_testing.key \
///   -newkey rsa:2048 -nodes -sha256 \
///   -subj '/CN=localhost' -extensions EXT -config <( \
///    printf "[dn]\nCN=localhost\n[req]\ndistinguished_name = dn\n[EXT]\nsubjectAltName=DNS:localhost\nkeyUsage=digitalSignature\nextendedKeyUsage=serverAuth")
/// ```
mod ssl_cert {
    pub(super) const TLS_CERTIFICATE_SELF_SIGNED: &'static str = r#"-----BEGIN CERTIFICATE-----
MIIDDzCCAfegAwIBAgIUaQDe0cAZUax+1IpET1vF8UFm3jswDQYJKoZIhvcNAQEL
BQAwFDESMBAGA1UEAwwJbG9jYWxob3N0MB4XDTI0MTAyMjExMjYyMFoXDTI0MTEy
MTExMjYyMFowFDESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEF
AAOCAQ8AMIIBCgKCAQEAsJGiYaWK0k6NT6J4uFyzPiTFGkjQ5K77dKBXrcwjz4LT
vsxUFAAyrV8GYIWJaaEKKD5WqF/B8WN1Di3+Ut8dxR7buDWgNN3R7qsp43IaTNsV
ORaN72DogMd94NzNVbAiqh+rjBNMyU/7AXwSifBbMzx/FL9KmGU5XejJtSx0EAd1
yV+cL+s/lWgDd0A82DdpZYNSfk5bQ6rcQis803VIqoVDM+4u85y/4wCR1QCQeGhr
YIeqwfGwf4o3pXB/spE2dB4ZU/QikYcTrUWVZ9Fup4UomUlggV9J0CuphjADdQxW
Nv3yH7HqgjmHl6h5Ei91ELjMH6TA2vwb3kv4bLoX/wIDAQABo1kwVzAUBgNVHREE
DTALgglsb2NhbGhvc3QwCwYDVR0PBAQDAgeAMBMGA1UdJQQMMAoGCCsGAQUFBwMB
MB0GA1UdDgQWBBTHjwqKi5dOeMSjlhhahDEEMPQBQjANBgkqhkiG9w0BAQsFAAOC
AQEACJo+lYhAtZgGkIZRYzJUSafVUsv2+5aFwDXtrNPWQxM6rCmVOHmZZlHUyrK/
dTGQbtO1/IkBAutBclVBa+lteXFqiOGWiYF+fESioBx7DEXQWgQJY4Q5bYSHUkNu
u7vKXPt+8aAaaKQA8kR5tEO/+4atlD619kor4SwajOMWX2johgNku5n6mZ+fldQj
5Bv7PhPWZjpBJoqaXkHWJiT449efJQsiHAXY73eLmUf4kuJjQLuPXwZ/TY3KeH8a
tuWXtYQp1pU60yRzrO8JJ/4gj1ly/bzs9CTaD/u6hmpbdMdgZRR5ZZqvK3KYyI82
3TfEIvddnICP7SnH+BUzCQJhXg==
-----END CERTIFICATE-----"#;

    pub(super) const TLS_PRIVATE_KEY_SELF_SIGNED: &'static str = r#"-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCwkaJhpYrSTo1P
oni4XLM+JMUaSNDkrvt0oFetzCPPgtO+zFQUADKtXwZghYlpoQooPlaoX8HxY3UO
Lf5S3x3FHtu4NaA03dHuqynjchpM2xU5Fo3vYOiAx33g3M1VsCKqH6uME0zJT/sB
fBKJ8FszPH8Uv0qYZTld6Mm1LHQQB3XJX5wv6z+VaAN3QDzYN2llg1J+TltDqtxC
KzzTdUiqhUMz7i7znL/jAJHVAJB4aGtgh6rB8bB/ijelcH+ykTZ0HhlT9CKRhxOt
RZVn0W6nhSiZSWCBX0nQK6mGMAN1DFY2/fIfseqCOYeXqHkSL3UQuMwfpMDa/Bve
S/hsuhf/AgMBAAECggEANKEsPBvaXaZxY4fDoPxspvzRzWxf65ImvJQgnlrHX86Y
q/n+o7mNYXT+Ex4qn9QTEXzHWse0KO3i0beu42fC2WNBzc4aMzfdH91gDn4PzdHN
qScKZoxFsUEFSdW21LA8HOZ0vTtxe13+LOqdIgWFQafqHzaHlxYw+8dr/DdEXxRC
xh2U9xlrgplz2VJW+NhvIUJoBpsvRJ0XK58Cs0L7+CHrdaUmtL6gLehp49wPy810
l2r168CcHw/HdYN2SKtA3l2EldyZ0BdgHnblq9ozY8isTCn1ccQE8sr1Id1rCj26
BlyVoZurukB1tYTtf9LvQnC6MPdcC7hbHkpYGvFcKQKBgQDrZmLhNNL8aj5iCwXg
BnqTFBSvkADPE7inI/iPy69Q3k87LHM27uUQy0wzJow4HrfQNSE3HN0gHo7K8/KB
n+vR0QCmYu5x7Uk994RBzub+8QfEPVP3yJP5MgbaT68L7BwiaWkVTU+sLIXVCxAl
OsYGtXrsvBdEVKLKiCCxVQR32QKBgQDABUTBKFCtlMNrK8MM9h/9ddMHv2QH6hd3
x8mijKEsLvjRDYUzAWd2a2Rabo4Wf9kv7R/dGR/3jWp+84rgmr9s/XS6pABoCYjJ
RNQ6kD+b+apSTybToTFJ78hhdfAeT4IzrxdbHMOOlZl86R8IpDzTubJAAMrnJxpX
+prSi8E/lwKBgGhX+BiPi75rcb+P10jYVlj/m7O+hz1DJqSf4zwKM2oLQN+f8mo1
NsBc/SfnPFxb8WqPQmvllXb5VJ5Nx/8BXkyg8kLOs5c4cTDQmIV7KxVyzdiEvsWk
2UKqlDMNAzCrtkTiqLvSizBsg95NixiVltW+eACb10xon8ha0vMIFnTxAoGBAIL/
lSZJgLDK+n6Uvl6LUsuxpCR296FGnHgE/pQ8aIAiE3FbTfG8FX9+SFpBbgH/eoXt
uX029M4H1g2BzM7qA4oxZ38k/3n6dy0IHdlOK3cXXpEEmrJqF5wfT47dzNCA4Yys
+LwZ5XfSq4HB8IAOu8iduPNdFw+XZ6t5tkHJQi9FAoGAU+39yLcc1c1gWQw4UCuU
D2vlTSSR7U0ji23goHYFGyIxmJNa1lpx/jxOlHSu99PNhx87FTDyW5HuoaayyUyw
dK+3pvS6KhSQMCrcpdAND5sRV3KsGGdYpy/ICmVFeK9f26VMOTN3jdCqLR+gnAaY
fuCBU0U/o2qoHC7VjsfzQZw=
-----END PRIVATE KEY-----"#;
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::future::Future;

    use hyper::StatusCode;
    use tokio::sync::mpsc;

    /// This always passes.
    #[tokio::test]
    async fn graceful_shutdown_without_tls() {
        test_graceful_shutdown(TlsConfig::Disabled).await
    }

    /// This always fails.
    #[tokio::test]
    async fn graceful_shutdown_with_tls() {
        test_graceful_shutdown(TlsConfig::Enabled).await
    }

    /// ## Steps
    /// - Start the server.
    /// - Send many concurrent requests to the server
    /// - Wait for any of the requests to receive a response.
    ///   Since the request handler takes a random amount of time we can be reasonably confident
    ///   that when we receive a response there are still some other requests that are in-progress.
    /// - Tell the server to shut down. We expect that there are still some in-progress requests.
    /// - Assert that we receive a 200 OK response for each request.
    ///   This means that the graceful shutdown process was successful.
    async fn test_graceful_shutdown(tls_config: TlsConfig) {
        // We repeat the test multiple times since the error does not always occur.
        const TEST_REPETITION_COUNT: usize = 100;

        for _ in 0..TEST_REPETITION_COUNT {
            let tcp_listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
                .await
                .unwrap();
            let addr = tcp_listener.local_addr().unwrap();

            let (shutdown_sender, shutdown_receiver) = oneshot::channel();
            let (successful_shutdown_tx, successful_shutdown_rx) = oneshot::channel();

            // Spawn the server in a separate async runtime so that once the server stops the
            // runtime will stop.
            // This ensures that when the server stops the runtime will stop and all other tasks
            // will be immediately dropped.
            // This way if we receive responses from the server we know that the server did not
            // shut down until after the request tasks finished running.
            std::thread::spawn(move || {
                tokio::runtime::Runtime::new()
                    .unwrap()
                    .block_on(async move {
                        run_server(tcp_listener, shutdown_receiver, tls_config).await;
                        successful_shutdown_tx.send(()).unwrap();
                    })
            });

            let mut request_handles = vec![];

            let (response_received_tx, mut response_received_rx) = mpsc::unbounded_channel();

            // An arbitrarily chosen number of requests to send concurrently.
            const CONCURRENT_REQUEST_COUNT: usize = 10;
            for _ in 0..CONCURRENT_REQUEST_COUNT {
                let response_received_tx = response_received_tx.clone();
                let handle = tokio::spawn(async move {
                    let result = send_get_request(addr, tls_config).await;
                    response_received_tx.send(()).unwrap();
                    result
                });
                request_handles.push(handle);
            }

            // Wait to receive the first response, and then shut down the server.
            // Since we sent many requests to the server we are confident that some of them have not
            // yet completed.
            // This means that if all requests get a 200 OK response then the graceful shutdown
            // process was successful.
            let _wait_for_first_response = response_received_rx.recv().await.unwrap();
            shutdown_sender.send(()).unwrap();

            // Check that every request received a 200 response.
            // We panic if a request ended with an error.
            for handle in request_handles {
                let result = handle.await.unwrap();
                match result {
                    Ok(status_code) => {
                        assert_eq!(status_code, StatusCode::OK);
                    }
                    Err(err) => {
                        panic!(
                            r#"
Error during the request/response cycle:
{err}
{err:?}
"#
                        )
                    }
                }
            }

            // Make sure that the server gets shut down.
            // If it was shut down and every request succeeded then we can be confident that the
            // graceful shutdown process worked.
            let _did_shutdown = wait_max_3_seconds(successful_shutdown_rx).await;
        }
    }

    async fn send_get_request(
        addr: SocketAddr,
        tls_config: TlsConfig,
    ) -> Result<StatusCode, reqwest::Error> {
        let uri = match tls_config {
            TlsConfig::Disabled => {
                format!("http://{addr}")
            }
            TlsConfig::Enabled => {
                format!("https://{addr}")
            }
        };

        let client = reqwest::Client::builder()
            // We use a self-signed cert for localhost. Here we're trusting that self-signed cert.
            .danger_accept_invalid_certs(true)
            .build()?;
        let response = client.get(uri).send().await.map(|r| r.status());
        response
    }

    /// Used to prevent the test from running indefinitely.
    async fn wait_max_3_seconds(fut: impl Future) {
        tokio::time::timeout(std::time::Duration::from_secs(3), fut)
            .await
            .unwrap();
    }
}
```

seanmonstar commented 6 hours ago

Have you been able to trace what is happening? One thing it sort of sounds like is that the TLS stream perhaps hasn't flushed the response before closing?

seanmonstar commented 6 hours ago

But do you know for sure all the requests have indeed started? Or could the shutdown be triggered just before hyper has been able to see the request bytes?

chinedufn commented 3 hours ago

> Or could the shutdown be triggered just before hyper has been able to see the request bytes?

Seems to be the problem. Most of this comment is how I arrived at that. Skip to the end for some potential solutions.


Did a bit of hyper=trace,hyper_util=trace tracing before opening the issue but ~nothing jumped out at me.~ (oh, nevermind. Now I remember. I wasn't getting hyper traces because I didn't enable the feature. Next time I dive in I can enable that.)

Hmm, so if I sleep for 1ms before dropping the watch::Receiver that serves as the shut down signal then the test passes. As in, a sleep before this line: https://github.com/chinedufn/hyper-tls-graceful-shutdown-issue/blob/7c3d52f54e839096022a3e9b7b478ad0a635293a/src/lib.rs#L42

Inlining the snippet here for convenience:

    loop {
        tokio::select! {
            _ = &mut shutdown_receiver => {
                // ADDING THIS SLEEP MAKES THE TEST PASS
                tokio::time::sleep(std::time::Duration::from_millis(1)).await;
                drop(shut_down_connections_rx);
                break;
            }
            conn = tcp_listener.accept() => {
                tokio::spawn(
                    handle_tcp_conn(
                        conn,
                        wait_for_request_to_complete_rx.clone(),
                        shut_down_connections_tx.clone(),
                        tls_config
                    )
                );
            }
        }
    }

The drop(shut_down_connections_rx) shutdown signal causes this Connection::graceful_shutdown tokio::select! branch to be selected. https://github.com/chinedufn/hyper-tls-graceful-shutdown-issue/blob/7c3d52f54e839096022a3e9b7b478ad0a635293a/src/lib.rs#L145-L159

Inlining the snippet here for convenience:

    tokio::select! {
        result = conn.as_mut() => {
            if let Err(err) = result {
                dbg!(err);
            }
        }
        _ = should_shut_down_connection => {
            // TEST STILL FAILS IF WE SLEEP RIGHT HERE
            conn.as_mut().graceful_shutdown();
            let result = conn.as_mut().await;
            if let Err(err) = result {
                dbg!(err);
            }
        }
    };

Key Points

The test passes if we sleep for a millisecond before sending on the channel that leads to conn.as_mut().graceful_shutdown(); getting called for all open connections. i.e. if we sleep for one millisecond right before this line: https://github.com/chinedufn/hyper-tls-graceful-shutdown-issue/blob/7c3d52f54e839096022a3e9b7b478ad0a635293a/src/lib.rs#L42

If I instead move the sleep to just before the conn.as_mut().graceful_shutdown(); line, the test fails whether we sleep for 1 or 5 seconds, 50 milliseconds, or seemingly any other amount of time. For example, sleeping for five seconds right before this line still makes the test fail: https://github.com/chinedufn/hyper-tls-graceful-shutdown-issue/blob/7c3d52f54e839096022a3e9b7b478ad0a635293a/src/lib.rs#L152

This suggests that the problem occurs when we call Connection::graceful_shutdown before we've started polling the connection and receiving bytes.

It looks like graceful_shutdown calls disable_keepalive, and disable_keepalive closes the connection if no bytes have been received. https://github.com/hyperium/hyper/blob/master/src/proto/h1/dispatch.rs#L90-L100
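
To make the suspected behavior concrete, here is a minimal toy model of that decision. It is not hyper's code: `Progress`, `Outcome`, and `on_graceful_shutdown` are hypothetical names invented for illustration, and the real check lives in the dispatcher linked above.

```rust
/// Toy stand-in for how far a connection has progressed (hypothetical, not a hyper type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Progress {
    /// Nothing has been read from or written to the connection yet.
    Initial,
    /// A request has been seen and its response is still in flight.
    Busy,
}

/// What the model does when graceful shutdown is requested.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Outcome {
    /// The connection is closed right away.
    ClosedImmediately,
    /// The in-flight exchange finishes first, then the connection closes.
    FinishThenClose,
}

/// Assumed behavior: a connection that has seen no bytes is closed at once,
/// while a busy connection only has keep-alive disabled.
fn on_graceful_shutdown(progress: Progress) -> Outcome {
    match progress {
        Progress::Initial => Outcome::ClosedImmediately,
        Progress::Busy => Outcome::FinishThenClose,
    }
}
```

Under this model, a request whose bytes are still in transit when the shutdown signal arrives falls into the `Initial` case, which would match the IncompleteMessage error the client reports.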

> Or could the shutdown be triggered just before hyper has been able to see the request bytes?

Yeah seems like this is the problem.
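
One way to confirm this more directly would be to count how many requests have reached the handler at the moment the shutdown signal is sent. A minimal sketch, assuming a hypothetical `StartedRequests` helper (not part of the reproduction repository) that is cloned into the service closure:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical helper: counts requests that have reached the request handler.
#[derive(Clone, Default)]
struct StartedRequests(Arc<AtomicUsize>);

impl StartedRequests {
    /// Call at the top of the request handler, before any sleep.
    fn mark_started(&self) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }

    /// Read right before sending the shutdown signal.
    fn count(&self) -> usize {
        self.0.load(Ordering::SeqCst)
    }
}
```

If `count()` is lower than CONCURRENT_REQUEST_COUNT when the server is told to shut down, some connections had not yet delivered their request to hyper.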

Problem

Currently, if a user opens a TCP connection to a server and the server calls Connection::graceful_shutdown before any bytes have been received, the TCP connection will be closed.

This means that if the client has just begun transmitting packets, but the server has not received them, the client will get an error. This is not a graceful shutdown since the client was not made aware that the connection was going to be closed.

Potential Solutions

Potential Solution - change how disable_keepalive decides whether to close the connection

I haven't yet poked around to figure out what has_initial_read_write_state is checking for.

Not yet sure why the tests would pass for a non-TLS server but fail for a TLS server.

Is it possible that disable_keepalive is immediately closing the connection even if the TLS negotiation process has begun?

Could a solution be to avoid closing the connection if the TLS negotiation process has begun?

Potential Solution - wait for Duration before closing an unused connection

One way to avoid such an error would be to do something like:

  1. Server somehow indicates to client that the connection will close
  2. Client receives indication that server is planning to close the connection
  3. If the client has not yet sent any data, server waits for Duration "W" (configurable) to see if the client sends any packets
  4. If the client DOES NOT send any packets after the wait Duration "W", close the connection
  5. If the client DOES send packets before the wait Duration "W", receive the bytes and then close the connection
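
Steps 3 to 5 can be sketched with a channel standing in for the socket's read side. This only models the wait-then-decide logic; it does not touch hyper or a real TCP connection, steps 1 and 2 are not covered, and `ShutdownAction` and `shut_down_idle_connection` are hypothetical names.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// What the server does with a connection that had seen no bytes at shutdown time.
#[derive(Debug, PartialEq, Eq)]
enum ShutdownAction {
    /// Bytes arrived within the grace period: serve them, then close.
    ServeThenClose(Vec<u8>),
    /// Nothing arrived in time, or the peer went away: close the idle connection.
    CloseIdle,
}

/// Waits up to `grace` (the Duration "W") for the first bytes from the client.
/// `incoming` is an assumption standing in for reads from the connection.
fn shut_down_idle_connection(incoming: &Receiver<Vec<u8>>, grace: Duration) -> ShutdownAction {
    match incoming.recv_timeout(grace) {
        Ok(bytes) => ShutdownAction::ServeThenClose(bytes),
        Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => {
            ShutdownAction::CloseIdle
        }
    }
}
```

The trade-off is that every idle connection can delay shutdown by up to "W", so the value would need to stay small.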

I'm unfamiliar with the TCP spec, but from some quick searching it seems the FIN segment might be useful here? I can do more research if the above steps seem like a good path forward.

Potential Solution - ... I'll add more if I think of any ...

...