zesterer / flume

A safe and fast multi-producer, multi-consumer channel.
https://crates.io/crates/flume
Apache License 2.0
2.47k stars 85 forks source link

`async`: `send_timeout` & `send_deadline` #125

Open NathanSWard opened 1 year ago

NathanSWard commented 1 year ago

Currently send_timeout/deadline are implemented via blocking. This means if I want to make an async variant of these two, I need to use another library such as tokio::time::timeout - which notably is OK.

However, the problem comes in when the timeout happens before the value is successfully sent on the channel. There is no way to retrieve the queued message from the SendFut.

let (tx, rx) = bounded(n);

let mut fut = tx.send_async(42);
let not_sent = match tokio::time::timeout(DURATION, &mut fut).await {
    Ok(Ok(())) => {  None /* value was sent */ },
    Ok(Err(flume::SendError(value))) => { Some(value) /* channel was closed/disconnected */ },
    Err(_) => { 
        // timeout happened but there is no way to get the value back out of SendFut
        fut.into_inner().unwrap() // maybe something like `fut.into_inner()` ??
    }
};

Ideally SendFut would have an into_inner(self) -> Option<T> that would return Some(T) if the value was not yet send successfully.

zesterer commented 1 year ago

Yes, this is definitely missing functionality. I think this would be relatively easy to implement: internally we could just use a regular old async timeout (tokio or otherwise) and then use self.hook.take() to pull the value out of the future. Would you be interested in implementing this?

NathanSWard commented 1 year ago

Sure, I'll take a stab at the implementation and post a PR for it.

NathanSWard commented 1 year ago

See here for the PR :)