smol-rs / async-broadcast

Async broadcast channels
Apache License 2.0

Receiver: stream impl ignores `RecvError::Overflowed`, and there is no way to implement a custom stream on top of it. #55

Closed: niklasad1 closed this issue 6 months ago

niklasad1 commented 7 months ago

Hey hey,

I have tried to use this crate, but I want to provide a `Stream` implementation that doesn't ignore `RecvError::Overflowed` the way the current implementation does.

So my question is really: is it possible to change the stream implementation to yield `Result<T, Overflowed>` instead of `T`, or to provide some alternative API for it?

Thanks

zeenix commented 7 months ago

Hi,

You can always create your own stream from the `Recv` future; the `futures` API could be helpful here.

I just don't think this is a common enough use case (I'm not even sure what users can do with this information) to justify breaking the API or adding a new one.

niklasad1 commented 7 months ago

In my use case, I would want to notify users when an "old item" was replaced, i.e., one could not keep up with the sender, and let the user decide whether to drop the channel, continue with the replaced/lost message, or clear the queue of previous messages.

Fair enough, it may be a niche use case, but if the `Overflowed` error were returned on the stream, it would be easy to support both use cases, i.e. ignoring overflow or not.

I did have a go with the `Recv` future API, but the problem I ran into is that if the channel is empty, dropping the `Recv` future also drops its listener, so the task is never woken up again.

Thus, one would need to store the `Recv` future across calls to `poll_next` to be woken up again. Since `Recv` borrows the receiver, that requires a self-referential struct, which I don't think is possible to implement in safe Rust. Here is an example of what I mean:

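```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use async_broadcast::{Receiver, Recv, RecvError};
use futures_util::{ready, stream::Stream, FutureExt};

// `SubscriptionError` (with a `Lagged(u64)` variant) is defined elsewhere.
struct SubscriptionRx<'a> {
    // The pending `recv()` future borrows `inner`, a field of the same struct.
    pending: Option<Recv<'a, String>>,
    inner: Receiver<String>,
}

impl<'a> Stream for SubscriptionRx<'a> {
    type Item = Result<String, SubscriptionError>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        loop {
            if let Some(rx) = this.pending.as_mut() {
                let res = ready!(rx.poll_unpin(cx));
                // The future has completed; never poll it again.
                this.pending = None;
                match res {
                    Ok(msg) => return Poll::Ready(Some(Ok(msg))),
                    Err(RecvError::Closed) => return Poll::Ready(None),
                    Err(RecvError::Overflowed(n)) => {
                        return Poll::Ready(Some(Err(SubscriptionError::Lagged(n))));
                    }
                }
            }

            // This will not compile: the borrow of `this.inner` only lasts for this
            // call, but `pending` needs a `Recv<'a, _>` that borrows its own struct.
            this.pending = Some(this.inner.recv());
        }
    }
}
```
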
jsdw commented 7 months ago

I thought I'd have a go at creating an example PR/description of the issue here and how we could perhaps address it:

https://github.com/smol-rs/async-broadcast/pull/56

Lemme know what you think and whether it's something that makes sense to you, or whether you'd rather not expose this sort of interface :)

zeenix commented 7 months ago

> In my use case, I would want to notify users when an "old item" was replaced, i.e., one could not keep up with the sender, and let the user decide whether to drop the channel, continue with the replaced/lost message, or clear the queue of previous messages.

I think you need to specify the use case from the user's side. To me, either the user is interested in all messages, in which case you don't use overflow mode, or they are OK with missing some messages, in which case you use overflow mode. I understand that you're trying to give users a choice, but I'm wondering whether users actually need it. At least, I can't think of any practical use case where a user can do anything useful with this information.

> or clear the queue of previous messages.

Not sure that would help. In theory, a receiver can miss every message, depending on system load, the async runtime used, etc. Will the user just keep clearing the queue? It seems to me that in this case, the user doesn't want overflow mode enabled at all.


Have you considered using inactive receivers, which let the receiving side switch between "interested in messages" (and hence polling the stream continuously) and "not interested in messages" (letting messages get dropped)?

zeenix commented 7 months ago

> I thought I'd have a go at creating an example PR/description of the issue here and how we could perhaps address it:

Thanks. A PR is always very appreciated, regardless of whether or not it's accepted and merged. :pray: I'll take a look soon.

niklasad1 commented 7 months ago

Yeah, you have a good point that it's better to let the user decide what kind of channel to use, but that's tricky to implement in my own code. Thanks for the help, though.

> Have you considered using inactive receivers, which let the receiving side switch between "interested in messages" (and hence polling the stream continuously) and "not interested in messages" (letting messages get dropped)?

No, not really

niklasad1 commented 6 months ago

Closed by #56

A-Manning commented 5 hours ago

@zeenix

> I just don't think this is a common enough use case

Making a note that I also ran into this use case and found this issue.

> I think you need to specify the use case from the user's side. To me, either the user is interested in all messages, in which case you don't use overflow mode, or they are OK with missing some messages, in which case you use overflow mode.

The use case for me is that I want to broadcast to receivers, and notify them if they are not consuming messages quickly enough. Receivers who have missed messages need to re-sync and reconnect; receivers who are not missing messages should remain connected.

zeenix commented 5 hours ago

> The use case for me is that I want to broadcast to receivers, and notify them if they are not consuming messages quickly enough.

They are not consuming quickly enough because they're not being scheduled often enough, not by choice. There is nothing they can do to improve things.

> Receivers who have missed messages need to re-sync

The re-sync will happen with the latest message that they do receive. If there is another, reliable means of communication between the sender and receiver (to achieve the synchronization you have in mind), why would they also use a leaky channel?

> and reconnect;

Reconnect how, and why? The channel is still intact, and recreating a receiver for it won't gain anything. If you missed messages, you missed them, and it can happen again.


I know that when you first think about this issue, it seems super obvious that a receiver should be notified of missed messages. However, if you think deeper and challenge the use case a bit, it falls apart, and you realize that the value is mainly academic: in most (likely all) cases, you need either a leaky channel or inactive receivers.

Also, this was already addressed by #56, so I'm not sure why we're still discussing it. :thinking: