smol-rs / futures-lite

Futures, streams, and async I/O combinators.
Apache License 2.0
439 stars 25 forks source link

futures::TryStreamExt and futures::TryFutureExt #14

Closed ririsoft closed 3 years ago

ririsoft commented 4 years ago

Hello,

Are you considering adding futures::TryStreamExt and futures::TryFutureExt?

I find the and_then, err_into and into_stream and other chaining utilities quiet convenient. Or you recommend using futures for this on the long term ?

What do you think ?

ghost commented 4 years ago

Personally, I find such combinators a bit confusing because of the double monad (future and result). It's not obvious whether and_then() passes to the closure the Ok value of the result or the Output value of the future.

My intuition is that the "outermost" monad wins, so if we're applying and_then() to a future, I'd expect and_then() to take the output of that future.

Under that logic, a.and_then(f) is equivalent to async { f(a.await).await } rather than async { f(a.await?) }.

I haven't found those combinators very useful, though, because async/await feels clearer and easier to read (at least to me). What do you think? I'm curious what are the situations where you found those combinators preferable over async/await?

Also, and_then() feels to me like a vestigial combinator from the age of futures 0.1 where Future trait had two associated types, Item and Error. As a case in point, neither async-std nor tokio have such combinators in their preludes and almost nobody has ever expressed interest in them.

ririsoft commented 4 years ago

I am building a custom recursive walk dir on top of async_std::fs::read_dir. I have to admit that I had hard time to write this due to recursivity and the combinators confusion you mentioned above. In this situation I believe (I may be wrong) that async/await are not enough.

type WalkStream =
    std::pin::Pin<Box<dyn Stream<Item = Result<String, Box<dyn Error + Send + Sync>>> + Send>>;

async fn walk_dir<P>(path: P) -> WalkStream
where
    P: AsRef<Path>,
{
  fs::read_dir(path.as_ref().to_owned())
    .err_into()
    .try_flatten_stream()
    .and_then(|entry| async move {
        let m = entry.metadata().await?;
        if m.file_type().is_dir() {
            Ok(walk_dir(entry.path()))
        } else {
            Ok(async move {
                let path = entry.path();
                let url = Url::from_file_path(&path).map_err(|_| {
                    UrlError(format!(
                        "cannot convert path '{}' to a valid url",
                        path.display()
                    ))
                })?;
                Ok(url.to_string())
            }
            .into_stream()
            .boxed())
        }
    })
    .try_flatten()
    .boxed()
}

Let's make it clear: the above code is barely maintainable and the only proudness I have is to make it work!

To be honest I chose the combinator way instead of implementing the Stream trait on a custom struct because I do not fill comfortable enough in async rust and tend to avoid implementing the Future and Stream traits and the related pinning boiler plate as much as I can because I always fear to do it wrong. Maybe this is the time for me to jump in and implement a custom type that implements Stream, holding an internal reference to async_std::fs::ReadDir

Feel free to close the issue, I am totally fine with your comments.

ririsoft commented 4 years ago

The more I think about this the more I believe that indeed FutureExt and StreamExt should be as minimal as possible. However building a Stream or working with them is not an as smooth experience as one would think.

I could write a custom struct that implements Stream in a similar way that you did in stjepang/async-fs but I feel like this should not be the way to do. I should be able to work on top of the future and stream to build another stream with some simple helpers. Maybe something like poll_state from irrustible/futures-micro for Stream could help (poll_next_state ?).

ghost commented 4 years ago

The most powerful stream combinator is probably unfold(): https://docs.rs/futures-lite/0.1.11/futures_lite/stream/fn.unfold.html

You can create pretty much arbitrary streams with it. The problem with it is that it's incredibly unergonomic. I always struggle to compile any code that uses it.

Sometimes I wonder if I should just create a stream constructor based on async-channel. Imagine something like this:

fn simple_stream<T, F, Fut>(f: F) -> SimpleStream<T>
where
    F: FnOnce(Sender<T>) -> Fut,
    Fut: Future<Output = ()>;

Usage:

let my_stream = simple_stream(|sender| async {
    for i in 0..3 {
        sender.send(i).await; // Equivalent to: yield i;
    }
});
pin!(my_stream);

while let Some(val) = my_stream.next().await {
    println!("{}", val);
}

What do you think about that? :)

ririsoft commented 4 years ago

What do you think about that? :)

I think this is a very elegant way of providing a yield pattern on stable Rust until generators are stabilized. Coming from Go this looks very familiar to me.

The most powerful stream combinator is probably unfold(): https://docs.rs/futures-lite/0.1.11/futures_lite/stream/fn.unfold.html

I will give a try to unfold. I should have noticed it before opening this issue, sorry about that. I am curious on how hard I will have to fight the compiler to make it work with recursive calls such as those needed in a walkdir implementation. I will come back here to report.

Thank you for your time and help this is very much appreciated !

ririsoft commented 4 years ago

unfold definitely do the job for my use case.

pub fn walk_dir(root: impl AsRef<Path>) -> BoxedStream<Result<DirEntry>> {
    stream::unfold(State::Start(root.as_ref().to_owned()), |state| async move {
        match state {
            State::Start(root) => match read_dir(root).await {
                Err(e) => return Some((Err(e), State::Done)),
                Ok(rd) => return walk(vec![rd.boxed()]).await,
            },
            State::Walk(dirs) => return walk(dirs).await,
            State::Done => return None,
        }
    })
    .boxed()
}

enum State {
    Start(PathBuf),
    Walk(Vec<BoxedStream<Result<DirEntry>>>),
    Done,
}

fn walk(
    mut dirs: Vec<BoxedStream<Result<DirEntry>>>,
) -> BoxedFut<Option<(Result<DirEntry>, State)>> {
    async move {
        if let Some(dir) = dirs.last_mut() {
            match dir.next().await {
                Some(Ok(entry)) => match entry.file_type().await {
                    Err(e) => return Some((Err(e), State::Walk(dirs))),
                    Ok(ft) if ft.is_dir() => {
                        let wd = walk_dir(entry.path());
                        dirs.push(wd);
                        return walk(dirs).await;
                    }
                    Ok(_) => return Some((Ok(entry), State::Walk(dirs))),
                },
                Some(Err(e)) => return Some((Err(e), State::Walk(dirs))),
                None => {
                    dirs.pop();
                    return walk(dirs).await;
                }
            }
        }
        None
    }
    .boxed()
}

A channel based helper would deprecate the need for an intermediate state and would probably make the code even lighter.

I am considering putting this into an async-walkdir crate since the existing async-walk is tokio based. I could also get some inspiration from the Walkdir crate to add a few goodies such skipping traversing a directory matching a pattern. But I am not sure it would help anybody: for such simple code I prefer copying than adding a dependency.

As far as I am concerned you can close the issue if you wish to.

ririsoft commented 4 years ago

I finally published a crate for this, adding asynchronous filtering capabilities. I am sure I can improve the code way better but I am discovering the complexity of handling closures asynchronously. In particular I was not able to offer a filtering closure (returning a future) which uses a reference to an entry.