rust-lang / futures-rs

Zero-cost asynchronous programming in Rust
https://rust-lang.github.io/futures-rs/
Apache License 2.0
5.34k stars 616 forks source link

`StreamExt::buffered` and `StreamExt::buffer_unordered` hangs (again) for `n = 0` #2740

Closed LeoniePhiline closed 1 year ago

LeoniePhiline commented 1 year ago

There was a "fix" (panic - not preferred) in the past:

This assert! was later removed:

I am not sure if this was intentional.

The following project hangs indefinitely:

Cargo.toml

[package]
name = "buffered"
version = "0.1.0"
edition = "2021"

[dependencies]
futures = "0.3.28"
tokio = { version = "1.28.0", features = ["macros", "rt-multi-thread"] }

src/main.rs

use futures::prelude::*;

#[tokio::main]
async fn main() {
    // Assume not knowing the length of this vec in advance.
    let v: Vec<i32> = vec![];

    // Length might be 0.
    let num_concurrent = v.len();

    // Process all in parallel.
    let w = futures::stream::iter(v.into_iter())
        .map(|e| async move {
            // "Processing" ...
            e
        })
        // Process all entries concurrently
        .buffered(num_concurrent)
        .collect::<Vec<i32>>()
        .await;

    // Never finishes.

    println!("{w:#?}"); // Never prints.
}

With a minimum buffered value of 1, the program finishes.

I believe n = 0 should not panic, as the old assert! solution did, but it should pass through directly without buffering.

(Could be worked around by lower-clamping n to 1 in Buffered::new, but I am certain there is a cleaner way.)

taiki-e commented 1 year ago

Both fixes are included in 0.4, not 0.3 that you are using.

LeoniePhiline commented 1 year ago

@taiki-e There is no 0.4?

Not on GitHub releases, no branch, no tag, not on crates.io, not in the futures-rs blog …

Where is 0.4 and why can’t I find it?