rust-lang / futures-rs

Zero-cost asynchronous programming in Rust
https://rust-lang.github.io/futures-rs/
Apache License 2.0

ForEach, Fold, and similar stream combinators can run saturated without returning from poll #1957

Open mzabaluev opened 4 years ago

mzabaluev commented 4 years ago

The combinators that poll a stream to exhaustion in a loop have a problem that was already raised in #869: if the upstream returns Ready many times in a row, the loop never breaks and the combinator's poll does not return for that whole time, starving the other pending operations in the same task of being polled.

To illustrate how this can be a problem for other code, consider this simple adapter for making futures cancellable:

use futures::channel::oneshot::{self, Canceled};
use futures::prelude::*;
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::ready;
use pin_utils::unsafe_pinned;

struct CancelHandle(oneshot::Sender<()>);

#[derive(Debug)]
struct AlreadyDropped;

impl CancelHandle {
    fn cancel(self) -> Result<(), AlreadyDropped> {
        self.0.send(()).map_err(|()| AlreadyDropped)
    }
}

struct Cancelable<F> {
    op: F,
    stop_rx: oneshot::Receiver<()>,
}

impl<F> Cancelable<F> {
    unsafe_pinned!(op: F);
    unsafe_pinned!(stop_rx: oneshot::Receiver<()>);
}

impl<F: Unpin> Unpin for Cancelable<F> {}

impl<F> Future for Cancelable<F>
where
    F: Future,
{
    type Output = Result<F::Output, Canceled>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.as_mut().stop_rx().poll(cx) {
            Poll::Pending => {
                let output = ready!(self.as_mut().op().poll(cx));
                Ok(output).into()
            }
            Poll::Ready(_res) => Err(Canceled).into(),
        }
    }
}

fn make_cancelable<F>(op: F) -> (Cancelable<F>, CancelHandle) {
    let (stop_tx, stop_rx) = oneshot::channel();
    let fut = Cancelable { op, stop_rx };
    let handle = CancelHandle(stop_tx);
    (fut, handle)
}

It looks rather useful and intuitive, but this contrived example hangs with a busy-looping thread rather than canceling the task:

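use futures::executor::{self, ThreadPool};
use futures::task::SpawnExt;
use std::thread;
use std::time::Duration;

fn main() {
    let mut a = 0;
    let (fut, stop_handle) = make_cancelable(
        // `repeat` is always ready, so `for_each` never returns from `poll`.
        stream::repeat(1).for_each(move |n| {
            a += n;
            future::ready(())
        })
    );
    let pool = ThreadPool::new().unwrap();
    let res_handle = pool.spawn_with_handle(fut).unwrap();
    thread::sleep(Duration::from_millis(1));
    stop_handle.cancel().unwrap();
    // Never completes: the worker thread is stuck inside the `for_each` poll loop.
    let res = executor::block_on(res_handle);
    assert!(res.is_err());
}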

In non-contrived usage with real streams, too, a ForEach with an always-ready processing closure will delay cancellation for as long as the stream yields items.

mzabaluev commented 4 years ago

An easy fix is to add a counter into such polling loops and, upon reaching a certain number of iterations, call an immediate wakeup for the task and return Pending. To make it tunable, the combinators equipped with loop guards could offer a method:

impl<St, Fut, F> ForEach<St, Fut, F> {
    pub fn yield_after_every(mut self, iterations: u32) -> Self {
        self.yield_after_every = iterations;
        self
    }
}
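
A minimal sketch of the loop guard described above, using only the standard library. `GuardedSum`, `CountingWaker`, and `drive` are hypothetical names made up for this illustration, and an iterator stands in for an always-ready stream; this is not how futures-rs implements `ForEach`.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for `ForEach` over an always-ready source:
/// sums the items, but returns `Pending` after `yield_after_every` of them.
/// Assumes `yield_after_every > 0`; with 0 it would never make progress.
struct GuardedSum<I> {
    items: I,
    total: u64,
    yield_after_every: u32,
}

impl<I: Iterator<Item = u64> + Unpin> Future for GuardedSum<I> {
    type Output = u64;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u64> {
        let this = &mut *self;
        for _ in 0..this.yield_after_every {
            match this.items.next() {
                Some(n) => this.total += n,
                None => return Poll::Ready(this.total),
            }
        }
        // Iteration budget spent: ask for an immediate re-poll, then return
        // so that sibling futures in the same task get polled too.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Waker that only counts how often it was woken (for the demonstration).
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Polls `fut` to completion; returns (output, number of polls, number of wakeups).
fn drive<F: Future + Unpin>(mut fut: F) -> (F::Output, usize, usize) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = Pin::new(&mut fut).poll(&mut cx) {
            return (out, polls, counter.0.load(Ordering::SeqCst));
        }
    }
}
```

Driving `GuardedSum { items: 0..10u64, total: 0, yield_after_every: 4 }` with `drive` takes three polls and two wakeups instead of one uninterrupted poll, so a wrapper such as `Cancelable` would get two chances to check its cancellation channel along the way.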

mzabaluev commented 4 years ago

The resolution of https://github.com/rust-lang-nursery/futures-rs/issues/869#issuecomment-548600024 proposes a dedicated stream combinator that forces a yield after N iterations, but I think it's a poor solution because of the human factor: it's very easy to write complex async code without thinking about this, and that code will work most of the time until a stream happens to be saturated somewhere critical.
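
For comparison, the opt-in approach might look roughly like the sketch below. `PollSource` is a hypothetical, simplified stand-in for `futures::Stream` (no pinning), and `YieldEvery`, `Ones`, `NoopWake`, and `noop_waker` are likewise names invented for this illustration. The point is that the guard only exists where someone remembered to wrap the source.

```rust
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical, simplified stand-in for `futures::Stream`.
trait PollSource {
    type Item;
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// A source that is always ready, similar in spirit to `stream::repeat(1)`.
struct Ones;

impl PollSource for Ones {
    type Item = u32;
    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        Poll::Ready(Some(1))
    }
}

/// Hypothetical opt-in adapter: after `every` consecutive ready items it
/// returns `Pending` once (with an immediate wakeup) before continuing.
struct YieldEvery<S> {
    inner: S,
    every: u32,
    streak: u32,
}

impl<S: PollSource> PollSource for YieldEvery<S> {
    type Item = S::Item;

    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
        if self.streak >= self.every {
            // Forced yield: reset the streak and schedule a re-poll.
            self.streak = 0;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        let res = self.inner.poll_next(cx);
        match &res {
            Poll::Ready(Some(_)) => self.streak += 1,
            // The inner source yielded or ended on its own: start over.
            _ => self.streak = 0,
        }
        res
    }
}

/// Waker that does nothing; enough to build a `Context` by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn noop_waker() -> Waker {
    Waker::from(Arc::new(NoopWake))
}
```

With `every: 3`, the wrapped `Ones` source yields three items, then one `Pending`, then items again. An unwrapped `Ones` would never return `Pending`, which is the case that is easy to overlook.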

mzabaluev commented 4 years ago

For programmers who are sure they don't need any busy-loop guards, and would rather have the extra performance when polling something that is, in principle, likely to become pending on I/O, alternative combinators with cautionary names could be added: .for_each_uninterrupted(...) or something like that.

mzabaluev commented 4 years ago

Has this been solved by https://github.com/tokio-rs/tokio/pull/2160?

jonhoo commented 4 years ago

It's solved only in the context of tokio I'm afraid, but yes, in that context it should be solved. If we wanted a solution for all executors, we'd need to either wait for the coop stuff to evolve beyond tokio, or implement something like https://github.com/rust-lang/futures-rs/pull/2049 (which solved https://github.com/rust-lang/futures-rs/issues/2047).
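
A rough sketch of the cooperative-budget idea, for readers who have not seen it. This is a simplified illustration and not tokio's actual implementation: `BUDGET`, `reset_budget`, `poll_proceed`, `poll_next_item`, and `poll_sum_forever` are hypothetical names, and the budget values are arbitrary. The guard lives in the leaf resource, so an unguarded consumer loop still yields.

```rust
use std::cell::Cell;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

thread_local! {
    /// Units of work the currently polled task may still perform before yielding.
    static BUDGET: Cell<u32> = Cell::new(0);
}

/// Hypothetical hook: an executor would call this before polling each task.
fn reset_budget(units: u32) {
    BUDGET.with(|b| b.set(units));
}

/// Hypothetical hook: leaf resources call this before reporting readiness.
/// Once the budget is exhausted it schedules a re-poll and returns `Pending`.
fn poll_proceed(cx: &mut Context<'_>) -> Poll<()> {
    BUDGET.with(|b| match b.get() {
        0 => {
            cx.waker().wake_by_ref();
            Poll::Pending
        }
        n => {
            b.set(n - 1);
            Poll::Ready(())
        }
    })
}

/// An always-ready source that cooperates with the budget.
fn poll_next_item(cx: &mut Context<'_>) -> Poll<u32> {
    match poll_proceed(cx) {
        Poll::Ready(()) => Poll::Ready(1),
        Poll::Pending => Poll::Pending,
    }
}

/// A consumer loop with no guard of its own, like the `ForEach` loop in this issue.
fn poll_sum_forever(total: &mut u32, cx: &mut Context<'_>) -> Poll<()> {
    loop {
        match poll_next_item(cx) {
            Poll::Ready(n) => *total += n,
            Poll::Pending => return Poll::Pending,
        }
    }
}

/// Waker that does nothing; enough to build a `Context` by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}
```

After `reset_budget(128)`, a single call to `poll_sum_forever` consumes 128 items and then returns `Pending`, even though the loop itself contains no counter. The trade-off is that every leaf resource and the executor have to take part, which is why this only helps inside one runtime.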

mzabaluev commented 3 years ago

I like the cooperative budget approach outlined in https://github.com/rust-lang/rust/pull/74335#issuecomment-742775411. Is there an RFC for that?