yoshuawuyts / futures-concurrency

Structured concurrency operations for async Rust
https://docs.rs/futures-concurrency
Apache License 2.0
415 stars 33 forks

Fair chaining APIs #105

Open yoshuawuyts opened 2 years ago

yoshuawuyts commented 2 years ago

Now that #104 exists to close https://github.com/yoshuawuyts/futures-concurrency/issues/85, there is a real question about fairness and chaining. I've made the case before that in order to guarantee fairness, the scheduling algorithm needs to know about all the types it operates on. When we were still using permutations, and even just rng-based starting points, I believed that to be true. But I'm slowly coming around to @eholk's idea that this may not be the case.

Benefits

If we resolve this, I think we may be able to improve our ergonomics. Take for example the following code, which I believe to be quite representative of futures-concurrency's ergonomics:

let streams = (
    socket_stream.map(Either::Response),
    route_rx.stream().map(Either::Request),
    pinger.map(|_| Either::Ping),
);
let mut merged = streams.merge();
while let Some(either) = merged.next().await { ... }

The tuple instantiation imo looks quite foreign. In this repo's style, we'd probably instead choose to name the futures, flattening the operation somewhat:

let a = socket_stream.map(Either::Response);
let b = route_rx.stream().map(Either::Request);
let c = pinger.map(|_| Either::Ping);

let mut merged = (a, b, c).merge();
while let Some(either) = merged.next().await { ... }

But while I'd argue this is more pleasant to read, we can't expect people to always do this. The earlier example is often easier to write, and thus will be what gets written. A chaining API could probably be easier still to author:

let mut merged = socket_stream
    .map(Either::Response)
    .merge(route_rx.stream().map(Either::Request))
    .merge(pinger.map(|_| Either::Ping));

while let Some(either) = merged.next().await { ... }

We used to have this API, but we ended up removing it, and I think there's definitely a case to be made for adding it back. Just as we'd expect to eventually have both async_iter::AsyncIterator::chain and an impl of async_iter::Chain for tuples, we could provide both variants of merge.
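For intuition, the same duality already exists for synchronous iterators. Here's a sketch using stable Iterator as an analogy; the chain_tuple helper is hypothetical (std only provides the chaining method today):

```rust
// Sketch: the two API styles for the same operation, shown on stable
// iterators. `chain_tuple` is a hypothetical stand-in for an
// `impl Chain for tuple`; it is not an existing std API.
fn chain_tuple<I: IntoIterator>(t: (I, I)) -> impl Iterator<Item = I::Item> {
    t.0.into_iter().chain(t.1)
}

fn main() {
    let a = [1, 2];
    let b = [3, 4];

    // Chaining style: Iterator::chain.
    let chained: Vec<i32> = a.iter().chain(b.iter()).copied().collect();

    // Tuple style: operate on all inputs at once.
    let tupled: Vec<i32> = chain_tuple((a, b)).collect();

    assert_eq!(chained, vec![1, 2, 3, 4]);
    assert_eq!(tupled, vec![1, 2, 3, 4]);
}
```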

Implementation

I'd love to hear more from @eholk here. My initial hunch is that something like ExactSizeIterator could help us, but rather than returning how many items an iterator contains, it would return the number of iterators contained within. That way outer iterators can track how often they should poll inner iterators before moving on. I think this may need specialization to work, though.
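As a rough sketch of that hunch (the StreamCount, Leaf, and Merge names are made up for illustration; none of this is an existing API), the trait would count inner streams rather than items:

```rust
// Hypothetical trait: reports how many inner streams a combinator
// contains, analogous to ExactSizeIterator reporting item counts.
trait StreamCount {
    fn stream_count(&self) -> usize;
}

// A leaf stream counts as one.
struct Leaf;
impl StreamCount for Leaf {
    fn stream_count(&self) -> usize { 1 }
}

// A merge of two streams exposes the sum of its children's counts, so an
// outer merge knows how many polls to budget before moving on.
struct Merge<A, B>(A, B);
impl<A: StreamCount, B: StreamCount> StreamCount for Merge<A, B> {
    fn stream_count(&self) -> usize {
        self.0.stream_count() + self.1.stream_count()
    }
}

fn main() {
    let merged = Merge(Leaf, Merge(Leaf, Leaf));
    // The nested merge reports three inner streams in total.
    assert_eq!(merged.stream_count(), 3);
}
```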

I think even if we can't make the API strictly fair, it might still be worth adding the chaining API, and we can possibly resolve the fairness issues in the stdlib. Or maybe we can add a nightly flag with the specializations on it as part of this lib? Do folks have thoughts on this?

eholk commented 2 years ago

One idea I had was to add an additional trait, something like this:

trait SchedulingWeight {
    fn scheduling_weight(&self) -> usize;
}

Then for tuples we'd have something like:

impl<A, B> SchedulingWeight for (A, B)
where
    A: SchedulingWeight,
    B: SchedulingWeight,
{
    fn scheduling_weight(&self) -> usize {
        self.0.scheduling_weight() + self.1.scheduling_weight()
    }
}

In the implementation of Future for (A, B), we could set it up so that if A::scheduling_weight() returns 1 and B::scheduling_weight() returns 2, then we poll B twice as often as we poll A.
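To make the "twice as often" idea concrete, here's a minimal sketch (the poll_schedule helper is hypothetical, not part of any crate) of turning per-child weights into a polling order for one round:

```rust
// Hypothetical sketch: given per-child scheduling weights, build the
// order in which a fair merge would poll its children within one round.
// A child with weight w is polled w times per round, interleaved with
// the others rather than polled back-to-back.
fn poll_schedule(weights: &[usize]) -> Vec<usize> {
    let mut order = Vec::new();
    let max = weights.iter().copied().max().unwrap_or(0);
    for step in 0..max {
        for (child, &w) in weights.iter().enumerate() {
            if step < w {
                order.push(child);
            }
        }
    }
    order
}

fn main() {
    // A has weight 1, B has weight 2: B is polled twice per round, A once.
    let order = poll_schedule(&[1, 2]);
    assert_eq!(order, vec![0, 1, 1]);
}
```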

There are some significant challenges with this approach. Mainly, most futures do not implement SchedulingWeight (in fact, no futures do, since the trait does not exist 😃), so we don't have a good way to degrade gracefully. We might be able to work around a lot of the issues, but I think we're likely to end up with another sandwich problem: a Future + SchedulingWeight container that contains a Future that contains a Future + SchedulingWeight, at which point we lose all the weighting information for the inner future.

So I think a more workable solution is to extend the Future trait with a scheduling_weight method (or maybe scheduling_hint, scheduling_weight_hint, or priority_hint) that has a default impl returning 1. That way we get the current behavior by default, but futures that can provide more precise information can override the method.
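A sketch of what that could look like, using a standalone trait for illustration (SchedulingHint and the struct names are placeholders, since we can't actually add a method to std's Future here):

```rust
// Hypothetical sketch of the proposed extension: a scheduling_weight
// method with a default body, so existing futures keep the current
// behavior (weight 1) while combinators can override it with the sum
// of their children's weights.
trait SchedulingHint {
    fn scheduling_weight(&self) -> usize {
        1 // default: every future counts as a single schedulable unit
    }
}

// A plain future relies on the default and reports weight 1.
struct PlainFuture;
impl SchedulingHint for PlainFuture {}

// A merge-like combinator overrides the default with the sum of its
// children, so outer combinators see through it transparently.
struct MergedPair<A, B>(A, B);
impl<A: SchedulingHint, B: SchedulingHint> SchedulingHint for MergedPair<A, B> {
    fn scheduling_weight(&self) -> usize {
        self.0.scheduling_weight() + self.1.scheduling_weight()
    }
}

fn main() {
    assert_eq!(PlainFuture.scheduling_weight(), 1);
    assert_eq!(MergedPair(PlainFuture, PlainFuture).scheduling_weight(), 2);
    assert_eq!(
        MergedPair(PlainFuture, MergedPair(PlainFuture, PlainFuture)).scheduling_weight(),
        3
    );
}
```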

Also, if Rust gets impl specialization or refinement, that might open up some other options for us.

eholk commented 2 years ago

Adding something like priority_hint to the Future trait seems pretty similar to the size_hint method on Iterator: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.size_hint

yoshuawuyts commented 1 year ago

I'm revisiting some of the outstanding issues on this repo, and I didn't get around to saying it last time: I really like the idea of a scheduling_weight method on Future. It would allow us to solve the fairness issue transparently for all futures combinators, in a way that composes really nicely.

Maybe we should file an ACP to get an experiment going?

eholk commented 1 year ago

I think an experiment seems like a good idea. I wonder if we could do it as a separate crate to prove the idea and then use that as evidence in an ACP?