rust-lang / futures-rs

Zero-cost asynchronous programming in Rust
https://rust-lang.github.io/futures-rs/
Apache License 2.0

Timing out on `SinkExt::send` still causes the item to be re-inserted later into the bounded channel #2894

Open NumberFour8 opened 1 month ago

NumberFour8 commented 1 month ago

When inserting an element into the bounded MPSC channel that is at full capacity, the SinkExt::send future correctly waits until there is space in the channel.

However, if the SinkExt::send future is raced against a timeout (so the send future should be dropped if the timeout elapses first), the value it tried to send is still inserted into the channel later.

The expectation is that if the send future is terminated early, the item it attempted to insert should never appear in the channel.

Instead, it seems that the last item send attempted to insert is cached (when polling for channel readiness starts inside send) and re-delivered on the next send. That looks like a bug, or at best undocumented behavior, if I'm not mistaken.
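One possible reading, offered as an assumption rather than a confirmed diagnosis: the documentation for futures::channel::mpsc::channel states that the capacity is buffer + number of senders, so each sender gets one guaranteed slot on top of the shared buffer. Under that reading, 4 may already be enqueued in the sender's own slot when the timeout fires, and send is pending only because it is waiting for the sender to become ready again. The toy model below is plain std Rust. ToyChannel and replay_timeout_scenario are hypothetical names for illustration, not futures-rs APIs or internals. It replays the steps of the example under that assumption.

```rust
use std::collections::VecDeque;

/// Toy stand-in for a bounded channel with exactly one sender.
/// Hypothetical type: it only models "buffer + one guaranteed slot per sender".
struct ToyChannel {
    queue: VecDeque<u32>,
    buffer: usize,
    sender_parked: bool,
}

impl ToyChannel {
    fn new(buffer: usize) -> Self {
        ToyChannel { queue: VecDeque::new(), buffer, sender_parked: false }
    }

    /// The sender may enqueue only while it is not parked.
    fn is_ready(&self) -> bool {
        !self.sender_parked
    }

    /// Enqueue an item, or hand it back if the sender is parked.
    fn start_send(&mut self, item: u32) -> Result<(), u32> {
        if self.sender_parked {
            return Err(item);
        }
        self.queue.push_back(item);
        // Going past the shared buffer uses up the sender's own slot and parks it.
        if self.queue.len() > self.buffer {
            self.sender_parked = true;
        }
        Ok(())
    }

    /// Dequeue an item; freeing a slot unparks the sender.
    fn recv(&mut self) -> Option<u32> {
        let item = self.queue.pop_front();
        if item.is_some() {
            self.sender_parked = false;
        }
        item
    }
}

/// Replays the steps of the report and returns what the receiver sees
/// after consuming 1 and 2.
fn replay_timeout_scenario() -> Vec<u32> {
    let mut ch = ToyChannel::new(3);
    for i in 1..=3 {
        ch.start_send(i).unwrap();
        assert!(ch.is_ready());
    }

    // 4 is accepted into the sender's own slot. The sender is now parked,
    // so a send-like operation would still be waiting for readiness here.
    ch.start_send(4).unwrap();
    assert!(!ch.is_ready());

    // "Timeout": the caller stops waiting. Nothing removes 4 from the queue.

    assert_eq!(ch.recv(), Some(1));
    assert_eq!(ch.recv(), Some(2));

    ch.start_send(5).unwrap();

    let mut rest = Vec::new();
    while let Some(item) = ch.recv() {
        rest.push(item);
    }
    rest
}
```

In this model, replay_timeout_scenario() returns [3, 4, 5], which matches the order observed in the report. Whether the real channel takes the same path would need to be confirmed against the futures-channel source.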

Minimal example for reproducing the behavior:

use std::time::Duration;

use futures::prelude::*;
use futures::pin_mut;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (snd, recv) = futures::channel::mpsc::channel(3);
    pin_mut!(snd);
    pin_mut!(recv);

    snd.send(1).await?;
    snd.send(2).await?;
    snd.send(3).await?;
    // Channel now contains 1,2,3

    // Insertion of 4 times out, because the channel is at full capacity
    {
        let insert_4 = std::pin::pin!(snd.send(4));
        let timeout = std::pin::pin!(tokio::time::sleep(Duration::from_millis(500)));
        match futures::future::select(insert_4, timeout).await {
            futures::future::Either::Left(_) => anyhow::bail!("must timeout when at full capacity"),
            futures::future::Either::Right(_) => {}
        }
    }

    // Free up some capacity by consuming 1,2
    assert_eq!(Some(1), recv.next().await);
    assert_eq!(Some(2), recv.next().await);
    // Channel should now contain only 3

    // Now send 5 into the channel and close it
    snd.send(5).await?;
    snd.close_channel();

    // Channel now should contain 3,5 (but actually seems to contain 3,4,5)
    assert_eq!(Some(3), recv.next().await);
    assert_eq!(Some(5), recv.next().await); // Panics here: 4 is in the channel despite the timeout
    assert_eq!(None, recv.next().await);

    Ok(())
}

Playground link: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=e72c947af414fc813345d71533065414