rmanoka / async-scoped

A scope for async_std and tokio to spawn non-static futures

Make scope yield elements in FIFO order #19

Closed akoshelev closed 8 months ago

akoshelev commented 9 months ago

Using FuturesUnordered internally causes Scope to return items out of order, leaving clients responsible for matching the elements received from Scope to the order in which they were sent. This extra bookkeeping and buffering makes client code repetitive and largely duplicates what FuturesOrdered already provides.

Given that Scope implements Stream, returning elements in FIFO order seems like a safe default.
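
To make the bookkeeping concrete, here is a minimal sketch that uses only the standard library, not the async-scoped API. It assumes the client tags every task with its submission index and gets the results back in completion order; `restore_fifo` is a hypothetical helper name.

```rust
/// Restores submission order for results that arrived in completion order.
/// Each result carries the index it was submitted with (an assumed tagging scheme).
fn restore_fifo<T>(mut tagged: Vec<(usize, T)>) -> Vec<T> {
    // Every result has to be buffered before the order can be rebuilt.
    tagged.sort_by_key(|(index, _)| *index);
    tagged.into_iter().map(|(_, value)| value).collect()
}
```

Calling `restore_fifo(vec![(2, "c"), (0, "a"), (1, "b")])` gives the values back in submission order. This is the tagging and buffering every client would otherwise have to repeat.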

Alternatives that could be considered

rmanoka commented 9 months ago

@akoshelev I think it's natural to have it unordered as they may complete in any order. Ordering them may involve storing too many of them in memory waiting for something else to complete.
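
The memory concern can be illustrated with a small reorder buffer. This is a sketch under assumptions and not how FuturesOrdered is implemented: `ReorderBuffer`, `complete` and `peak` are hypothetical names, and completions are fed in by hand instead of coming from real futures.

```rust
use std::collections::BTreeMap;

/// Releases items in submission order, holding back those that complete early.
struct ReorderBuffer<T> {
    next: usize,                 // index of the next item allowed to leave
    pending: BTreeMap<usize, T>, // completed items still waiting for their turn
    peak: usize,                 // largest number of items held at once
}

impl<T> ReorderBuffer<T> {
    fn new() -> Self {
        Self { next: 0, pending: BTreeMap::new(), peak: 0 }
    }

    /// Records a completed item and returns every item that can now be released.
    fn complete(&mut self, index: usize, value: T) -> Vec<T> {
        self.pending.insert(index, value);
        self.peak = self.peak.max(self.pending.len());
        let mut ready = Vec::new();
        while let Some(value) = self.pending.remove(&self.next) {
            ready.push(value);
            self.next += 1;
        }
        ready
    }
}
```

If the first submitted task finishes last, nothing can be released until it does, so every other result sits in `pending` in the meantime. That is the worst case described above.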

rmanoka commented 9 months ago

Maybe we could just provide a spawn_with_handle that returns the handle, and then users can wait on the handles themselves as necessary.
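
As a rough analogy for the handle-based idea, the standard library's scoped threads already work this way: each spawn returns a handle, and the caller decides the order in which to wait. The sketch below uses `std::thread::scope` only, not async-scoped; `chunk_sums` is a hypothetical example function.

```rust
use std::thread;

/// Sums each chunk on its own scoped thread and joins the handles in spawn order.
/// `chunk_len` must be non-zero, because `chunks` panics on a zero length.
fn chunk_sums(data: &[u32], chunk_len: usize) -> Vec<u32> {
    thread::scope(|s| {
        // The closures borrow from `data`, which is not 'static.
        let handles: Vec<_> = data
            .chunks(chunk_len)
            .map(|chunk| s.spawn(move || chunk.iter().sum::<u32>()))
            .collect();
        // Joining in spawn order yields FIFO results whatever the completion order was.
        handles
            .into_iter()
            .map(|handle| handle.join().expect("worker thread panicked"))
            .collect()
    })
}
```

Holding the handles means the ordering policy lives with the caller, and the scope itself never needs to reorder anything.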

akoshelev commented 9 months ago

I think it's natural to have it unordered as they may complete in any order. Ordering them may involve storing too many of them in memory waiting for something else to complete.

I agree that there is a non-trivial cost associated with returning items in order even with FuturesOrdered being as efficient as it could be.

I have a prototype ready that uses traits to keep FuturesUnordered as the default choice while allowing clients to specify their own container for futures:

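use std::future::Future;

use futures::stream::{FuturesOrdered, FuturesUnordered, Stream};

/// A collection of futures that yields each future's output as a stream.
pub trait FuturesContainer<F: Future>: Default + Stream<Item = F::Output> + Unpin {
    /// Adds a future to the collection.
    fn push(&mut self, fut: F);
}

// FIFO: outputs are yielded in the order the futures were pushed.
impl<F: Future> FuturesContainer<F> for FuturesOrdered<F> {
    fn push(&mut self, fut: F) {
        self.push_back(fut)
    }
}

// Completion order: outputs are yielded as each future finishes.
impl<F: Future> FuturesContainer<F> for FuturesUnordered<F> {
    fn push(&mut self, fut: F) {
        // Call the inherent method explicitly so this does not recurse into the trait method.
        FuturesUnordered::push(self, fut)
    }
}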

Then Scope just needs to set the proper default:

pub struct Scope<'a, T, Sp: Spawner<T> + Blocker, C: FuturesContainer<Sp::SpawnHandle> = FuturesUnordered<<Sp as Spawner<T>>::SpawnHandle>> {
...
}

Would this be an acceptable way to enable FIFO ordering?
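
For readers unfamiliar with default type parameters, the same pattern can be sketched with standard-library collections standing in for the futures containers. Everything here is hypothetical and only illustrates the shape of the design: `Container`, `take_next` and `MiniScope` are made-up names, `Vec` plays the "any order" default and `VecDeque` plays the FIFO option.

```rust
use std::collections::VecDeque;
use std::marker::PhantomData;

/// Stand-in for the container trait: accepts items and hands them back one at a time.
trait Container<T>: Default {
    fn push(&mut self, item: T);
    fn take_next(&mut self) -> Option<T>;
}

// Default container: no FIFO guarantee (this stand-in happens to be LIFO).
impl<T> Container<T> for Vec<T> {
    fn push(&mut self, item: T) {
        // Explicit path to the inherent method, mirroring the prototype above.
        Vec::push(self, item)
    }
    fn take_next(&mut self) -> Option<T> {
        self.pop()
    }
}

// Opt-in container: strict FIFO.
impl<T> Container<T> for VecDeque<T> {
    fn push(&mut self, item: T) {
        self.push_back(item)
    }
    fn take_next(&mut self) -> Option<T> {
        self.pop_front()
    }
}

/// Stand-in for Scope: the second type parameter defaults to the unordered container.
struct MiniScope<T, C: Container<T> = Vec<T>> {
    items: C,
    _marker: PhantomData<T>,
}

impl<T, C: Container<T>> MiniScope<T, C> {
    fn new() -> Self {
        Self { items: C::default(), _marker: PhantomData }
    }
    fn submit(&mut self, item: T) {
        self.items.push(item)
    }
    fn drain(&mut self) -> Vec<T> {
        let mut out = Vec::new();
        while let Some(item) = self.items.take_next() {
            out.push(item);
        }
        out
    }
}
```

Writing `MiniScope<u32>` picks the default container, while `MiniScope<u32, VecDeque<u32>>` opts into FIFO. Existing callers that never name the second parameter are unaffected.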

akoshelev commented 9 months ago

Maybe we could just provide a spawn_with_handle and in that case return the handle, and then the user themselves can wait for them as necessary.

I am looking into that as well.

rmanoka commented 8 months ago

I think this will be closed by #20; essentially, we can expose the FuturesUnordered collection as an iterator over the futures (which will be in FIFO order). With #20, the user can wait on the futures however they want. The remaining futures can then be awaited as usual at drop by blocking the thread.

rmanoka commented 8 months ago

@akoshelev After some thinking, I agree that FuturesOrdered may indeed be a good choice for the collection; let's revert to the version where you changed it so. It would be great if you could add a benchmark that measures the time to spawn and wait on a large number of futures: we should ensure there's no performance loss. We can merge the PR if there's no loss.

akoshelev commented 8 months ago

Sure, let me try to do that.

akoshelev commented 8 months ago

Here are some benchmarks I ran locally on my 2021 Mac M1. My conclusion is that the performance difference is within the noise. With the Tokio runtime we spend more time in user space, which can be accounted for by the order management inside FuturesOrdered.
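
One rough way to check the "within the noise" reading is Welch's t statistic on the hyperfine means and standard deviations listed further down in this comment. This is a back-of-the-envelope sketch, not part of the benchmark itself; `Summary` and `welch_t` are hypothetical names, and it assumes the runs are independent and roughly normal.

```rust
/// Summary of one hyperfine result: mean, standard deviation and number of runs.
#[derive(Clone, Copy)]
struct Summary {
    mean: f64,
    sd: f64,
    runs: f64,
}

/// Welch's t statistic for the difference between two sample means.
fn welch_t(a: Summary, b: Summary) -> f64 {
    // Standard error of the difference, allowing unequal variances.
    let standard_error = (a.sd.powi(2) / a.runs + b.sd.powi(2) / b.runs).sqrt();
    (a.mean - b.mean) / standard_error
}
```

Plugging in the Tokio figures (1.317 s ± 0.146 s versus 1.298 s ± 0.205 s, 10 runs each) gives t of about 0.24, and the AsyncStd figures (952.0 ms ± 49.9 ms versus 917.4 ms ± 41.4 ms) give about 1.69. Both are below the value of roughly 2.1 usually needed to call a difference significant at the 5% level with samples of this size.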

Tokio/FuturesUnordered

hyperfine --warmup 3 'target/release/deps/spawner-5562dd14e1161d6e'
Benchmark 1: target/release/deps/spawner-5562dd14e1161d6e
  Time (mean ± σ):      1.298 s ±  0.205 s    [User: 1.483 s, System: 1.003 s]
  Range (min … max):    1.090 s …  1.752 s    10 runs

Tokio/FuturesOrdered

hyperfine --warmup 3 'target/release/deps/spawner-4e10fd19901cdbc8'
Benchmark 1: target/release/deps/spawner-4e10fd19901cdbc8
  Time (mean ± σ):      1.317 s ±  0.146 s    [User: 1.613 s, System: 1.189 s]
  Range (min … max):    1.086 s …  1.537 s    10 runs

AsyncStd/FuturesUnordered

export ASYNC_STD_THREAD_COUNT=4

hyperfine --warmup 3 'target/release/deps/spawner-517d0e5e51490bc0'
Benchmark 1: target/release/deps/spawner-517d0e5e51490bc0
  Time (mean ± σ):     952.0 ms ±  49.9 ms    [User: 1211.7 ms, System: 1385.8 ms]
  Range (min … max):   883.2 ms … 1038.9 ms    10 runs

AsyncStd/FuturesOrdered

export ASYNC_STD_THREAD_COUNT=4

hyperfine --warmup 3 'target/release/deps/spawner-2b3bc45f8742811b'
Benchmark 1: target/release/deps/spawner-2b3bc45f8742811b
  Time (mean ± σ):     917.4 ms ±  41.4 ms    [User: 1195.0 ms, System: 1276.6 ms]
  Range (min … max):   866.1 ms … 982.9 ms    10 runs

rmanoka commented 8 months ago

@akoshelev What's the benchmark code? Could you paste or link to the core benchmarking loop so I can understand the metrics?

akoshelev commented 8 months ago

I've pushed the benchmark code to this branch.

rmanoka commented 8 months ago

I'll merge this in a couple of days.