hyperium / hyper

An HTTP library for Rust
https://hyper.rs
MIT License

Infinite (SSE) streams always raise hyper::Error(IncompleteMessage) / connection closed before message completed #3759

Open peku33 opened 1 month ago

peku33 commented 1 month ago

Version 1.4.1 full

Platform Windows 11 64-bit

Description For (possibly infinite) SSE streams, when the client disconnects (e.g. closes the connection), serving the connection always returns an error of type hyper::Error(IncompleteMessage).

I tried this code: I used the server example from the official website, with a tokio IntervalStream as the body:

use std::convert::Infallible;
use std::net::{Ipv4Addr, SocketAddr};

use futures::StreamExt;
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, StreamBody};
use hyper::body::{Bytes, Frame};
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{header, Request, Response};
use hyper_util::rt::TokioIo;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio_stream::wrappers::IntervalStream;

async fn hello(
    _: Request<hyper::body::Incoming>,
) -> Result<Response<BoxBody<Bytes, Infallible>>, Infallible> {
    let stream = IntervalStream::new(tokio::time::interval(Duration::from_secs(1)))
        .enumerate()
        .map(|(count, _)| {
            let data = Bytes::from(format!("data: Hello {}!\n\n", count));
            let frame = Frame::data(data);
            Ok::<_, Infallible>(frame)
        });

    let http_response = Response::builder()
        .header(header::CONTENT_TYPE, "text/event-stream")
        .body(BodyExt::boxed(StreamBody::new(stream)))
        .unwrap();

    Ok(http_response)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let addr = SocketAddr::from((Ipv4Addr::UNSPECIFIED, 3000));
    let listener = TcpListener::bind(addr).await?;
    loop {
        let (stream, _) = listener.accept().await?;
        let io = TokioIo::new(stream);
        tokio::task::spawn(async move {
            if let Err(err) = http1::Builder::new()
                .serve_connection(io, service_fn(hello))
                .await
            {
                eprintln!("Error serving connection: {}", err);
            }
        });
    }

    Ok(())
}

I expected to see this happen: serve_connection shouldn't return Err on client disconnect. It should return Ok and drop the stream.

Instead, this happened: Error of type hyper::Error(IncompleteMessage) is returned.

seanmonstar commented 1 month ago

Have you been able to take a look at what triggers the error? My guess is that since the response isn't complete (because you can still stream more events to the client), hyper notices the connection closed and determines it needs to tell you about the potential error condition (that the client didn't see all of the response). hyper has no special handling of server-sent events, as they aren't actually special in the HTTP spec: they are just a long-lived streaming response.

Thus, it's not immediately clear how hyper would know whether this is an error or should be ignored. Suggestions are welcome. But also, since the application knows better what kind of streaming it is doing, it can choose to ignore these errors.
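One way an application could ignore these errors is to classify the result of serving a connection before logging it. The following is a minimal, std-only sketch, not hyper's behavior: `ConnectionEnd`, `classify`, and the `endless_stream` flag are hypothetical names introduced here. With hyper, the `is_incomplete` predicate could be `hyper::Error::is_incomplete_message`.

```rust
use std::fmt::Display;

/// How a served connection ended, from the application's point of view.
/// (Hypothetical type, introduced only for this sketch.)
#[derive(Debug, PartialEq)]
enum ConnectionEnd {
    /// The connection finished without an error.
    Clean,
    /// The client left while an endless stream was still open; treated as benign.
    ClientLeftStream,
    /// Any other error, kept as text so it can be logged.
    Failed(String),
}

/// Classifies the result of serving one connection.
///
/// `is_incomplete` is supplied by the caller and tells whether the error
/// means "connection closed before message completed".
/// `endless_stream` is an assumption of this sketch: the application knows
/// it was serving a stream that never ends by itself (for example SSE).
fn classify<E: Display>(
    result: Result<(), E>,
    is_incomplete: impl Fn(&E) -> bool,
    endless_stream: bool,
) -> ConnectionEnd {
    match result {
        Ok(()) => ConnectionEnd::Clean,
        // Only swallow the error when the application expected the stream
        // to be cut off by the client.
        Err(err) if endless_stream && is_incomplete(&err) => ConnectionEnd::ClientLeftStream,
        Err(err) => ConnectionEnd::Failed(err.to_string()),
    }
}
```

In the reproduction above, the spawned task could pass the result of `serve_connection(...).await` through such a function and call `eprintln!` only for the `Failed` variant. Keeping the decision in the application avoids teaching hyper anything about SSE.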

peku33 commented 4 weeks ago

My guess is that since the response isn't complete (because you can still stream more events to the client), hyper notices the connection closed and determines it needs to tell you about the potential error condition (that the client didn't see all of the response)

IIRC this is exactly the case.

Probably, to handle this correctly, Body should be aware of whether it is finite or not. Maybe this should be a separate method like is_finite_stream, or maybe a special case of size_hint? But this makes the interface more complicated, especially for the typical case.
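To make the is_finite_stream idea concrete, here is a minimal sketch of what such a hint could look like. Everything in it is hypothetical: `SketchBody` is a stand-in trait, not hyper's `Body`, and `FileBody`, `EventStreamBody`, and `disconnect_is_error` exist only for illustration.

```rust
/// Hypothetical stand-in for a body trait; this is NOT hyper's `Body`.
trait SketchBody {
    /// Proposed hint: `true` if the body is expected to end by itself.
    /// The default of `true` keeps ordinary bodies unchanged, so only
    /// endless streams need to override it.
    fn is_finite_stream(&self) -> bool {
        true
    }
}

/// A body with a natural end, such as a file download.
struct FileBody;

/// A body that keeps producing events until the client leaves.
struct EventStreamBody;

impl SketchBody for FileBody {}

impl SketchBody for EventStreamBody {
    fn is_finite_stream(&self) -> bool {
        false
    }
}

/// Decides whether a client disconnect should surface as an error.
/// It is an error only if the body had not finished AND was expected
/// to finish by itself.
fn disconnect_is_error(body: &dyn SketchBody, body_finished: bool) -> bool {
    !body_finished && body.is_finite_stream()
}
```

A default method would limit the extra complexity for the typical case, since existing body types would not have to change, but it still widens the trait's surface.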

Alternatively, something a bit less correct but easier to implement would be to check what poll_frame returned: if it's Pending (i.e. no more body is available at the moment), the error could be suppressed.
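The Pending-based heuristic could be sketched as follows. This is a simplified model rather than hyper's internals: `report_disconnect` is a hypothetical helper, and the generic `T` stands in for whatever item type the body yields when polled.

```rust
use std::task::Poll;

/// Returns `true` if a client disconnect should be reported as an error,
/// given the most recent result of polling the body for a frame.
/// (Hypothetical helper; hyper does not expose a function like this.)
fn report_disconnect<T>(last_poll: &Poll<Option<T>>) -> bool {
    match last_poll {
        // The body had nothing to send at that moment, so the client
        // missed nothing that was already available: suppress the error.
        Poll::Pending => false,
        // The body had already ended, so the response was complete.
        Poll::Ready(None) => false,
        // A frame was ready but could not be delivered to the client.
        Poll::Ready(Some(_)) => true,
    }
}
```

The weakness mentioned above shows up in the first arm: a finite body that is only temporarily waiting for more data also returns Pending, so a truncated download would be suppressed in the same way.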