rust-lang / futures-rs

Zero-cost asynchronous programming in Rust
https://rust-lang.github.io/futures-rs/
Apache License 2.0

Shared can interact badly with futures that don't always poll their subfutures #330

Closed dwrensha closed 7 years ago

dwrensha commented 7 years ago

(I'm opening this issue to continue the discussion from https://github.com/alexcrichton/futures-rs/pull/305.)

The problem is that Shared, as it is currently designed, can interact poorly with certain futures, such as the ModedFuture sketched below. We should either formalize some reason why ModedFuture is an invalid implementation of Future, or we should redesign Shared to accommodate this kind of situation.

extern crate futures;
extern crate tokio_core;

use futures::{Future, Poll};
use futures::sync::oneshot;
use std::rc::Rc;
use std::cell::RefCell;

enum Mode { Left, Right }

// State shared between the handle and the future: which subfuture is
// currently selected, plus the task to wake when the mode is switched.
struct ModedFutureInner<F> where F: Future {
    mode: Mode,
    left: F,
    right: F,
    task: Option<::futures::task::Task>,
}

struct ModedFuture<F> where F: Future {
    inner: Rc<RefCell<ModedFutureInner<F>>>,
}

struct ModedFutureHandle<F> where F: Future {
    inner: Rc<RefCell<ModedFutureInner<F>>>,
}

impl<F> ModedFuture<F> where F: Future {
    pub fn new(left: F, right: F, mode: Mode) -> (ModedFutureHandle<F>, ModedFuture<F>) {
        let inner = Rc::new(RefCell::new(ModedFutureInner {
            left: left, right: right, mode: mode, task: None,
        }));
        (ModedFutureHandle { inner: inner.clone() }, ModedFuture { inner: inner })
    }
}

impl<F> ModedFutureHandle<F> where F: Future {
    pub fn switch_mode(&mut self, mode: Mode) {
        self.inner.borrow_mut().mode = mode;
        if let Some(t) = self.inner.borrow_mut().task.take() {
            // The other future may have become ready while we were ignoring it.
            t.unpark();
        }
    }
}

impl<F> Future for ModedFuture<F> where F: Future {
    type Item = F::Item;
    type Error = F::Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let ModedFutureInner { ref mut mode, ref mut left, ref mut right, ref mut task } =
            *self.inner.borrow_mut();
        // Remember the current task so that switch_mode() can wake it.
        *task = Some(::futures::task::park());
        // Only the currently selected subfuture gets polled. The other is
        // ignored, even if it is the one that unparked this task.
        match *mode {
            Mode::Left => left.poll(),
            Mode::Right => right.poll(),
        }
    }
}

pub fn main() {
    let mut core = ::tokio_core::reactor::Core::new().unwrap();
    let handle = core.handle();

    let (tx, rx) = oneshot::channel::<u32>();
    let f1 = rx.shared();
    let f2 = f1.clone();

    let (mut mfh, mf) = ModedFuture::new(
        Box::new(f1.map_err(|_| ()).map(|v| *v)) as Box<Future<Item=u32, Error=()>>,
        Box::new(::futures::future::empty()) as Box<Future<Item=u32, Error=()>>,
        Mode::Left);

    let (tx3, rx3) = oneshot::channel::<u32>();
    handle.spawn(f2.map(|x| tx3.complete(*x)).map_err(|_| ()));

    core.turn(Some(::std::time::Duration::from_millis(1)));

    handle.spawn(mf.map(|_| ()));

    core.turn(Some(::std::time::Duration::from_millis(1)));

    mfh.switch_mode(Mode::Right);

    tx.complete(11); // It seems like this ought to cause f2 and then rx3 to get resolved.

    // This hangs forever.
    core.run(rx3).unwrap();
}
dwrensha commented 7 years ago

My current inclination is to say that ModedFuture is an invalid Future implementation.

I propose that the following invariant must be upheld by any valid implementation of Future: When Future::poll() runs, it must propagate poll() calls to any objects that might have triggered the unparking of the current task.

I have had a change of heart on this topic since last week, largely because some of my code in capnp-rpc-rust now depends on the above invariant.
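
For concreteness, here is a minimal sketch of a combinator that upholds this invariant, written against the futures 0.1 API (the JoinAllUnits type is hypothetical, not something in the library): every call to poll() polls every still-pending child, since any of them might have triggered the unpark.

use futures::{Async, Future, Poll};

// Hypothetical combinator that completes once all of its children have
// completed. It upholds the proposed invariant: every poll() polls every
// child that might have triggered the unparking of the current task.
struct JoinAllUnits<F> where F: Future<Item = (), Error = ()> {
    children: Vec<Option<F>>,
}

impl<F> Future for JoinAllUnits<F> where F: Future<Item = (), Error = ()> {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<(), ()> {
        let mut all_done = true;
        for slot in self.children.iter_mut() {
            // Poll every still-pending child, not just a favored one.
            let done = match *slot {
                Some(ref mut f) => match f.poll()? {
                    Async::Ready(()) => true,
                    Async::NotReady => false,
                },
                None => true, // this child already completed and was dropped
            };
            if done { *slot = None; } else { all_done = false; }
        }
        if all_done { Ok(Async::Ready(())) } else { Ok(Async::NotReady) }
    }
}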

liranringel commented 7 years ago

This is one of the complications we are facing because of the current asynchronous model of futures-rs (and which probably wouldn't arise in a callback-based model).

I cannot see how it could be enforced by the type system, so if that proposal is accepted, it will have to be well documented.

alexcrichton commented 7 years ago

@dwrensha given that https://github.com/alexcrichton/futures-rs/pull/305 landed which tweaked the semantics of Shared, could you spell out again what the problem is here with Shared? Rereading your previous comment I'm having difficulty reconstructing the scenario in which things go wrong.

dwrensha commented 7 years ago

@alexcrichton I've updated my code above so that it's now a complete program that you can compile and run to see the behavior I'm worried about.

Shared was designed to make it possible for multiple waiters to get notified when a future is ready. Currently it also grants each of those waiters the ability to halt the execution of the original future. This seems wrong to me.

dwrensha commented 7 years ago

And to be clear, I think Shared is a much more useful thing than ModedFuture, so if I had to choose between them (and I am in fact arguing that we do need to choose between them), I would choose to keep Shared.

alexcrichton commented 7 years ago

Ah ok thanks for the clarification! The title also makes more sense now in retrospect.

So put another way, the way Shared works right now is that one task is the "winner" and is actually blocked on the underlying future. If that task doesn't actually poll the future when the future is completed, then no one will make any progress.

In your example the moded future is holding onto a Shared which was the winner, but it doesn't poll that future again when it is woken up, because it has switched modes. If the moded future were to drop the Shared handle, however, then it would work.
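
To make the failure mode concrete, here is a deliberately simplified, single-threaded model of that "winner" strategy (the NaiveShared type is hypothetical and is not the actual Shared implementation). The underlying future only remembers the most recent task to poll it, so if that task stops polling, the parked waiters are never woken:

use std::cell::RefCell;
use std::rc::Rc;
use futures::{Async, Future, Poll};

struct Inner<F> where F: Future {
    future: Option<F>, // Some until a "winner" drives it to completion
    result: Option<Result<F::Item, F::Error>>,
    waiters: Vec<::futures::task::Task>,
}

pub struct NaiveShared<F> where F: Future {
    inner: Rc<RefCell<Inner<F>>>,
}

impl<F> Clone for NaiveShared<F> where F: Future {
    fn clone(&self) -> Self {
        NaiveShared { inner: self.inner.clone() }
    }
}

impl<F> Future for NaiveShared<F>
    where F: Future, F::Item: Clone, F::Error: Clone
{
    type Item = F::Item;
    type Error = F::Error;

    fn poll(&mut self) -> Poll<F::Item, F::Error> {
        let mut inner = self.inner.borrow_mut();

        // Already resolved: every handle just clones the result.
        if let Some(ref result) = inner.result {
            return result.clone().map(Async::Ready);
        }

        // Otherwise this handle's task becomes the "winner": the inner
        // future registers only the current task for wakeup. If this task
        // later stops polling us -- as ModedFuture does after a mode
        // switch -- nobody drives the future and the waiters stay parked.
        let polled = inner.future.as_mut().unwrap().poll();
        match polled {
            Ok(Async::NotReady) => {
                inner.waiters.push(::futures::task::park());
                Ok(Async::NotReady)
            }
            Ok(Async::Ready(v)) => {
                inner.result = Some(Ok(v.clone()));
                inner.future = None;
                for t in inner.waiters.drain(..) { t.unpark(); }
                Ok(Async::Ready(v))
            }
            Err(e) => {
                inner.result = Some(Err(e.clone()));
                inner.future = None;
                for t in inner.waiters.drain(..) { t.unpark(); }
                Err(e)
            }
        }
    }
}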

I do agree that I'd prefer Shared to work rather than ModedFuture. Your proposed guarantee sounds pretty good to me, but it's also quite subtle, and I can imagine it being quite difficult to remember!

We could also perhaps consider this a bug in Shared and leverage with_unpark_event somehow to fix the bug there. That way when an unpark happens to a blocked task we can force extra side effects on the unpark, such as waking up all blocked tasks on a shared future.
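
For reference, here is a minimal sketch of how with_unpark_event gets wired up in futures 0.1 (the Ready event set and poll_child helper are hypothetical): when a child polled under an UnparkEvent later wakes the task, that child's id is recorded, so on the next poll the parent can re-poll just the children that actually woke it.

use std::sync::{Arc, Mutex};
use futures::{Future, Poll};
use futures::task::{self, EventSet, UnparkEvent};

// Hypothetical event set: records the ids of the children whose wakeups
// unparked the task.
struct Ready(Mutex<Vec<usize>>);

impl EventSet for Ready {
    fn insert(&self, id: usize) {
        self.0.lock().unwrap().push(id);
    }
}

// Poll child `id` with an UnparkEvent attached. An unpark triggered from
// inside this poll will call Ready::insert(id) before unparking the task.
fn poll_child<F>(ready: Arc<Ready>, id: usize, child: &mut F)
    -> Poll<F::Item, F::Error>
    where F: Future
{
    let event = UnparkEvent::new(ready, id);
    task::with_unpark_event(event, || child.poll())
}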

cc @aturon, this is a very interesting case where a lack of guarantees around what an unpark translates to can cause things to go awry.

aturon commented 7 years ago

@alexcrichton This is indeed interesting. I'll note that it's basically the exact same issue we hit with mutexes for futures -- and if we followed @dwrensha's guarantee, it'd solve that problem as well.

This is definitely food for thought in 0.2.

aturon commented 7 years ago

Labeling as 0.2 just to make sure it's part of the general discussion...

dwrensha commented 7 years ago

I'm curious whether there exist any non-contrived examples of futures which don't maintain that guarantee.

aturon commented 7 years ago

Select is one example.

carllerche commented 7 years ago

My gut reaction is that I am pretty strongly against this guarantee. It just doesn't make sense in any non-trivial implementation of a future (of which tokio-proto is full). It basically means that on every call to poll you have to call poll on every subfuture, every time... that doesn't seem reasonable.

dwrensha commented 7 years ago

Interesting! Sounds like we need to adjust the property. How about the following?

A future f is called faithful if: during each run of f.poll(), for each subobject g that might have triggered the unparking of the current task, f either drops g, polls g, or returns g. (That is, it "faithfully propagates" the notification.)

This should at least cover Select. Are there any futures existing in practice that are not faithful? @carllerche: can you point to a specific example in tokio-proto?
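
For concreteness, here is a minimal sketch of a "faithful" two-way select (the FaithfulSelect and Loser types are hypothetical): when one side completes, the still-pending side is returned to the caller rather than silently abandoned, so any notification it triggered still has an owner.

use futures::{Async, Future, Poll};

struct FaithfulSelect<A, B> where A: Future, B: Future {
    left: Option<A>,
    right: Option<B>,
}

// The still-pending subfuture that is handed back to the caller.
enum Loser<A, B> {
    Left(A),
    Right(B),
}

impl<A, B> Future for FaithfulSelect<A, B>
    where A: Future, B: Future<Item = A::Item, Error = A::Error>
{
    // On completion, the item is paired with the losing subfuture, so the
    // caller can keep polling it (or drop it) -- either way any
    // notification it triggers is faithfully accounted for.
    type Item = (A::Item, Loser<A, B>);
    type Error = A::Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        if let Async::Ready(v) = self.left.as_mut().unwrap().poll()? {
            return Ok(Async::Ready((v, Loser::Right(self.right.take().unwrap()))));
        }
        if let Async::Ready(v) = self.right.as_mut().unwrap().poll()? {
            return Ok(Async::Ready((v, Loser::Left(self.left.take().unwrap()))));
        }
        Ok(Async::NotReady)
    }
}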

dwrensha commented 7 years ago

It basically means that every call of poll you have to call poll on every sub future every time.. that doesn't seem reasonable.

with_unpark_event() makes it possible to avoid always polling every subfuture.

carllerche commented 7 years ago

There are a number of problems w/ changing the behavior of futures in this direction.

First, it either requires always polling all futures or allocating by using with_unpark_event, increasing the amount of work that has to be done on each "tick".

A worse outcome is that it forces a top-level future to call poll on a subfuture even if it isn't ready to handle the value returned by the subfuture. This means that the top-level future now needs additional logic to temporarily store the value until it is ready to process it. Even worse, it throws a huge wrench into the backpressure story. Imagine the subfuture is a stream: now the top-level future is required to call poll on it, potentially receiving an unbounded stream of ready values and having to buffer them, as the sketch below illustrates.
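
As a sketch of the shape such a combinator would be forced into (the ForcedToBuffer type is hypothetical): to keep polling a substream it isn't ready to consume, it has to stash whatever arrives, with no bound on how large the stash can grow.

use std::collections::VecDeque;
use futures::{Async, Poll, Stream};

// Hypothetical illustration of the backpressure problem: a combinator
// that must poll `upstream` on every tick, ready or not, has to buffer
// everything that arrives in the meantime.
struct ForcedToBuffer<S> where S: Stream {
    upstream: S,
    stash: VecDeque<S::Item>,
}

impl<S> ForcedToBuffer<S> where S: Stream {
    // Called on every poll of the enclosing task, whether or not the
    // downstream side can accept more items.
    fn drain_upstream(&mut self) -> Poll<(), S::Error> {
        loop {
            match self.upstream.poll()? {
                Async::Ready(Some(item)) => self.stash.push_back(item), // unbounded growth
                Async::Ready(None) => return Ok(Async::Ready(())),
                Async::NotReady => return Ok(Async::NotReady),
            }
        }
    }
}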

This sounds similar to the mutex problem, and I believe that the correct strategy is to look into adding a hook in the task system that enables "mutex"-like futures to notify a task and then get signaled when the notified task has been polled. This would enable a "mutex" to grant access to a task for a single poll even if the task in question doesn't touch the mutex during that poll.

Of course, this would be a non-trivial change and should be punted to 0.2.

carllerche commented 7 years ago

That being said, IMO the best 0.1 strategy is to use message-passing-based coordination.

dwrensha commented 7 years ago

Ah, I had only considered future::Select. I'm not sure how, if at all, my notion of "faithfulness" could be adapted to apply to stream::Select.

dwrensha commented 7 years ago

We could also perhaps consider this a bug in Shared and leverage with_unpark_event somehow to fix the bug there.

This idea sounds promising.

I've gone ahead and tried it out in my implementation of ForkedPromise, which is essentially a single-threaded future::Shared.

One somewhat sad part is that it requires a Mutex so that the EventSet can be Sync + Send. Moreover, the linear-time clone of Events needed on each call to with_unpark_event() is a bit worrying if we start using it too much.

alexcrichton commented 7 years ago

@dwrensha yeah, the futures library is not currently optimized for heavy usage of with_unpark_event. The initial intention was that it'd be used once or twice per task, so we'd get by just fine with a small vector to avoid allocations and such.

Also, as a side note, I'd love to add single-threaded versions of various sync primitives to the futures crate: e.g. a futures-aware channel/queue, a RefCell/Mutex equivalent, etc. I think it'd make a great module in futures itself, and we could ensure that the API of sync mirrors it (or is a subset, I guess).

dwrensha commented 7 years ago

My current thinking is that it would not work out well to require that all impls of Future satisfy some notion of "faithfulness", and that the current implementation of Shared is therefore buggy, because it assumes that all futures will faithfully propagate poll() calls.

It helps to consider the case of streams. If we require futures to be faithful, then we probably also need to require streams to be faithful, because it's easy to convert back and forth between streams and futures. But what would that mean for stream::Select? Currently stream::Select is not faithful, because if the first substream is ready then it does not bother to call poll() on the second substream.

You could imagine trying to fix that by always polling both substreams and by adding a field to hold onto an item in the case that both are ready. However, if the first substream is ready and the second is not, then poll() would return Ok(Ready(..)) and would also register the current task to be unparked when the second substream becomes ready.

Now suppose this Select is itself the substream of another stream s0, say an alternating stream that takes turns pulling an element from some number of substreams. Then s0.poll() needs to always call poll() on the Select in order to faithfully propagate events that could have been triggered by it. If doing so returns Ok(Ready(..)), then s0 needs to hold onto that value somewhere. Indeed, it seems that s0 would need to be able to hold onto an unbounded number of values produced by its substreams!

The only plausible way I can see around such problems would be to make heavy use of with_unpark_event().

The alternative is simply to not require faithfulness, and instead fix the bug in Shared. Perhaps the fix will involve with_unpark_event(), but that will be a much more confined use of with_unpark_event().

dwrensha commented 7 years ago

The only plausible way I can see around such problems would be to make heavy use of with_unpark_event().

Actually, I'm not so sure that with_unpark_event() even helps much at all. A substream triggering an unpark does not imply that it will produce an item on its next poll().

alexcrichton commented 7 years ago

@dwrensha would you be interested in prototyping a version of Shared that leverages with_unpark_event? That should at least resolve that bug for now I think.

stuhood commented 7 years ago

I believe that I'm still seeing bad behaviour here as of 74700604fae154bbef176973285d0a8013e21d53. Notably, "Shared can interact badly with futures that don't always poll their subfutures" applies to nested Shared instances (as Shared won't actually poll unless it has been correctly marked with original_future_needs_poll).

Periodically, for a join_all that joins both a root Shared future A and its dependency Shared C (in a chain A -> B -> C), C will complete, and will unpark the Task underlying the join_all, but will not set the original_future_needs_poll flag of B or A, which causes the wakeup to be missed.

It seems like what needs to happen is actually a recursive unpark/marking of the transitive dependents of the completing Shared. In other systems I've worked with (primarily com.twitter.util.Future), this is accomplished by "Linking" a dependency Future to its dependents. Is it possible that the Shared clones could use the waiter/unparker lists to mark their dependents original_future_needs_poll recursively?
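
A rough sketch of that linking idea (all types hypothetical, independent of Shared's actual internals): each node keeps handles to its dependents, so a completion can recursively set every dependent's flag before any task is woken.

use std::cell::{Cell, RefCell};
use std::rc::Rc;

// Hypothetical dependency node, standing in for a Shared in a chain
// like A -> B -> C.
struct Node {
    needs_poll: Cell<bool>,
    dependents: RefCell<Vec<Rc<Node>>>,
}

// When a node completes, mark its transitive dependents before waking any
// task, so no wakeup can observe a stale needs_poll flag partway up the chain.
fn mark_dependents(node: &Node) {
    for dep in node.dependents.borrow().iter() {
        dep.needs_poll.set(true);
        mark_dependents(dep);
    }
}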

dwrensha commented 7 years ago

@stuhood could you post some code that demonstrates the problem?

stuhood commented 7 years ago

@dwrensha : I will try to get a repro this week. I have trace logs from the failure to wake up, and while they don't point directly to the culprit, I have a hypothesis.

The EventSet::insert calls triggered after with_unpark_event occur in the order in which they were added to the EventSet (i.e., from the root downward toward the leaves). If a task is already busy polling when EventSet::insert is called, it seems possible that that sequence of calls might clear original_future_needs_poll for a parent future before its child has also been marked original_future_needs_poll.

stuhood commented 7 years ago

Sorry guys, I thought I'd have time to work on a repro during my vacation, but I've been too busy. Next week.

stuhood commented 7 years ago

I still haven't managed to reproduce this outside of our repo, but I did find a much smaller workaround that might point to the cause.

When our main thread executes (approximately) the following:

join_all(
  [root1.shared().then(|_| ()), root2.shared().then(|_| ())]
).wait()?

it will miss a wakeup about one in ten times (afaict from thread stacks, it is not a deadlock: all threads are thread::parked outside of any locks).

On the other hand, the following succeeds reliably:

for root in [root1.shared().then(|_| ()), root2.shared().then(|_| ())] {
  root.wait()?
}
dwrensha commented 7 years ago

I still haven't managed to reproduce this outside of our repo ...

Any way for us to reproduce the problem would be helpful. I guess that by "repo" you are referring to some not-publicly-available components that are necessary to trigger the problem?