tokio-rs / tokio

A runtime for writing reliable asynchronous applications with Rust. Provides I/O, networking, scheduling, timers, ...
https://tokio.rs
MIT License

mpsc receiver that panics when a sender panics #6061

Closed (Jonathan-Landeed closed this 11 months ago)

Jonathan-Landeed commented 11 months ago

Is your feature request related to a problem? Please describe.

As described in https://github.com/hyperium/tonic/issues/1544, I realized that an mpsc receiver generally cannot tell whether a sender went out of scope normally or during a panic. That information is only available through the JoinHandle. This eventually turned into an issue where clients could not tell whether a gRPC response stream ended naturally or because a sender panicked: the streams always ended with an "End Stream: True" HEADERS frame, never with an RST_STREAM frame for panics.
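To make the problem concrete, here is a rough sketch using only the standard library. It is an analogue, not the tokio API: `std::sync::mpsc` and `std::thread` stand in for `tokio::sync::mpsc` and a spawned task, and the helper `observe` is a hypothetical name made up for this illustration. The receiver sees the same thing in both runs; only the join handle differs.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: runs a producer thread that sends one value and then
/// either returns normally or panics. Returns what the receiver observed and
/// whether joining the thread reported a panic.
fn observe(should_panic: bool) -> (Vec<u32>, bool) {
    let (tx, rx) = mpsc::channel::<u32>();
    let handle = thread::spawn(move || {
        tx.send(1).unwrap();
        if should_panic {
            panic!("producer failed");
        }
        // `tx` is dropped here on the normal path, and during unwinding on
        // the panic path. The channel closes the same way in both cases.
    });

    // The receiver's view: iteration simply ends once every sender is gone.
    let received: Vec<u32> = rx.iter().collect();
    // The join handle's view: `Err` means the thread panicked.
    let panicked = handle.join().is_err();
    (received, panicked)
}

fn main() {
    let normal = observe(false);
    let crashed = observe(true);
    // Identical from the receiver's side...
    assert_eq!(normal.0, crashed.0);
    // ...and only distinguishable through the join handle.
    assert!(!normal.1);
    assert!(crashed.1);
}
```

The panicking run prints the usual panic message to stderr, but the receiving side has no way to react to it.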

Describe the solution you'd like

Fundamentally, I'd like a channel where the receiver can detect when senders have panicked. Is that something this project would be interested in? I initially posted in tonic, but I realized they just consume a Stream; they don't control its behavior.

I wrote a wrapper that will cause a panic in the receiver once all the senders close if any of them panicked. I could also see having the receiver panic the next time it is polled, but that leads to messages being lost.
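The idea of checking only after the channel has closed can be sketched with the standard library alone. This is a simplified analogue under stated assumptions, not the wrapper itself: `FlaggingSender` and `drain_then_check` are hypothetical names, `std::sync::mpsc` replaces the tokio channel, and the function returns the flag instead of panicking so the outcome is easy to inspect. Because the flag is read only after the channel is drained, every message sent before the panic still arrives.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;

/// Hypothetical guard around a std sender that records whether it was
/// dropped while its thread was unwinding from a panic.
struct FlaggingSender<T> {
    sender: mpsc::Sender<T>,
    panicked: Arc<AtomicBool>,
}

impl<T> FlaggingSender<T> {
    fn send(&self, value: T) -> Result<(), mpsc::SendError<T>> {
        self.sender.send(value)
    }
}

impl<T> Drop for FlaggingSender<T> {
    fn drop(&mut self) {
        // `drop` runs before the `sender` field itself is dropped, so the
        // flag is set before the channel can be observed as closed.
        if thread::panicking() {
            self.panicked.store(true, Ordering::SeqCst);
        }
    }
}

/// Sends three values from a producer thread that may panic afterwards,
/// drains the channel to completion, and only then reads the flag.
fn drain_then_check(should_panic: bool) -> (Vec<u32>, bool) {
    let (sender, receiver) = mpsc::channel();
    let panicked = Arc::new(AtomicBool::new(false));
    let guard = FlaggingSender {
        sender,
        panicked: Arc::clone(&panicked),
    };

    let producer = thread::spawn(move || {
        for value in 0..3 {
            guard.send(value).unwrap();
        }
        if should_panic {
            panic!("producer failed after sending");
        }
    });

    // Drain first: iteration ends when the last sender has been dropped.
    let received: Vec<u32> = receiver.iter().collect();
    let _ = producer.join();
    (received, panicked.load(Ordering::SeqCst))
}

fn main() {
    let (items, flag) = drain_then_check(true);
    assert_eq!(items.len(), 3);
    assert!(flag);
}
```

Checking the flag on every poll instead would surface the panic sooner, at the cost of discarding whatever is still buffered.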

Describe alternatives you've considered

I'm not sure if there's some way tonic could instead ask for the JoinHandle along with the ReceiverStream. I haven't heard back from anyone in that project yet.

Additional context

Here's the wrapper:

use futures::stream::Stream;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::sync::mpsc;

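pub struct PanickingSender<T> {
    sender: mpsc::Sender<T>,
    panicked: Arc<AtomicBool>,
}
// Implemented by hand because `#[derive(Clone)]` would add a `T: Clone` bound
// that the underlying channel does not need.
impl<T> Clone for PanickingSender<T> {
    fn clone(&self) -> Self {
        Self {
            sender: self.sender.clone(),
            panicked: Arc::clone(&self.panicked),
        }
    }
}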
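impl<T> Drop for PanickingSender<T> {
    fn drop(&mut self) {
        // Record the panic here, before the inner `sender` field is dropped,
        // so the flag is already set when the receiver sees the channel close.
        if std::thread::panicking() {
            self.panicked.store(true, Ordering::Relaxed);
        }
        //std::mem::forget(self.sender.clone())
    }
}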
impl<T> PanickingSender<T> {
    pub async fn send(&self, value: T) -> Result<(), mpsc::error::SendError<T>> {
        self.sender.send(value).await
    }
}

pub struct PanickingReceiver<T> {
    receiver: mpsc::Receiver<T>,
    panicked: Arc<AtomicBool>,
}

impl<T> PanickingReceiver<T> {
    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        match self.receiver.poll_recv(cx) {
            Poll::Ready(None) => {
                if self.panicked.load(Ordering::Relaxed) {
                    panic!("One of the senders panicked");
                }
                Poll::Ready(None)
            }
            ret => ret,
        }
    }
}
impl<T> Stream for PanickingReceiver<T> {
    type Item = T;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.poll_recv(cx)
    }
}

pub fn channel<T>(buffer: usize) -> (PanickingSender<T>, PanickingReceiver<T>) {
    let (sender, receiver) = mpsc::channel(buffer);
    let panicked = Arc::new(AtomicBool::new(false));
    (
        PanickingSender {
            sender,
            panicked: panicked.clone(),
        },
        PanickingReceiver { receiver, panicked },
    )
}

Darksonn commented 11 months ago

It's an interesting suggestion and I see how it could be useful. Unfortunately, I don't think it is sufficiently useful to be added to the main Tokio crate. I recommend creating your own crate for this.