hyperium / hyper

An HTTP library for Rust
https://hyper.rs
MIT License

Graceful shutdown never stops the server when there is an open sse connection #2787

Open SpyrosRoum opened 2 years ago

SpyrosRoum commented 2 years ago

Version hyper v0.14.17 axum v0.4.8

Platform Linux generation 5.16.15-zen1-1-zen #1 ZEN SMP PREEMPT Thu, 17 Mar 2022 00:30:11 +0000 x86_64 GNU/Linux

Description When the future passed to Server::with_graceful_shutdown completes, the server won't stop while there is an active SSE connection. I stumbled onto this using axum, so here is how to reproduce it (essentially axum's SSE example plus the graceful shutdown example):

use std::{convert::Infallible, time::Duration};

use {
    axum::{
        response::sse::{Event, KeepAlive, Sse},
        routing::get,
        Router,
    },
    futures::stream::{self, Stream},
    tokio::signal,
    tokio_stream::StreamExt as _,
};

#[tokio::main]
async fn main() {
    let app = Router::new().route("/sse", get(sse_handler));

    axum::Server::bind(&"127.0.0.1:3300".parse().unwrap())
        .serve(app.into_make_service())
        .with_graceful_shutdown(shutdown_signal())
        .await
        .unwrap();
}

async fn sse_handler() -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let stream = stream::repeat_with(|| Event::default().data("Hi!"))
        .map(Ok)
        .throttle(Duration::from_secs(1));

    Sse::new(stream).keep_alive(KeepAlive::default())
}

async fn shutdown_signal() {
    let ctrl_c = async {
        signal::ctrl_c().await.unwrap();
    };

    #[cfg(unix)]
    let terminate = async {
        signal::unix::signal(signal::unix::SignalKind::terminate())
            .expect("failed to install signal handler")
            .recv()
            .await;
    };

    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        _ = ctrl_c => {},
        _ = terminate => {},
    }

    println!("Shutting down");
}

Run the server, connect with an SSE client (I used curl -N localhost:3300/sse), then press Ctrl+C in the terminal running the server. "Shutting down" gets printed, but the server doesn't shut down until the SSE client disconnects by itself. (New connections are not accepted, which is expected.)

From my understanding the server keeps running until all connections finish, which is normally fine, but a little problematic when we are talking about sse, since an sse connection might not end at all. Note that with websocket that issue doesn't exist, the server shuts down normally closing the ws connections.

davidpdrsn commented 2 years ago

From my understanding the server keeps running until all connections finish, which is normally fine, but a little problematic when we are talking about sse, since an sse connection might not end at all.

Yeah, that's right. Graceful shutdown will stop accepting new connections and wait for all existing connections to close, which is problematic for infinite streams.

One solution is to wrap your SSE stream such that it ends when it receives a shutdown signal:

async fn sse_handler() -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let stream = stream::repeat_with(|| Event::default().data("hi!"))
        .map(Ok)
        .throttle(Duration::from_secs(1));
    let stream = or_until_shutdown(stream);

    Sse::new(stream)
}

/// Run a stream until it completes or we receive the shutdown signal.
///
/// Uses the `async-stream` crate to make things easier to write.
fn or_until_shutdown<S>(stream: S) -> impl Stream<Item = S::Item>
where
    S: Stream,
{
    async_stream::stream! {
        futures::pin_mut!(stream);

        let shutdown_signal = shutdown_signal();
        futures::pin_mut!(shutdown_signal);

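        loop {
            tokio::select! {
                item = stream.next() => match item {
                    Some(item) => yield item,
                    // The inner stream finished on its own, so stop here
                    // instead of waiting for the shutdown signal.
                    None => break,
                },
                _ = &mut shutdown_signal => {
                    break;
                }
            }
        }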
    }
}
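
The same idea can be shown without any async machinery. The following is a minimal synchronous analogy using only the standard library, not axum's or hyper's API: `UntilShutdown` is a hypothetical name, and an `AtomicBool` stands in for the shutdown signal. It wraps an endless source so that it ends once shutdown has been requested.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical adapter (not part of axum or hyper): yields items from
/// `inner` until `shutdown` is set or `inner` runs out, whichever is first.
pub struct UntilShutdown<I> {
    inner: I,
    shutdown: Arc<AtomicBool>,
}

impl<I> UntilShutdown<I> {
    pub fn new(inner: I, shutdown: Arc<AtomicBool>) -> Self {
        Self { inner, shutdown }
    }
}

impl<I: Iterator> Iterator for UntilShutdown<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        // Check the flag before producing each item, so that even an
        // endless source terminates once shutdown has been requested.
        if self.shutdown.load(Ordering::SeqCst) {
            return None;
        }
        self.inner.next()
    }
}
```

Wrapping `std::iter::repeat("Hi!")` this way yields items until the flag is stored as `true`, after which `next()` returns `None`. A finite source still ends on its own. The async version has one extra concern this sketch does not model: it must also wake up when the signal arrives while no item is pending, which is what `tokio::select!` provides.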

Note that with websocket that issue doesn't exist, the server shuts down normally closing the ws connections.

That's because when a regular HTTP connection is upgraded to a WebSocket connection, it leaves hyper's connection pool and is handed over to whatever WebSocket library you're using (axum uses tungstenite). So from hyper's perspective the connection is already gone when graceful shutdown is triggered.
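
To make the difference concrete, here is a toy model using only the standard library. It is not how hyper is implemented, and every name in it (`Tracker`, `ConnGuard`, `UpgradedIo`) is hypothetical. The assumption is just that the server waits for every connection it still tracks, and that an upgrade releases the connection from that tracking.

```rust
use std::sync::Arc;

/// Toy model of a server that counts the connections it still owns.
/// This is NOT hyper's implementation; all names are hypothetical.
pub struct Tracker {
    token: Arc<()>,
}

/// Held by each connection the server is still responsible for.
pub struct ConnGuard {
    _token: Arc<()>,
}

/// Stand-in for the raw I/O handed over to a WebSocket library.
pub struct UpgradedIo;

impl Tracker {
    pub fn new() -> Self {
        Self { token: Arc::new(()) }
    }

    /// Register a new connection with the server.
    pub fn accept(&self) -> ConnGuard {
        ConnGuard { _token: Arc::clone(&self.token) }
    }

    /// Number of connections the server would wait for on shutdown.
    pub fn tracked(&self) -> usize {
        // The tracker itself holds one reference; the rest are guards.
        Arc::strong_count(&self.token) - 1
    }

    /// In this model, graceful shutdown finishes only when nothing is tracked.
    pub fn can_finish_shutdown(&self) -> bool {
        self.tracked() == 0
    }
}

impl ConnGuard {
    /// Model of a protocol upgrade: consuming the guard drops its token,
    /// so the server no longer counts this connection.
    pub fn upgrade(self) -> UpgradedIo {
        UpgradedIo
    }
}
```

In this model, a connection that calls `upgrade()` stops counting toward `tracked()` right away, while a long-lived response such as an SSE stream keeps its guard, and therefore blocks `can_finish_shutdown()`, until it is dropped.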

SpyrosRoum commented 2 years ago

I understand why it happens, and the wrapper workaround is not too bad, but I'm wondering if this should be handled by hyper (not sure if it's even possible?), or if it should at least be made explicit that this is expected behavior.

davidpdrsn commented 2 years ago

not sure if it's even possible?

Not sure that it is. Hyper doesn't know anything about SSE, as it's just a long-running response body.

DoumanAsh commented 2 years ago

I'm not sure it is all that simple. We have a case where we do not use any sort of weird shit like SSE, but we occasionally stumble into this problem if the server is or was under heavy load. It is not clear what exactly is wrong, because we close all clients connected to the server. My suspicion is that there is some sort of bug in how hyper determines whether it can finally shut down (I suspect there might be an issue because we also use hyper's Client, and connection pooling is buggy).