tokio-rs / tokio

A runtime for writing reliable asynchronous applications with Rust. Provides I/O, networking, scheduling, timers, ...
https://tokio.rs
MIT License
26.59k stars 2.45k forks source link

Feature tokio::sync::mpsc::Receiver::wait_close(&self) #6595

Closed audioXD closed 4 months ago

audioXD commented 4 months ago

Is your feature request related to a problem? Please describe.

I'm in need of a async way of detecting when a tokio::sync::mpsc channel is closed, without consuming Some(T) values from tokio::sync::mpsc::Receiver<T>::recv(). Currently, I'm achieving this using a wrapper struct, using Notify and Drop. However, I believe this approach is redundant since the Tokio source code indicates that the channels already have an on-close notification mechanism. (my example is also incomplete since Receiver<T>::close() can also close the channel, and probably others that I cant remember right now)


use std::sync::Arc;
use tokio::sync::Notify;

#[derive(Clone)]
pub struct Sender<T>(tokio::sync::mpsc::Sender<T>, Arc<Notify>);

impl<T> Drop for Sender<T> {
    fn drop(&mut self) {
        self.1.notify_waiters();
    }
}

#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());
    let (tx, _rx) = tokio::sync::mpsc::channel::<()>(1);
    let tx = Sender(tx, Arc::clone(&notify));

    // Sender drop detector
    let weak_tx = tx.0.downgrade();
    let handle = tokio::spawn(async move {
        while weak_tx.strong_count() > 0 {
            notify.notified().await;
        }
        println!("All senders are dropped");
    });

    // Drop handle
    drop(tx);

    // Extra for print to work
    handle.await.unwrap();
}

Describe the solution you'd like

I would like an asynchronous method that resolves when all senders are dropped or when the channel is closed using the internal close detection mechanism. For example Receiver::wait_close(&self)

Darksonn commented 4 months ago

I would rather not add this feature. It's not really the intended use-case for channels.

the Tokio source code indicates that the channels already have an on-close notification mechanism

What mechanism are you referring to? We have notify_rx_closed closed that notifies calls to Sender::closed when the receiver is dropped. I don't think we have the opposite.

audioXD commented 4 months ago

Thanks for the reply @Darksonn!

I would rather not add this feature. It's not really the intended use-case for channels.

What about looking is as as Receiver::<T>::peek_recv() method and not a on close one? Kinda like the reserve API but in reverse.

What mechanism are you referring to? We have notify_rx_closed closed that notifies calls to Sender::closed when the receiver is dropped. I don't think we have the opposite.

Sorry I just glanced at the source code, and didn't give deep. But that would also work for me, if it was implemented on WeakSender<T> and/or Sender<T>?

Darksonn commented 4 months ago

What do you mean? The Sender::closed method is implemented on Sender.

audioXD commented 4 months ago

@Darksonn When I looked more deeply, Sender::closed wasn't sufficient for my needs (sorry I mentioned it as an example).

What about adding WeakSender::closed()? Since this doesn't increment the strong reference counter, it would work as a sender drop detector for my needs.

Darksonn commented 4 months ago

What exactly are you trying to achieve? Can you explain the behavior you want?

audioXD commented 4 months ago

From

use std::sync::Arc;
use tokio::sync::Notify;

#[derive(Clone)]
pub struct Sender<T>(tokio::sync::mpsc::Sender<T>, Arc<Notify>);

impl<T> Drop for Sender<T> {
    fn drop(&mut self) {
        self.1.notify_waiters();
    }
}

#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());
    let (tx, _rx) = tokio::sync::mpsc::channel::<()>(1);
    let tx = Sender(tx, Arc::clone(&notify));

    // Sender drop detector
    let weak_tx = tx.0.downgrade();
    let handle = tokio::spawn(async move {
        while weak_tx.strong_count() > 0 {
            notify.notified().await;
        }
        println!("All senders are dropped");
    });

    // Drop handle
    drop(tx);

    // Extra for print to work
    handle.await.unwrap();
}

Into

use std::sync::Arc;

#[tokio::main]
async fn main() {
    let (tx, _rx) = tokio::sync::mpsc::channel::<()>(1);

    // Handle drop detector
    let weak_tx = tx.downgrade();
    let handle = tokio::spawn(async move {
        weak_tx.closed().await; // Change here
        println!("Channel closed")
    });

    // Drop handle
    drop(tx);

    // Extra for print to work
    handle.await.unwrap();
}
audioXD commented 4 months ago

How would I use is specifically? I would use it while KINDA zipping 2 channels

#[tokio::main]
async fn main() {
    let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(1);
    let (tx2, mut rx2) = tokio::sync::mpsc::channel::<()>(1);

    let weak_tx2 = tx2.downgrade();

    tokio::join!(async move {
        drop(tx2); // Would also work with tx
    }, async move {
        let result_opt = tokio::select! {
            _ =  weak_tx2.closed() => { None } // New method used here
            // Warning beware of cancel safety
            opt = async {
                let val = rx.recv().await?; // val is allowed to drop - also blocks rx2 close from being detected
                Some((val, rx2.recv().await?)) // This isn't
            } => { opt }
        };

        let Some((val, val2)) = result_opt else {
            println!("Channel 1 or 2 is closed");
            return;
        };

        // Do stuff
    });
}
Darksonn commented 4 months ago

Are you just trying to merge two channels and exit when any one of them closes?

loop {
    let v = tokio::select! {
        v = chan1.recv() => v,
        v = chan2.recv() => v,
    };

    match v {
        Some(v) => { /* do stuff */ },
        None => break,
    }
}

println!("Channel 1 or 2 is closed");
audioXD commented 4 months ago

Its more of a tokio::join!() but the issue is when ONLY 1 of them closes. It pop the value out of the receiver

let (_tx, mut rx) = tokio::sync::mpsc::channel::<()>(1);
let (tx2, mut rx2) = tokio::sync::mpsc::channel::<()>(1);

let result = match tokio::join!(rx.recv(), rx2.recv()) {
    (None, None) => None,// OK - Bowth clannels are closed
    (None, Some(_)) => todo!(), // Issue #1 - value is popped from receiver - also waiting for no reason for the other channel
    (Some(_), None) => todo!(), // Issue #2 - value is popped from receiver  - also waiting for no reason for the other channel
    (Some(foo), Some(bar)) => Some((foo, bar)), // Ok - Bowth sucessfully resoved to a value
};
Darksonn commented 4 months ago

You can make the receiver peekable using StreamExt::peekable and ReceiverStream.

Darksonn commented 4 months ago

I'm going to close this a wont-fix.