smol-rs / futures-lite

Futures, streams, and async I/O combinators.
Apache License 2.0
427 stars 25 forks source link

Implement `for_each_concurrent()` #57

Closed notgull closed 1 year ago

notgull commented 1 year ago

This PR adds a new type, UnorderedFutures, that stores and then polls a series of futures concurrently. I then go on to use UnorderedFutures to implement for_each_concurrent() and try_for_each_concurrent, resolving #26.

The implementation I use here is relatively inefficient. It uses two allocations per future stored; one for the Container, and another for the readiness flag. In addition, another allocation is added per poll for the waker_fn. Ideally, it would look more like this:

pub struct UnorderedFutures<Fut> {
    first: Option<Pin<Arc<Container<Fut>>>>,
}

pin_project! {
    struct Container<Fut> {
        #[pin]
        future: Mutex<Fut>,
        next: Mutex<Option<Pin<Arc<Container<Fut>>>>>,
        ready: AtomicBool,
        waker: AtomicWaker,
    }
}

impl<Fut> Wake for Container<Fut> {
    // set the ready flag to true and then wake the waker
}

Note that this would add a dependency on libstd and atomic-waker, and would require unsafe code since it's impossible to get a Pin<MutexGuard<T>> from a Pin<&Mutex<T>> without unsafe code.

However, since this is an implementation detail that can be resolved through a patch bump in the future, I decided to leave it with the as-is simple implementation.

taiki-e commented 1 year ago

UnorderedFutures seems to be a simpler version of futures's FuturesUnordered, but it's not clear to me how reasonable this is for this library, given the footgun in FuturesUnordered.

Also, as we cover the many APIs that futures has, eventually the benefits of this library will be lost...

notgull commented 1 year ago

You may be right; I wasn't aware of the footgun until now. In that case, I'll close this PR; however, we might want to explicitly document the best ways to concurrently run futures.