smol-rs / futures-lite

Futures, streams, and async I/O combinators.
Apache License 2.0

Should `future::zip` drop completed futures? #105

Closed: nanoqsh closed this 2 months ago

nanoqsh commented 2 months ago

I have found some unexpected behavior when functions like future::zip and future::race are used together. I always expected some future and async { future.await } to behave the same, but that turns out not to be the case.

Minimal code example:

use smol::{
    channel::{Receiver, Sender},
    future,
};

async fn product(tx: Sender<u32>) {
    for i in 0..20 {
        if tx.send(i).await.is_err() {
            break;
        }

        println!("sent {i}");
    }
}

async fn consume(n: u32, rx: Receiver<u32>) {
    println!("start task #{n}");
    while let Ok(i) = rx.recv().await {
        if i == 5 {
            println!("exit");
            break;
        }

        println!("received {i}");
    }
}

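fn main() {
    // Create a channel with the minimum buffer size to reproduce the problem more clearly
    let (tx, rx) = smol::channel::bounded(1);
    let a = consume(1, rx.clone());
    let b = consume(2, rx);

    // `race` finishes as soon as either consumer exits
    let tasks = future::race(a, b);

    // Hangs: the completed `race` future still owns both receivers
    smol::block_on(future::zip(product(tx), tasks));
}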

If you run this code, it hangs. My expectation is that once one of the tasks completes, the entire race ends and all receivers are dropped. The sender then sees that its receivers are gone, exits its loop, and finishes, which completes the entire zip. But this doesn't happen, apparently because zip doesn't drop its futures until all of them have completed.

But I can wrap tasks in an additional future like this:

- let tasks = future::race(a, b);
+ let tasks = async { future::race(a, b).await };

And now it works as expected. After one task completes, the others finish as well and the code no longer hangs.
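The difference between the two forms can be reproduced with the standard library alone. The sketch below is illustrative and does not use smol: `Guard`, `HoldsGuard`, `NoopWake`, and `dropped_after_completion` are hypothetical names, and `Guard` merely stands in for a `Receiver`. It assumes that a hand-written combinator keeps its fields after returning `Ready`, while an async block drops the future it awaited once that future completes.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Sets a shared flag when dropped (hypothetical stand-in for a `Receiver`).
struct Guard(Arc<AtomicBool>);

impl Drop for Guard {
    fn drop(&mut self) {
        self.0.store(true, Ordering::SeqCst);
    }
}

/// Hypothetical hand-written future that owns a resource and completes on
/// the first poll. Returning `Ready` does not release its fields.
struct HoldsGuard {
    _guard: Guard,
}

impl Future for HoldsGuard {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        Poll::Ready(())
    }
}

/// Waker that does nothing; enough for polling by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls `fut` once to completion and reports whether the guard was already
/// dropped while the completed future itself is still alive.
fn dropped_after_completion<F: Future<Output = ()>>(fut: F, flag: &AtomicBool) -> bool {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    assert!(fut.as_mut().poll(&mut cx).is_ready());
    flag.load(Ordering::SeqCst)
}

fn main() {
    // Bare future: the guard lives on until the future is dropped.
    let flag = Arc::new(AtomicBool::new(false));
    let bare = HoldsGuard { _guard: Guard(flag.clone()) };
    println!("bare: {}", dropped_after_completion(bare, &flag)); // prints "bare: false"

    // Wrapped future: the async block drops `inner` right after awaiting it.
    let flag = Arc::new(AtomicBool::new(false));
    let inner = HoldsGuard { _guard: Guard(flag.clone()) };
    let wrapped = async move { inner.await };
    println!("wrapped: {}", dropped_after_completion(wrapped, &flag)); // prints "wrapped: true"
}
```

In the report, the receivers play the role of `Guard`: as long as the completed race future is kept alive, they stay open and the sender cannot observe a closed channel.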

However, I think this is not intuitive. It would be better if zip dropped completed futures itself.
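One possible shape for such a zip is sketched below, using only the standard library. This is not the futures-lite implementation and not necessarily how a fix would look there: `DroppingZip`, `zip_dropping`, `poll_slot`, `Holds`, and `NoopWake` are hypothetical names, and the futures are boxed purely to avoid unsafe pin projection in a short example. The idea is to store each future in an `Option` and clear the slot as soon as that future returns `Ready`.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical zip combinator: a slot becomes `None` once its future is done.
struct DroppingZip<A: Future, B: Future> {
    a: Option<Pin<Box<A>>>,
    b: Option<Pin<Box<B>>>,
    a_out: Option<A::Output>,
    b_out: Option<B::Output>,
}

// The futures are boxed and the outputs are never pinned, so moving the
// struct itself is fine.
impl<A: Future, B: Future> Unpin for DroppingZip<A, B> {}

fn zip_dropping<A: Future, B: Future>(a: A, b: B) -> DroppingZip<A, B> {
    DroppingZip { a: Some(Box::pin(a)), b: Some(Box::pin(b)), a_out: None, b_out: None }
}

/// Polls the future in `slot` if it is still present; on completion, stores
/// the output and drops the future by clearing the slot.
fn poll_slot<F: Future>(
    slot: &mut Option<Pin<Box<F>>>,
    out: &mut Option<F::Output>,
    cx: &mut Context<'_>,
) {
    let ready = match slot.as_mut() {
        Some(fut) => fut.as_mut().poll(cx),
        None => return,
    };
    if let Poll::Ready(value) = ready {
        *out = Some(value);
        *slot = None; // the completed future is dropped here
    }
}

impl<A: Future, B: Future> Future for DroppingZip<A, B> {
    type Output = (A::Output, B::Output);

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        poll_slot(&mut this.a, &mut this.a_out, cx);
        poll_slot(&mut this.b, &mut this.b_out, cx);
        match (this.a_out.take(), this.b_out.take()) {
            (Some(a), Some(b)) => Poll::Ready((a, b)),
            (a, b) => {
                // Not both ready yet: put back whatever output we have.
                this.a_out = a;
                this.b_out = b;
                Poll::Pending
            }
        }
    }
}

/// Test fixture: completes on the first poll and sets its flag when dropped.
struct Holds(Arc<AtomicBool>);

impl Drop for Holds {
    fn drop(&mut self) {
        self.0.store(true, Ordering::SeqCst);
    }
}

impl Future for Holds {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        Poll::Ready(())
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Zips a finished `Holds` with a never-ready future, polls once, and reports
/// whether `Holds` was dropped while the zip as a whole is still pending.
fn first_future_dropped_early() -> bool {
    let flag = Arc::new(AtomicBool::new(false));
    let mut zipped = zip_dropping(Holds(flag.clone()), std::future::pending::<()>());
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    assert!(Pin::new(&mut zipped).poll(&mut cx).is_pending());
    flag.load(Ordering::SeqCst)
}

fn main() {
    println!("dropped early: {}", first_future_dropped_early()); // prints "dropped early: true"
}
```

With this behavior, the completed race future in the original example would release both receivers immediately, and the sender would see the channel as closed without needing the extra async block.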

notgull commented 2 months ago

I will accept a PR to fix this.