tokio-rs / tokio

A runtime for writing reliable asynchronous applications with Rust. Provides I/O, networking, scheduling, timers, ...
https://tokio.rs
MIT License
27.09k stars 2.49k forks source link

mpsc::Receiver only receives message if it is sent after call to recv().await #6525

Closed Benjscho closed 6 months ago

Benjscho commented 6 months ago

Version List the versions of all tokio crates you are using. The easiest way to get this information is using cargo tree subcommand:

├── tokio v1.35.1
│   └── tokio-macros v2.2.0 (proc-macro)

Platform Rust playground, stable 1.77.2. Behaviour also seen locally on arm64 Mac, with Tokio 1.35.1

Description

A spawned task listening for a message on a tokio::mpsc::UnboundedChannel only receives the message if it is sent before the Receivers call to recv().await. When a message is sent after the receivers call, the task never receives the message.

I tried this code:

use core::panic;
use std::{thread::sleep, time::Duration};
use std::sync::mpsc as std_mpsc;
use tokio::sync::mpsc as tokio_mpsc;

struct MyStruct;

impl Drop for MyStruct {
    fn drop(&mut self) {
        println!("I'm dropping!");
    }
}

#[tokio::main]
async fn main()  {
    let d = MyStruct;
    let (flush_tx, mut flush_rx) = tokio_mpsc::unbounded_channel::<std_mpsc::Sender<()>>();

    set_panic_hook(flush_tx);

    tokio::spawn(async move {
        // wait to receive flush
        if let Some(sender) = flush_rx.recv().await {
            println!("Received this!");
            drop(d);
            let _ = sender.send(());
        }
        println!("Just exited.");
    });

    tokio::spawn(async move {
        sleep(Duration::from_millis(500));
        panic!("Panicking from another thread!")
    });

    loop {
        sleep(Duration::from_millis(500));
        println!("Blocking main thread!");
    }
}

fn set_panic_hook(flush_tx: tokio_mpsc::UnboundedSender<std_mpsc::Sender<()>>) {
    let default_hook = std::panic::take_hook();
    std::panic::set_hook(Box::new(move |panic| {
        default_hook(panic);

        println!("Made it here");
        // Flush all logs
        let (tx, rx) = std::sync::mpsc::channel();
        let res = flush_tx.send(tx);
        assert!(res.is_ok());
        println!("sent the flush");
        // block until logs & metrics flushed
        let _ = rx.recv();
        println!("Finished flush");

        std::process::exit(1);
    }));
}

I expected to see this happen:

Expected run output:

thread 'tokio-runtime-worker' panicked at src/main.rs:35:9:
Panicking from another thread!
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

Standard Output

Blocking main thread!
Made it here
sent the flush
Blocking main thread!
Running other task
Received this!
I'm dropping!
Just exited.
Finished flush

Instead, this happened:

Actual run output:

Compiling playground v0.0.1 (/playground)
    Finished dev [unoptimized + debuginfo] target(s) in 2.19s
     Running `target/debug/playground`
thread 'tokio-runtime-worker' panicked at src/main.rs:33:9:
Panicking from another thread!
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

Standard Output

Blocking main thread!
Made it here
sent the flush
Blocking main thread!
Blocking main thread!
Blocking main thread!
Blocking main thread!
Blocking main thread!
Blocking main thread!
Blocking main thread!
Blocking main thread!
... // SIGTERM program here

If I then add a delay to the listening task before calling recv().await, it receives the message correctly, drops MyStruct, and exits the program. See the playground links below for unexpected and expected behaviour.

Rust playground links:

carllerche commented 6 months ago

Panic hooks run on the thread that panics. Since you are blocking in the hook, you are blocking the runtime thread.