rust-lang / futures-rs

Zero-cost asynchronous programming in Rust
https://rust-lang.github.io/futures-rs/
Apache License 2.0

`FoldWithStream` stream adapter #989

Open Ekleog opened 6 years ago

Ekleog commented 6 years ago

Continuing the series “here is a stream adapter I wanted and thus wrote, so do you want it?”, here is the `FoldWithStream` stream adapter, which:

  1. Consumes the stream
  2. Passes the accumulator, the stream's first element, and the remainder of the stream to a folding function
  3. Recovers the stream to continue with, along with the new accumulator, from the future the folding function returns
  4. Repeats until the stream ends, then resolves to the accumulator
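The steps above can be sketched with a plain `Iterator` standing in for the stream. This is only a synchronous analogue under that assumption, not the adapter itself: `fold_with_iter` and `sum_chunks` are hypothetical names made up for illustration, and the folding function returns `(iterator, accumulator)` directly instead of a future.

```rust
/// Synchronous analogue of the steps above (hypothetical helper, not part of
/// any crate): `f` receives the accumulator, the next item, and the remaining
/// iterator, and hands back the iterator to continue with plus the new
/// accumulator.
fn fold_with_iter<I, Acc, F>(mut iter: I, init: Acc, mut f: F) -> Acc
where
    I: Iterator,
    F: FnMut(Acc, I::Item, I) -> (I, Acc),
{
    let mut acc = init;
    // A loop rather than recursion, so long inputs do not grow the call stack.
    loop {
        match iter.next() {
            Some(item) => {
                let (rest, next_acc) = f(acc, item, iter);
                iter = rest;
                acc = next_acc;
            }
            // Input exhausted: the accumulator is the result.
            None => return acc,
        }
    }
}

/// Example use (also hypothetical): the input is a sequence of
/// length-prefixed chunks, and each chunk is summed. The closure needs the
/// remainder of the iterator to read the chunk body, which a plain `fold`
/// cannot give it.
fn sum_chunks(input: Vec<u32>) -> Vec<u32> {
    fold_with_iter(input.into_iter(), Vec::<u32>::new(), |mut sums, len, mut rest| {
        let chunk: u32 = rest.by_ref().take(len as usize).sum();
        sums.push(chunk);
        (rest, sums)
    })
}
```

With this sketch, `sum_chunks(vec![2, 10, 20, 1, 5, 0])` reads the chunks `[10, 20]`, `[5]` and `[]`, so it returns `[30, 5, 0]`.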

I suspect there are better ways to do this one. However, the best I managed with “standard” stream adapters is the code I deleted in https://github.com/Ekleog/yuubind/commit/b6e0aff488b380a3df16ec1b664ec6432a5e7030 … because it overflowed the stack.

So here is what I have written; if you think it would be useful, I can open a PR for it. :)

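// Written against futures 0.1 (`poll` returning `Result<Async<_>, _>`).
use std::mem;

use futures::{Async, Future, Stream};

enum NextStep<S: Stream, F: Future, Acc> {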
    Stream(S, Acc),
    Future(F),
    Completed,
}

pub struct FoldWithStream<S, Acc, Fun, Ret>
where
    S: Stream,
    Fun: FnMut(Acc, S::Item, S) -> Ret,
    Ret: Future<Item = (S, Acc), Error = S::Error>,
{
    next: NextStep<S, Ret, Acc>,
    f: Fun,
}

impl<S, Acc, Fun, Ret> Future for FoldWithStream<S, Acc, Fun, Ret>
where
    S: Stream,
    Fun: FnMut(Acc, S::Item, S) -> Ret,
    Ret: Future<Item = (S, Acc), Error = S::Error>,
{
    type Item = Acc;
    type Error = S::Error;

    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
        loop {
            match mem::replace(&mut self.next, NextStep::Completed) {
                NextStep::Stream(mut s, acc) => {
                    match s.poll() {
                        Ok(Async::Ready(Some(i))) => {
                            self.next = NextStep::Future((self.f)(acc, i, s));
                        }
                        Ok(Async::Ready(None)) => return Ok(Async::Ready(acc)),
                        Ok(Async::NotReady) => {
                            self.next = NextStep::Stream(s, acc);
                            return Ok(Async::NotReady);
                        }
                        Err(e) => return Err(e),
                    }
                }
                NextStep::Future(mut f) => {
                    match f.poll() {
                        Ok(Async::Ready((s, acc))) => {
                            self.next = NextStep::Stream(s, acc);
                        }
                        Ok(Async::NotReady) => {
                            self.next = NextStep::Future(f);
                            return Ok(Async::NotReady);
                        }
                        Err(e) => return Err(e),
                    }
                }
                NextStep::Completed => panic!("attempted to poll FoldWithStream after completion"),
            }
        }
    }
}

pub trait StreamExt: Stream {
    fn fold_with_stream<Fun, Acc, Ret>(self, init: Acc, f: Fun)
        -> FoldWithStream<Self, Acc, Fun, Ret>
    where
        Self: Sized,
        Fun: FnMut(Acc, Self::Item, Self) -> Ret,
        Ret: Future<Item = (Self, Acc), Error = Self::Error>,
    {
        FoldWithStream {
            next: NextStep::Stream(self, init),
            f
        }
    }
}

tikue commented 6 years ago

I, too, have run into a case where I needed this exact combinator. I think it was when trying to thread a BiLock through a stream.

My gut feeling is that, if a stream becomes complex enough to require this behavior, it might be time to implement Stream with a bespoke struct. I also think that the core stream/future combinators should be limited to those that are widely applicable. I think there's room for an external crate to experiment with things like this: a streamtools/futuretools, the dual of itertools.
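As a rough illustration of the "bespoke struct" route, here is a minimal sketch that uses `Iterator` as a synchronous stand-in for `Stream`. The `ChunkSums` type and the length-prefixed input format are assumptions invented for this example, not something from this thread's code. The struct owns the underlying iterator, so each step can consume as many elements as it needs without handing the iterator to a closure and back.

```rust
/// Hypothetical bespoke adapter: the input is a sequence of length-prefixed
/// chunks, and each call to `next` yields the sum of one chunk.
struct ChunkSums<I> {
    inner: I,
}

impl<I: Iterator<Item = u32>> Iterator for ChunkSums<I> {
    type Item = u32;

    fn next(&mut self) -> Option<u32> {
        // The first element of each chunk says how many values follow it;
        // `?` ends the iteration once the input is exhausted.
        let len = self.inner.next()? as usize;
        // Consume exactly `len` further elements from the owned iterator.
        Some(self.inner.by_ref().take(len).sum())
    }
}
```

For the input `[2, 10, 20, 1, 5, 0]`, collecting `ChunkSums { inner: input.into_iter() }` gives `[30, 5, 0]`. An asynchronous version would implement `Stream` instead and would also have to keep track of partially read chunks across `NotReady` results.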