async-rs / parallel-stream

Data parallelism library for async-std.
https://docs.rs/parallel-stream
Apache License 2.0

A retrospection on interoperability with the futures crate #15

Open jerry73204 opened 4 years ago

jerry73204 commented 4 years ago

On Retrospection

I've been looking for parallelism combined with async/await, and thanks for your decent work. I'd like to share a retrospection on pulling this crate into my existing project.

My project is filled with stream combinators from futures' StreamExt and async_std's StreamExt. Both are extension traits of futures' Stream, so any stream implementing the Stream trait is automatically equipped with the extended combinators. That's a great convenience when you're writing your own stream type.
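
As a minimal sketch of what I mean (the Counter type here is made up purely for illustration): implementing futures' Stream is all it takes to get every StreamExt combinator for free.

use futures::stream::{Stream, StreamExt};
use std::pin::Pin;
use std::task::{Context, Poll};

// A toy stream that yields 1, 2, ..., max.
struct Counter {
    current: usize,
    max: usize,
}

impl Stream for Counter {
    type Item = usize;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<usize>> {
        if self.current < self.max {
            self.current += 1;
            Poll::Ready(Some(self.current))
        } else {
            Poll::Ready(None)
        }
    }
}

fn main() {
    // `map` and `collect` come from futures' `StreamExt`; nothing extra to implement.
    let doubled: Vec<usize> = futures::executor::block_on(
        Counter { current: 0, max: 3 }.map(|n| n * 2).collect(),
    );
    assert_eq!(doubled, vec![2, 4, 6]);
}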

By contrast, parallel-stream's ParallelStream is alien to the above extension traits. It's a standalone trait with its own family of implementing types. That is, once a stream is turned into a ParallelStream, it loses all the combinators from those extension traits. Things also get more complex when writing your own stream type. I think the limit() method on ParallelStream is the root of the problem.

I also noticed aggressive trait bounds that are hard to satisfy. For example, the map method requires f to be Send, Sync and Copy. Only a few, very special types are both Sync and Copy, which makes map nearly unusable: the closure cannot capture any local variable that lacks one of those traits.

fn map<F, T, Fut>(self, f: F) -> Map<T> where
    F: FnMut(Self::Item) -> Fut + Send + Sync + Copy + 'static,
    T: Send + 'static,
    Fut: Future<Output = T> + Send, 
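
To make the Copy problem concrete, here is a minimal sketch (assert_copy is a made-up helper, not part of any crate): a closure that captures an Arc, or any other non-Copy value, is itself not Copy, so it can never satisfy the bound above.

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Made-up helper for this illustration only: it accepts `Copy` values and nothing else.
fn assert_copy<T: Copy>(_: &T) {}

fn main() {
    let shared = Arc::new(AtomicUsize::new(0));

    // This closure captures an `Arc`, which is `Clone` but not `Copy`,
    // so the closure itself is not `Copy` either.
    let f = move |x: usize| {
        let shared = shared.clone();
        async move { shared.fetch_add(x, Ordering::SeqCst) }
    };

    assert_copy(&42u32); // fine: plain integers are `Copy`
    // assert_copy(&f);  // error[E0277]: the trait bound `{closure}: Copy` is not satisfied
    let _ = f;
}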

On an Alternative Design

I gathered the above thoughts and attempted an alternative based on your work; the result is par-stream. Basically, it provides an extension trait ParStreamExt on futures' Stream and solves the trait bound issue. The limit is given on demand: it sets the number of workers only for that stage. It's not 100% equal to your design, because the limit applies to a single stage rather than a group of stages. Instead, I moved your design into another dedicated API.

let shared = Arc::new(AtomicUsize::new(0));
stream
    .par_then(None, move |item| {      // `None` sets the limit to the number of cores.
        let shared = shared.clone();   // Clone a variable that lacks the `Copy` trait.
        async move {
            compute(item, shared)
        }
    })
    .collect::<Vec<_>>()               // combinator from futures' StreamExt
    .await;
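
For completeness, a rough sketch of how this sits inside an async function, assuming the par_then(limit, closure) shape from the snippet above and a hypothetical compute function; the exact signatures in par-stream may differ.

use futures::stream::{self, StreamExt}; // ordinary futures combinators
use par_stream::ParStreamExt;           // assumed import path for the extension trait
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Hypothetical piece of work that needs a shared, non-`Copy` counter.
fn compute(item: usize, counter: Arc<AtomicUsize>) -> usize {
    counter.fetch_add(1, Ordering::SeqCst);
    item * 2
}

async fn run() -> Vec<usize> {
    let shared = Arc::new(AtomicUsize::new(0));
    stream::iter(0..100usize)             // a plain futures stream
        .par_then(None, move |item| {     // per-stage worker limit
            let shared = shared.clone();
            async move { compute(item, shared) }
        })
        .collect()                        // back to futures' StreamExt
        .await
}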

To limit the workers of a whole group of combinators, I suggest the builder pattern. It's not implemented in my crate yet, but we can see how it would look here.

stream
    .enumerate()
    /* start of group */
    .into_par_group(ParGroupConfig {  // turn into a parallel group builder
        limit: Some(4),
        ..Default::default()  // use the default runtime and other default options, etc.
    })
    .then(|item| { async move { /* omit */ } })  // first stage
    .filter_map(|item| { async move { /* omit */ } }) // second stage
    .build_stream()  // build a stream from the group builder
    /* end of group */
    .collect::<Vec<_>>();  // combinator from futures' StreamExt

In this way, users can customize the whole parallel group with a single config, and it would work seamlessly with the existing futures combinators.

From here we could move on to a more thorough discussion to help the design evolve. I'm happy to take a closer look at my own work and find a way to combine the two.

arlyon commented 4 years ago

From the perspective of ergonomics, keeping the functionality as an extension trait rather than a parallel (hah) siloed ecosystem is much more flexible. Both crates have been interesting to use, but it's nice to see some exploration into alternative approaches.