seanmonstar / warp

A super-easy, composable, web server framework for warp speeds.
https://seanmonstar.com/post/176530511587/warp
MIT License

SSE gives up on Streams that return Pending #1070

Closed by kouta-kun 1 year ago

kouta-kun commented 1 year ago

Version

cargo tree | grep warp
warp_test v0.1.0 (/home/kouta/RustroverProjects/warp_test)
└── warp v0.3.6

Platform Linux arch-kouta 6.5.5-arch1-1 #1 SMP PREEMPT_DYNAMIC Sat, 23 Sep 2023 22:55:13 +0000 x86_64 GNU/Linux

Description As the title implies, warp::sse::reply seems to stop polling a stream once it returns Poll::Pending. Minimal repro:

use std::convert::Infallible;
use std::sync::{Arc, Mutex};
use std::task::Poll;
use warp::{Filter, Rejection, Reply};

async fn sse_test() -> Result<impl Reply + Sized, Rejection> {
    let count = Arc::new(Mutex::new(0u32));
    Ok(warp::sse::reply(futures::stream::poll_fn(move |a| {
        *(count.lock().unwrap()) += 1;

        eprintln!("Count: {}", *(count.lock().unwrap()));

        if *(count.lock().unwrap()) > 5 {
            let x = (*(count.lock().unwrap())).to_string();
            return Poll::Ready(Some(Ok::<warp::sse::Event, Infallible>(warp::sse::Event::default().data(x))))
        }

        Poll::Pending
    })))
}

#[tokio::main]
async fn main() {
    warp::serve(warp::any().and_then(sse_test)).run(([0,0,0,0],5000)).await;
}

I would expect the closure passed to poll_fn to be called repeatedly until count exceeds 5, at which point it starts yielding events. Instead, I get a single line of output, as if it were called only once:

    Finished dev [unoptimized + debuginfo] target(s) in 1.94s
     Running `target/debug/warp_test`
Count: 1

The connection stays open, which makes me believe the stream is not being cancelled but simply never polled again after returning Pending. If I instead replace the condition with true, the closure is called indefinitely and I get a continuous stream of messages.

seanmonstar commented 1 year ago

If you return Pending, you need to use the context's waker to signal when the stream should be polled again.

seanmonstar commented 1 year ago

The documentation for Future::poll may help here: https://doc.rust-lang.org/std/future/trait.Future.html#tymethod.poll
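To illustrate the waker contract described above, here is a minimal std-only sketch. It is not warp's or tokio's code: `CountUntil`, `CountingWaker`, and `run_until_stalled` are hypothetical names invented for this example, and the toy executor only imitates the rule that a task is polled again after its waker has been invoked. In the repro, the equivalent change would presumably be calling `a.waker().wake_by_ref()` before returning `Poll::Pending`.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future: ready once it has been polled more than `threshold` times.
struct CountUntil {
    count: u32,
    threshold: u32,
    /// When false, this mimics the repro: Pending is returned without waking.
    rewake: bool,
}

impl Future for CountUntil {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.count += 1;
        if self.count > self.threshold {
            return Poll::Ready(self.count);
        }
        if self.rewake {
            // Ask the executor to poll this task again.
            cx.waker().wake_by_ref();
        }
        Poll::Pending
    }
}

/// Hypothetical waker that only records how many times it was woken.
struct CountingWaker {
    wakes: AtomicUsize,
}

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}

/// Toy executor: re-polls only if the waker fired during the previous poll.
/// Returns the output (None if the future stalled) and the number of polls.
fn run_until_stalled<F: Future + Unpin>(mut fut: F) -> (Option<F::Output>, usize) {
    let state = Arc::new(CountingWaker { wakes: AtomicUsize::new(0) });
    let waker = Waker::from(state.clone());
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        let wakes_before = state.wakes.load(Ordering::SeqCst);
        polls += 1;
        if let Poll::Ready(out) = Pin::new(&mut fut).poll(&mut cx) {
            return (Some(out), polls);
        }
        if state.wakes.load(Ordering::SeqCst) == wakes_before {
            // Pending with no wake-up requested: nothing will ever re-poll.
            return (None, polls);
        }
    }
}
```

With `rewake: true` and `threshold: 5`, `run_until_stalled` polls six times and yields `6`; with `rewake: false` it stalls after a single poll, which matches the lone `Count: 1` in the report. Waking immediately on every Pending amounts to busy-polling, so in real code the waker would more likely be stored and invoked when new data actually arrives.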

kouta-kun commented 1 year ago

Thanks!