rayon-rs / rayon

Rayon: A data parallelism library for Rust

Prefix scans #393

Open jaupe opened 7 years ago

jaupe commented 7 years ago

I have a use case for prefix scans: gathering cumulative totals/products of various quantitative measures. This is how I envisage the API:

// prefix sum
let seq = vec![1i64, 2, 3, 4, 5];
let sums = seq.par_iter().sums(); 
assert_eq!(sums, vec![1, 3, 6, 10, 15]);

// prefix product
let seq = vec![1i64, 2, 3, 4, 5];
let prods = seq.par_iter().products();
assert_eq!(prods, vec![1, 2, 6, 24, 120]);

Would you like a PR for this or should I implement it as a separate crate?

cuviper commented 7 years ago

Is that API on any ParallelIterator, or only IndexedParallelIterator? Does it always return Vec, or is it more general like collect()?

I'd also like to hear how you would parallelize this. It seems inherently sequential, although we had some ideas for scan() (#329), which is a more general case.

jaupe commented 7 years ago

Hey @cuviper,

Looking at the Rayon API, I think it would be good to make it as generic as possible; ideally it would work on all iterators and use collect().

The algorithm, in pseudo-vector-algebra notation (a rough Rust sketch follows the list):

  1. Break the data into sub-slices; each sub-slice is processed in parallel.
    [1,2,3,4,5,6,7,8,9,10] => [1,2,3],[4,5,6],[7,8,9],[10]
  2. Run a prefix scan on each sub-slice in parallel and wait for all scans to complete.
    [1,2,3],[4,5,6],[7,8,9],[10] => [1,3,6],[4,9,15],[7,15,24],[10]
  3. Once all scans have completed, sequentially compute the offset for each sub-slice. Once the offsets are known, adding each offset to every element of its sub-slice can be done in parallel.
    [1,3,6],[4,9,15],[7,15,24],[10]
    => [1,3,6],6+[4,9,15],21+[7,15,24],45+[10]
    => [1,3,6],[10,15,21],[28,36,45],[55]
  4. Concatenate the sub-slices to merge them back together.
    [1,3,6],[10,15,21],[28,36,45],[55] => [1,3,6,10,15,21,28,36,45,55]
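
A rough Rust sketch of these four steps (not a proposed rayon API; the function name, the fixed chunk size, and the use of par_chunks_mut are just illustrative choices):

use rayon::prelude::*;

// In-place inclusive prefix sum following the four steps above.
// chunk_size must be > 0 (par_chunks_mut panics on 0).
fn chunked_prefix_sum(data: &mut [i64], chunk_size: usize) {
    // Steps 1-2: scan each chunk in parallel.
    data.par_chunks_mut(chunk_size).for_each(|chunk| {
        for i in 1..chunk.len() {
            chunk[i] += chunk[i - 1];
        }
    });

    // Step 3a: sequentially accumulate the offset for each chunk
    // (after step 2, each chunk's last element is its local total).
    let mut offsets = Vec::new();
    let mut running = 0i64;
    for chunk in data.chunks(chunk_size) {
        offsets.push(running);
        running += chunk.last().copied().unwrap_or(0);
    }

    // Step 3b: add each chunk's offset to all of its elements in parallel.
    data.par_chunks_mut(chunk_size)
        .zip(offsets)
        .for_each(|(chunk, offset)| {
            for x in chunk {
                *x += offset;
            }
        });

    // Step 4 is free here: the chunks are contiguous slices of `data`.
}

// e.g. chunked_prefix_sum(&mut v, 3) turns [1,2,3,4,5,6,7,8,9,10]
// into [1,3,6,10,15,21,28,36,45,55].

Because the chunks are contiguous slices of one buffer, step 4 falls out for free; a collect()-based version would need an explicit concatenation.
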
cbarrick commented 7 years ago

> Adding the offset can be done in parallel

Is this true? It seems to me that you have to compute the final result for one group before you know what offset to add to the next. If so, you're back to the sequential algorithm.

For example, in this step:

> [1,3,6],[4,9,15],[7,15,24],[10]
> => [1,3,6],6+[4,9,15],21+[7,15,24],45+[10]
> => [1,3,6],[10,15,21],[28,36,45],[55]

How do you know to add 21 to the third group before you compute 6 + 15 in the second group?

jaupe commented 7 years ago

Computing the offset/delta is sequential, but the actual vector/scalar addition is parallelized. I can re-word it if that's not clear.

cuviper commented 7 years ago

That sounds pretty reasonable.

Whether this belongs in rayon proper or its own crate is unclear.

jaupe commented 7 years ago

I think a generic scan function definitely deserves a home within the Rayon API, so that one can create cumulative sums and products. I'm on holiday at the moment, but shall I submit a PR when I get back?

Qqwy commented 4 years ago

We are two years down the road, but as I am also facing a situation in which a (parallel) scan operation is required, I'd love to contribute a PR for this.

@cuviper would that be acceptable?

cuviper commented 4 years ago

Sure, you're welcome to work on this. Don't forget #329 if you plan to make this more general than just sums and products.

Qqwy commented 4 years ago

:+1: All right! And I will keep #329 in mind.

A question about the implementation suggestion you mentioned above:

If I understand the source code correctly, I believe that into_par_iter() for linked lists currently turns the list back into a vector, since it uses the into_par_vec macro. As such, maybe we need a smarter way than the tmp.into_par_vec() you suggested above, since it seems (correct me if I'm wrong!) that we would destroy the original 'nice split' only to rebuild it later.

Are there ways to let the threads communicate (e.g. using a crossbeam channel)? What I have in mind is the following (a sketch follows the list):

  1. The original iterator's elements are approximately evenly divided over our threads. (precondition)
  2. Each of the threads/tasks does a local fold to accumulate into a local Vec<T>.
  3. Each of the threads/tasks communicates its vector's last value over a channel (as an Option<T> to accommodate empty vectors). The task then blocks on a channel receive.
  4. In one thread, we read out these channels from the receiving end.
  5. In this one thread, we (sequentially) fold over the received values. In every iteration, we send the value back over the appropriate channel, so that the respective task can continue with step (6).
  6. All tasks, in parallel, update their local accumulated Vec<T> with the received values.
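
A rough sketch of that message flow, written with plain scoped std threads and mpsc channels rather than inside rayon's pool (channel_prefix_sum, the per-worker offset channels, and the chunking are illustrative assumptions, not existing rayon API):

use std::sync::mpsc;
use std::thread;

// Illustrative only: each worker scans its own chunk, reports its total,
// blocks for its offset, then applies it (steps 2-6 above).
fn channel_prefix_sum(data: &mut [i64], num_chunks: usize) {
    if data.is_empty() {
        return;
    }
    let chunk_len = (data.len() / num_chunks.max(1)).max(1);

    thread::scope(|s| {
        // worker -> coordinator: (chunk index, local total)
        let (total_tx, total_rx) = mpsc::channel::<(usize, i64)>();
        // coordinator -> worker: the offset to add to that chunk
        let mut offset_txs = Vec::new();

        for (idx, chunk) in data.chunks_mut(chunk_len).enumerate() {
            let (offset_tx, offset_rx) = mpsc::channel::<i64>();
            offset_txs.push(offset_tx);
            let total_tx = total_tx.clone();
            s.spawn(move || {
                // Local (sequential) prefix scan of this chunk.
                for i in 1..chunk.len() {
                    chunk[i] += chunk[i - 1];
                }
                // Report the chunk's total, then block until our offset arrives.
                total_tx.send((idx, *chunk.last().unwrap())).unwrap();
                let offset = offset_rx.recv().unwrap();
                for x in chunk.iter_mut() {
                    *x += offset;
                }
            });
        }

        // The coordinator: collect every total, fold them in chunk order,
        // and send each worker the offset it should add.
        let mut totals = vec![0i64; offset_txs.len()];
        for _ in 0..offset_txs.len() {
            let (idx, total) = total_rx.recv().unwrap();
            totals[idx] = total;
        }
        let mut running = 0i64;
        for (idx, offset_tx) in offset_txs.iter().enumerate() {
            offset_tx.send(running).unwrap();
            running += totals[idx];
        }
    });
}
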

The things I am unsure about here are thus:

  - Is it okay in Rayon to use a blocking receive, or will this create a potential for deadlocks? (i.e. is the threadpool 'smart' enough to store a task for later if it is waiting)

cuviper commented 4 years ago

I'm not able to dig into this right now, but...

> Is it okay in Rayon to use a blocking receive, or will this create a potential for deadlocks?

You'd have the potential for deadlocks like #592 -- thread stealing could cause your receiver to be deeper on the same stack as the sender that would want to fill it.

> (i.e. is the threadpool 'smart' enough to store a task for later if it is waiting)

We don't have runtime hooks into external blocking things like mutexes or channels. Even if we did, you'd have to actually suspend the whole stack frame to "store" it for later, or else switch to a new stack to execute stolen tasks, so that stored task can be resumed independently.

Qqwy commented 4 years ago

Thank you for the information! That means that using channels is probably not the right way to go.

> If I understand the source code correctly, I believe that into_par_iter() for linked lists currently turns the list back into a vector, since it uses the into_par_vec macro. As such, maybe we need a smarter way than the tmp.into_par_vec() you suggested above, since it seems (correct me if I'm wrong!) that we would destroy the original 'nice split' only to rebuild it later.

After digging a bit deeper, it seems that I was incorrect:

Does into_par_vec() then by default spread these over all cores, giving each a single element? Or is there some heuristic we have to pass? I found LengthSplitter, which seems to go down to 1, but its documentation mentions that "Rayon automatically normally attempts to adjust the size [...] to reduce overhead".

Qqwy commented 4 years ago

Another interesting note: by implementing something akin to the example code @cuviper gave above, we lose the indexability of our iterator, since flat_map is used. So I think that indeed (see #329) a VecTree abstraction (which can be converted to/from a nested iteration across multiple threads) is required. This also means that the implementation of drive will differ from those of the other ParallelIterator instances that use/are either a Producer or a Consumer, since for the scan we first have to consume everything, then perform a tiny amount of sequential work, and then need to be able to produce a new (indexable) parallel iterator.

I will probably start writing actual code and submitting a WIP PR next week (time permitting).

gnzlbg commented 4 years ago

A different way to implement this would be to use decoupled look-back: Merrill et al., Single-pass Parallel Prefix Scan with Decoupled Look-back, 2016.

EDIT: I needed a couple of prefix sums in the past few weeks and ended up doing them in a more naive way, but I'll try to find the time to dig more into this. They are quite useful; I needed them in the context of implementing parallel algorithms for maximum subsequence (e.g. see Perumalla et al., Parallel algorithms for maximum subsequence and maximum subarray, 1995), where the interesting building blocks are parallel prefix and postfix sums.

arieluy commented 7 months ago

I heard from someone who wanted to use my above PR in their project, so I've released it as a separate crate: https://crates.io/crates/rayon-scan. I hope this will be helpful to anyone else in this thread who is looking for this feature!