yoshuawuyts / futures-concurrency

Structured concurrency operations for async Rust
https://docs.rs/futures-concurrency
Apache License 2.0
414 stars 33 forks source link

[FEATURE] more combinators #184

Open Niedzwiedzw opened 7 months ago

Niedzwiedzw commented 7 months ago

futures-util StreamExt and TryStreamExt provide bunch of very useful (addictive?) combinators. I wonder if introducing something like that would be accepted as a PR for this project.

yoshuawuyts commented 6 months ago

@Niedzwiedzw it depends – which combinators did you have in mind?

Niedzwiedzw commented 6 months ago

I heavily depend on stuff like .filter_map() .try_filter_map() .flat_map() .scan() .chain() .flatten() .try_flatten() .try_for_each() etc :) those are the ones that come to mind at least

failable commented 6 months ago

For me, that's a big missing part.

nanoqsh commented 3 months ago

@Niedzwiedzw @failable I think the point of futures_concurrency crate is to provide combinators for concurrency and for other things are other crates with stream combinators like futures or futures_lite. They fit together perfectly, so far they don't have any conflicts and I hope there won't be any:

use {
    futures_concurrency::stream::StreamExt as _,
    futures_lite::{stream, StreamExt as _},
};

let a = stream::iter([1, 2, 3]);
let b = stream::iter([4, 5, 6]);
a
    .merge(b)
//   ^^^^^ merge from futures_concurrency
    .filter(|i| i % 2 == 0)
//   ^^^^^^ filter from futures_lite
    .for_each(|i| println!("{i}"))
    .await;

In my opinion, there is no need to overload the crate in any additionals.

But I wonder is it possible to use ConcurrentStream as a stream and use all this combinators on it?

tyilo commented 2 months ago

@nanoqsh I don't know if this issue refers to StreamExt or ConcurrentStream, but I think it would make sense to add more functions to ConcurrentStream, which take an async function instead of futures::StreamExt where they take a normal function.

So that you can do .filter(|i| async { i % 2 == 0 }).for_each(|i| async { println!("{i}") }) (for_each already exists on ConcurrentStream, but filter does not).

Niedzwiedzw commented 2 months ago

could even be a separate extensiom trait, like futures-util does