actix / actix-web

Actix Web is a powerful, pragmatic, and extremely fast web framework for Rust.
https://actix.rs
Apache License 2.0

Tokio timeout doesn't wake up when streaming while another tokio thread is running #3489

Open SeseMueller opened 1 month ago

SeseMueller commented 1 month ago

Expected Behavior

The tokio timeout and sleep should work and trigger after the supplied time.

Current Behavior

They are "blocked" by another tokio thread executing and do not trigger before that thread is done.

Steps to Reproduce (for bugs)

I created two examples; one where the problem occurs when using actix_web, and one where it doesn't occur when using tokio on very similar code. See Context for explanation.

Tokio example: working code

```rust
use futures_util::{Stream, StreamExt};
use std::convert::Infallible;
use std::{process::Command, time::Duration};
use tokio::pin;
use tokio::sync::mpsc::Receiver;

use futures_util::stream::unfold;
use tokio::sync::mpsc::Sender;
use tokio::time::timeout;

#[tokio::main]
async fn main() {
    let stream = tokio::spawn(serve()).await;
    let stream = stream.unwrap();
    pin!(stream);
    loop {
        let item = stream.next().await;
        dbg!(item);
    }
}

// Whether this is async or not doesn't make a difference; just to make them more similar
async fn serve() -> impl Stream<Item = Result<String, Infallible>> {
    let c = |state: Option<Receiver<String>>| async move {
        let mut rx = if state.is_none() {
            // Construct a new receiver if it's not initialized yet
            let (tx, rx) = tokio::sync::mpsc::channel(100);
            println!("created channel");
            tokio::spawn(do_work(tx));
            println!("spawned work");
            rx
        } else {
            state.unwrap()
        };

        // vvv Problem line: timeout doesn't happen; the command is instead being waited on
        let t = timeout(Duration::from_secs(1), rx.recv()).await;
        // Tokio select! and Tokio sleep also don't work here.
        dbg!(&t);

        if t == Ok(None) {
            // Stop the stream if the command ended
            return None;
        }
        Some((
            // Send a message on timeout
            Ok::<String, Infallible>("Stream fragment\n".to_owned()),
            Some(rx),
        ))
    };
    let stream = unfold(None, c); // Construct stream from closure
    stream
}

async fn do_work(tx: Sender<String>) {
    println!("Running Command");
    let _ = Command::new("sh").arg("-c").arg("sleep 10").output();
    println!("Command finished!");
    if let Err(e) = tx.send("".to_owned()).await {
        eprintln!("Sending errored! {:?}", e);
    }
}

// Expected Output: ten or nine lines of "Stream Fragment", and one is sent every second;
// followed by a panic! because the stream was polled after it ended
// Actual output: as expected
```

Actix_web example: Has the problem

```rust
use actix_web::body::MessageBody;
use actix_web::web::Bytes;
use actix_web::{services, web, HttpResponse, Responder};
use actix_web::{App, HttpServer};
use std::convert::Infallible;
use std::{process::Command, time::Duration};
use tokio::sync::mpsc::Receiver;

use futures_util::stream::unfold;
use tokio::sync::mpsc::Sender;
use tokio::time::timeout;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| {
        // Available at localhost:8080/a/b
        let services = services![web::scope("/a").route("/b", web::get().to(serve))];
        App::new().service(services)
    })
    .bind(("localhost", 8080))
    .unwrap()
    .run()
    .await
}

async fn serve() -> impl Responder {
    let c = |state: Option<Receiver<String>>| async move {
        let mut rx = if state.is_none() {
            // Construct a new receiver if it's not initialized yet
            let (tx, rx) = tokio::sync::mpsc::channel(100);
            println!("created channel");
            //tokio::spawn(do_work(tx));
            actix_web::rt::spawn(do_work(tx)); // Both have the same problem
            println!("spawned work");
            rx
        } else {
            state.unwrap()
        };

        // vvv Problem line: timeout doesn't happen; the command is instead being waited on
        let t = timeout(Duration::from_secs(1), rx.recv()).await;
        // Tokio select! and Tokio sleep also don't work here.
        dbg!(&t);

        if t == Ok(None) {
            // Stop the stream if the command ended
            return None;
        }
        Some((
            // Send a message on timeout
            Ok::<Bytes, Infallible>("Stream fragment\n".to_owned().try_into_bytes().unwrap()),
            Some(rx),
        ))
    };
    let stream = unfold(None, c); // Construct stream from closure
    HttpResponse::Ok().streaming(stream) // Stream out the answer
}

async fn do_work(tx: Sender<String>) {
    println!("Running Command");
    let _ = Command::new("sh").arg("-c").arg("sleep 10").output();
    println!("Command finished!");
    if let Err(e) = tx.send("".to_owned()).await {
        eprintln!("Sending errored! {:?}", e);
    }
}

// Expected Output: ten or nine lines of "Stream Fragment", and one is sent every second
// Actual output: a single line of "Stream Fragment" is output at the very end.
```

Edit: Using async_process::Command and then awaiting instead of using std::process::Command does solve this particular case.
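The workaround keeps the blocking call off the thread that produces the heartbeat. Here is a minimal std-only sketch of that idea, not the `async_process` code itself. The helper name `heartbeats_while_working` and the durations are hypothetical, and a plain OS thread with `recv_timeout` stands in for the async pieces.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Hypothetical helper: runs `work` on its own OS thread and emits a
/// heartbeat every `interval` until the result arrives.
/// Returns the number of heartbeats and the result of `work`.
fn heartbeats_while_working<F>(work: F, interval: Duration) -> (usize, String)
where
    F: FnOnce() -> String + Send + 'static,
{
    let (tx, rx) = mpsc::channel();

    // The blocking call lives on a dedicated thread, so it cannot stall
    // the loop below.
    thread::spawn(move || {
        let _ = tx.send(work());
    });

    let mut beats = 0;
    loop {
        match rx.recv_timeout(interval) {
            // The work finished: stop sending heartbeats.
            Ok(result) => return (beats, result),
            // Nothing arrived within `interval`: send a heartbeat.
            Err(RecvTimeoutError::Timeout) => {
                beats += 1;
                println!("Stream fragment");
            }
            // The worker went away without sending a result.
            Err(RecvTimeoutError::Disconnected) => return (beats, String::new()),
        }
    }
}

fn main() {
    let (beats, result) = heartbeats_while_working(
        || {
            // Stands in for the blocking `sleep 10` command, shortened.
            thread::sleep(Duration::from_millis(500));
            "done".to_owned()
        },
        Duration::from_millis(100),
    );
    println!("{beats} heartbeats before result {result:?}");
}
```

In a tokio application, `tokio::task::spawn_blocking` or `tokio::process::Command` would presumably play the role of the dedicated thread. That is an assumption and was not tested in this issue.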

Context

I am writing a REST API in actix_web that is heavily relying on streams to the client.

I was adding a heartbeat so that the connection to the client wouldn't time out and the client could be sure the server was still alive. While doing so, I found that some tokio functionality, namely sleep, select! (with sleep) and timeout, doesn't work properly.

I managed to reduce it to the two examples above.

In both cases, a channel is created inside an async closure, and its sender is given to a new tokio task that takes a long time to finish. If the Receiver is then awaited using a tokio timeout (or a tokio sleep is called), the timeout (or sleep) doesn't trigger after the given time, but only once the Receiver has something to receive.

In my use case, the async closure is unfolded into a stream, which is then served to a client. This causes the stream to become "stuck" and not send a heartbeat until the Receiver can receive.

In the tokio example, the stream is instead given to the main function, pinned, and iterated through until completion. Here, the stream does not become "stuck" and instead sends a heartbeat once every second, as expected.
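One way to picture the "stuck" behaviour is a toy model in which a single thread both runs the long task and checks the timer. This is an illustration under that assumption only, not tokio's or actix-web's actual scheduler, and the function name `timeout_lateness` is hypothetical.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical toy model of ONE thread that both runs tasks and checks timers.
/// Returns how long after `timeout` expired the expiry was actually noticed.
fn timeout_lateness(timeout: Duration, blocking_task: impl FnOnce()) -> Duration {
    let deadline = Instant::now() + timeout;

    // The thread is busy inside the task and cannot look at the clock meanwhile.
    blocking_task();

    // Control is back in the loop, so the expired timer is finally observed.
    Instant::now().saturating_duration_since(deadline)
}

fn main() {
    let late = timeout_lateness(Duration::from_millis(100), || {
        // Stands in for the blocking `sleep 10` command, shortened.
        thread::sleep(Duration::from_millis(400));
    });
    println!("timeout noticed {late:?} after its deadline");
}
```

In this model the timeout is only noticed once the blocking call returns, which matches the single late "Stream fragment" seen in the actix_web example.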

(I put this issue on actix-web because it seems to be caused by the way streaming is handled, and because the tokio example works.)

Your Environment

The given examples work on a fresh install.

SeseMueller commented 3 weeks ago

After doing some digging, this seems to be caused by https://github.com/rust-lang/futures-rs/issues/2775. See their comment for a good explanation of why it's happening.

The issue is also known at https://github.com/tokio-rs/tokio/issues/2542, which came up with a similar example as I did.

For me, this means that I can fix my problem by moving from std::process::Command to async_process::Command, because that is await-ed, but the general issue still persists.

(A similar problem was discussed at https://github.com/bytecodealliance/wasmtime/issues/2876, but it's unlikely that the cause is the same, as I did not find any evidence of mixed executors.)