smol-rs / async-broadcast

Async broadcast channels
Apache License 2.0
163 stars 26 forks source link

Can't get use new receiver if channel was closed. #34

Closed BratSinot closed 1 year ago

BratSinot commented 1 year ago

Greetings!

From crate readme: "When all Senders or all Receivers are dropped, the channel becomes closed. When a channel is closed, no more messages can be sent, but remaining messages can still be received".

But channel become closed when all receivers are gone:

#[tokio::main]
async fn main() {
    let (tx, rx) = async_broadcast::broadcast(10);
    drop(rx);

    println!("{:?}", tx.broadcast("FOO".to_owned()).await); // Err
    let mut rx = tx.new_receiver();
    println!("{:?}", tx.broadcast("FOO".to_owned()).await); // Err
    println!("{:?}", rx.recv().await); // Err
}

was expected:

    println!("{:?}", tx.broadcast("FOO".to_owned()).await); // Err
    let mut rx = tx.new_receiver();
    println!("{:?}", tx.broadcast("FOO".to_owned()).await); // Ok
    println!("{:?}", rx.recv().await); // Ok

It is a bug? If it not, any way to change behaviour?

notgull commented 1 year ago

"When all Senders or all Receivers are dropped..."

I believe that this is the intended behavior.

BratSinot commented 1 year ago

"When all Senders or all Receivers are dropped..."

Ow, missed that part. And no way to change it? I search replacement for tokio::sync::broadcast because of lagging.

BratSinot commented 1 year ago

"When all Senders or all Receivers are dropped..."

Ow, missed that part. And no way to change it? I search replacement for tokio::sync::broadcast because of lagging.

Yeah, I can store inactive receiver, but in that case Sender::broadcast will be waiting.

zeenix commented 1 year ago

Yeah, I can store inactive receiver, but in that case Sender::broadcast will be waiting.

If it's not desired for sender to wait for an active receiver before sending anything, that means you want the channel to be leaky while there are no active receiver. So what you can do is to keep an inactive receiver to keep the channel open and enable overflow mode while there are no active receivers and when you create an active receiver, you turn it back off just before creating the active receiver.

BratSinot commented 1 year ago

So what you can do is to keep an inactive receiver to keep the channel open and enable overflow mode while there are no active receivers and when you create an active receiver, you turn it back off just before creating the active receiver.

Thanks for the tip!

BratSinot commented 1 year ago

So what you can do is to keep an inactive receiver to keep the channel open and enable overflow mode while there are no active receivers and when you create an active receiver, you turn it back off just before creating the active receiver.

Okey, setting overflow mode do nothing:

    let (tx, rx) = async_broadcast::broadcast(10);
    let mut inactive_rx = rx.deactivate();
    inactive_rx.set_overflow(true);

    println!("{:?}", tx.broadcast("FOO".to_owned()).await); // still hangs here
zeenix commented 1 year ago

Okey, setting overflow mode do nothing:

Try an explicit drop(rx)?

zeenix commented 1 year ago

Also, is there really a usecase for broadcast to not wait for receivers? Why broadcast if you've nobody to receive the message?

BratSinot commented 1 year ago

Try an explicit drop(rx)?

deactivate move Receiver: pub fn deactivate(self) -> InactiveReceiver<T> {.

Also, is there really a usecase for broadcast to not wait for receivers? Why broadcast if you've nobody to receive the message?

pub-sub in HTTP long-pooling.

zeenix commented 1 year ago

Try an explicit drop(rx)?

deactivate move Receiver: pub fn deactivate(self) -> InactiveReceiver<T> {.

Right, sorry and even if that was not the case, I looked at the code and realized that broadcast will wait for active receivers.

Also, is there really a usecase for broadcast to not wait for receivers? Why broadcast if you've nobody to receive the message?

pub-sub in HTTP long-pooling.

That's way too vague for me, sorry. :) You'll have to be a bit more specific. You can always use "try_broadcast" if you want more control over the broadcasting.

BratSinot commented 1 year ago

That's way too vague for me, sorry. :)

In common words I need late subscriber.

You can always use "try_broadcast" if you want more control over the broadcasting.

Yeah, for now I use something like this:

    async fn ignore_inactive_broadcast(&self, msg: Self::Message) -> Result<(), Self::Error> {
        match self.try_broadcast(msg) {
            Ok(None) => Ok(()),
            Ok(Some(msg)) | Err(TrySendError::Full(msg)) | Err(TrySendError::Inactive(msg)) => {
                match self.broadcast(msg).await {
                    Ok(None) => Ok(()),
                    Err(err) => Err(err),
                    Ok(Some(_msg)) => unreachable!("broadcast overflow mode was enabled"),
                }
            }
            Err(TrySendError::Closed(msg)) => Err(SendError(msg)),
        }
    }
zeenix commented 1 year ago

That's way too vague for me, sorry. :)

In common words I need late subscriber.

Still not clear enough what exactly is the use case. I also need late subscribers in zbus project and was the whole reason I added the concept of inactive receivers but I actually want broadcast to wait for active receivers instead of continuing to push messages. I really don't see the use case for broadcasting wasting CPU for doing useless work. Hence why I'm asking you to explain.

If your use case requires this, I guess you're ok with broadcast wasting CPU but then you can keep an active receiver around and have it continuously receive in a task, where it just receivers for the sake of keeping the channel open and active at all times?

BratSinot commented 1 year ago

instead of continuing to push messages

In my case I skip message if there is no active receiver =)

zeenix commented 1 year ago

@BratSinot I thought more about this and came up with a usecase. I created #35 for this specific requirement.