smol-rs / futures-lite

Futures, streams, and async I/O combinators.
Apache License 2.0
424 stars 25 forks source link

Add a trait for abstracing over executors #93

Open notgull opened 5 months ago

notgull commented 5 months ago

In smol, Executor and LocalExecutor don't implement any common traits. So, it's impossible to create a common abstraction over both traits. This is a blocker for https://github.com/notgull/smol-hyper/pull/2#issuecomment-1896504478 and https://github.com/smol-rs/smol/issues/292.

However I think that we should have an abstraction for executors in general. We put an emphasis on a diversity of executors like smolscale but we have no way of abstracting over them. Therefore I think we should have a trait for spawning futures, specifically in this crate.

A couple of potential strategies here:

@smol-rs/admins Thoughts?

fogti commented 5 months ago

Sounds like a good idea. futures-task provides a possible abstraction, but as you already said it is unlikely to be a good abstraction for our (current and future) use cases. If we do introduce a new abstraction, we maybe should at least try to provide a compatibility interface to futures-task to avoid an ecosystem split (futures-task has 31 direct dependents (without counting the futures-rs ones)). Maybe also have a sneak peak at async_executors (which otherwise looks like a bit too heavyweight, but if we're done we might want to consider providing them with a PR of an implementation to make it support our traits too)

notgull commented 5 months ago

I've created a demo here as to what I have in mind. I'll expand it over the coming weeks, this is more of a "request for comments".

The design is especially hamstringed by our usage of Rust v1.63 and our lack of GATs. I'm not sure yet if GATs/TAIT is the preferred option for moving forwards here.

notgull commented 3 months ago

I've expanded the demo here to include a "race" mechanism (solving https://github.com/smol-rs/smol/issues/292) and slightly more refined traits. I would appreciate comments before I move forwards.

Specifically, I have questions for our deployment of this trait. I see one of the following options working:

yoshuawuyts commented 3 months ago

Thoughts?

I actually think we should consider deprecating the notion of a local executor, in favor of using async concurrency operators. This might seem counter-intuitive to propose as it breaks with the ecosystem status quo, but I think it would probably make concurrency in (async) Rust less confusing.

Executors exist to enable parallelism

The framing of "tasks are like async/.await versions of threads" is I think one that I popularized when we were developing async-std. I figured async_std::task should mirror std::thread and as a result we got pretty far with that. I actually think that in retrospect a better framing for tasks is as: "tasks are parallelizable futures".

In sync Rust both concurrency and parallelism are provided via the thread APIs. If you want to concurrently schedule two operations, your best option is typically to use threads. In async Rust however parallelism and concurrency are unbundled. We can execute any number of futures concurrently, and they don't have to be parallel. Under this model also the idea of a "single-threaded executors" makes little sense; as executors are only needed to enable parallelism.

Parallelizable futures

I've written a demo of what a "parallelizable future" can look like as part of the tasky crate. The idea is that we can use the same concurrency operations regardless of whether the underlying futures can be moved to different threads or not:

use tasky::prelude::*;
use futures_concurrency::prelude::*;

let a = Client::get("https://example.com").par(); // parallelizable future `a`
let b = Client::get("https://example.com").par(); // parallelizable future `b`
let (a, b) = (a, b).try_join().await?; // concurrently await both parallelizable futures

References

notgull commented 3 months ago

@yoshuawuyts

Thank you for taking the time to explain this to me. I read your blog posts but I might not have understood what you mean 100%, so forgive me for my ignorance.

Under this model also the idea of a "single-threaded executors" makes little sense; as executors are only needed to enable parallelism.

There are two things that this model does not allow that I think should be considered in executors.

Firstly, from a look at futures-concurrency it looks like it uses an O(n) method to poll all of the futures in the list. So it would probably work for a dozen futures not but hundreds of thousands. This is probably fixable by using a concurrent queue to handle readiness instead of the currently used mechanisms, but I'm not 100% sure if this is a design decision or not. On the other hand LocalExecutor can handle hundreds of thousands of concurrent tasks.

Secondly, I'm not sure if futures-concurrency allows for more than a set number of tasks on the fly. For instance take the following TCP server:

fn main() {
    let state = RefCell::new(State::new());
    let ex = smol::LocalExecutor::new();

    smol::block_on(ex.run(async {
        let socket = smol::net::TcpListener::bind("0.0.0.0:80").await.unwrap();
        while let Ok((client, _)) = socket.accept().await {
            let state = &state;
            ex.spawn(async move {
                do_something(client, state).await;
            }).detach();
        }
    });
}

You don't know how many tasks you'll spawn at a time here; it could be two, it could be two million. If we want it to be single-threaded (i.e. the unsend use case) then we really have no other option than to use an executor.


I've written a demo of what a "parallelizable future" can look like as part of the tasky crate. The idea is that we can use the same concurrency operations regardless of whether the underlying futures can be moved to different threads or not:

I actually think having an Executor trait would be a good idea for this idea. The weakness of the tasky crate is that it's tied to the async-std executor. With this trait the above example would then look like this:

use tasky::prelude::*;
use futures_concurrency::prelude::*;
use smol::LocalExecutor;

let ex = LocalExecutor::new();
let a = Client::get("https://example.com").par(&ex); // parallelizable future `a`
let b = Client::get("https://example.com").par(&ex); // parallelizable future `b`
let (a, b) = (a, b).try_join().await?; // concurrently await both parallelizable futures

I actually think this would be preferable to the design I currently have. It makes it so the actual concurrency is left to futures-concurrency (which may eventually be adopted into libstd, if I understand right?), then have a "postfix" notation for spawning things onto executors. But then have a trait that can be used to abstract over this "postfix" notation.

Heck, if you like the idea I can just add postfix notation to futures-task-lite and then say to use futures-concurrency to handle parallelism.

yoshuawuyts commented 3 months ago

@notgull I appreciate your questions; let me try and answer them one by one.

Scaling waking behavior

So on the point of waking time: futures-concurrency implements something we've dubbed "perfect waking". Rather than waking all futures on each poll, we only ever wake the futures which need to be woken. We do this by creating intermediate wakers which allow us to specifically track which futures asked to be woken. That means rather than having O(N^2) total wakeups, futures-concurrency schedules O(N) wakeups - which is the least amount of wakeups possible.

Dynamic concurrency

One issue with the example you've posted is that it is unstructured - calling spawn(..).detach() on tasks means that if they panic or return an error the calling code can't register that. And with control flow the other way around - there is no way to propogate cancellation down into the individual tasks.

In futures-concurrency we have a primitive called FutureGroup which is pretty similar to FuturesUnordered, except that it can be converted into a lending iterator. This means items can be inserted into it during iteration, which allows the main loop's body to be concurrently processed with the incoming connections.

We should still do better though: I want to leverage FutureGroup or StreamGroup to create a new trait: ConcurrentStream, enabling us to provide a Rayon-like interface but for strictly concurrent async processing. I think something like this should be achievable with that:

fn main() {
    smol::block_on(ex.run(async {
        let state = State::new();
        let socket = smol::net::TcpListener::bind("0.0.0.0:80").await.unwrap();
        socket
            .incoming()
            .concurrent()
            .for_each(async |stream| do_something(stream, &state).await)
            .await;
    });
}

Note here the absence of RefCell because there is no requirement that futures lifetimes are 'static. And if we were true to form we would also introduce a method try_for_each which would enable short-circuiting in the case of an error, which would enable us to (even conditionally) tear down all other futures currently being processed concurrently.

This work isn't quite implemented yet though; I've got a draft PR I need to tinker with some more to give it shape. But I've already used the core mechanics of futures-concurrency to prove that we can express these exact semantics.

On a shared executor trait

I'd love it if we could start adopting postfix spawn notations more widely, encouraging us to use the same concurrency operations for both parallel and concurrent execution. If you believe that a shared executor trait could help with that, then yes absolutely we should pursue that!

Something I feel might still be missing from our executor models is non-workstealing spawn APIs. I'm not super sure about this one, but imagine a spawn API of the following signature:

pub fn spawn<Fut>(f: Fut) -> Fut::Output
where
    Fut: IntoFuture + Send + 'static,
    <Fut as IntoFuture>::Output: Send + 'static,

Notably: we have something which may return a future, and that is Send - but the future itself does not need to be Send because workstealing does not apply. If workstealing should apply, then the future itself also needs to be Send. To which degree this is useful: I'm unsure. But I wanted to make sure to flag it, because this is a way to support multi-threaded execution outside of thread-per-core scenarios with types which internally still are still able to use !Send types.

notgull commented 3 months ago

I took benchmarks for the futures-concurrency types against async-executor::LocalExecutor, by spawning one million futures and then polling them. Here is what I found:

image

It looks like LocalExecutor and Join both poll in O(1) time, while FutureGroup polls in O(n) time.

yoshuawuyts commented 3 months ago

@notgull oh no, that's not good. I can assure you both join and FutureGroup implement near-identical poll loops. They should both implement O(1) wakeups.

Clearly something isn't going right though. Could you share the code you used for the benchmarks so I can investigate further?

notgull commented 3 months ago

I've uploaded the benchmarks here. Specifically, the benchmark is:

for _ in 0..1_000_000 {
    group.insert(future::ready(1));
}

block_on(async {
    while let Some(x) = group.next().await {
        black_box(x);
    }
});
notgull commented 3 months ago

I do think that there are some advantages to the local executor (e.g. the Task abstraction). But my main concern would be performance.

yoshuawuyts commented 3 months ago

@notgull thank you for uploading the benchmark; it turns out you surfaced an issue in the behavior of FutureGroup::insert. I've filed a patch fixing it here: https://github.com/yoshuawuyts/futures-concurrency/pull/168. With this patch applied, the benchmarks come out as follows:


     Running benches/co.rs (target/release/deps/co-71ce600071789f52)
join/futures_concurrency::join
                        time:   [78.730 ms 79.639 ms 80.239 ms]
Found 1 outliers among 10 measurements (10.00%)
  1 (10.00%) low mild

group/futures_concurrency::FutureGroup
                        time:   [78.353 ms 78.457 ms 78.658 ms]
Found 2 outliers among 10 measurements (20.00%)
  1 (10.00%) low mild
  1 (10.00%) high mild

executor/async_executor::LocalExecutor
                        time:   [275.77 ms 277.54 ms 279.35 ms]

executor/async_executor::LocalExecutor #2
                        time:   [187.46 ms 189.04 ms 190.13 ms]
Found 1 outliers among 10 measurements (10.00%)
  1 (10.00%) low severe

This is not an exact science of course, but treating this as a ballpark measurement shows that FutureGroup should perform approximately 2-3x better than local executors.

notgull commented 3 months ago

By the way, I've added postfix notation spawn to futures-task-lite.