rust-lang / futures-rs

Zero-cost asynchronous programming in Rust
https://rust-lang.github.io/futures-rs/
Apache License 2.0

Unbounded memory use of `futures::channel::mpsc` with `SinkExt::feed` #2838

Open xmakro opened 7 months ago

xmakro commented 7 months ago

The following minimal example has unbounded memory usage:

use futures::SinkExt;

#[tokio::main]
async fn main() {
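    // Bounded channel with a buffer of 16; `rx` stays alive but is never polled.
    let (tx, rx) = futures::channel::mpsc::channel(16);
    for i in 0.. {
        // Each iteration feeds a 100,000-element Vec through a fresh clone of the sender.
        tx.clone().feed(vec![0; 100000]).await.unwrap();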
        println!("{i}");
    }
}

The loop keeps running even after the bounded channel is full, and the program eventually runs out of memory. I expected back-pressure once the channel fills up, so that printing would stop at around 16, but that is not what happens. With `SinkExt::send` instead of `SinkExt::feed`, things work as expected: the `send` call waits while the channel is full.

What surprised me even more with `feed` is that the item is also not dropped when the cloned sender is dropped.