nazar-pc opened 9 months ago
There are two main factors that I see here:
- Rayon uses "adaptive" splitting, not perfect per-thread splitting, in case the work is imbalanced or other threads are also busy with other work.
- Rayon's fold assumes associativity but not commutativity.
With indexed iterators like your .par_iter().zip(), you can add .with_min_len(N) before your fold to set a lower bound on how far it will split. Alternatively, you can use .par_bridge(), as long as you have enough real computation that you don't get bottle-necked on its Mutex. This should give you a pretty even per-thread fold, but with non-deterministic order of the items.
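To make the effect of a minimum length concrete, here is a toy model in Rust (std only). It is a deliberate simplification and not rayon's splitter: it ignores the thread-count-based limit on splitting and work stealing, and simply halves the input while both halves would still hold at least min_len items. Each leaf stands for one identity() call, i.e. one accumulator. The function name leaf_count is hypothetical.

```rust
/// Toy model (hypothetical, not rayon's actual splitter): a range of `len`
/// items is halved recursively as long as both halves would still hold at
/// least `min_len` items. Each leaf gets its own accumulator, so the return
/// value is how many times a fold's `identity` closure would be called.
pub fn leaf_count(len: usize, min_len: usize) -> usize {
    // Treat 0 like 1 so the recursion always terminates.
    let min_len = min_len.max(1);
    if len / 2 >= min_len {
        let left = len / 2;
        // The right half is never smaller than the left one.
        leaf_count(left, min_len) + leaf_count(len - left, min_len)
    } else {
        // Too small to split further: one leaf, one identity() call.
        1
    }
}
```

Under this model, 128 items with min_len 1 give 128 leaves and min_len 4 gives 32: fewer accumulators, but each leaf becomes a larger indivisible chunk for load balancing.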
That is exactly what I want here. In the library I linked above this is done with a simple atomic counter that worker threads use to pull the next piece of work to process. In that case inputs are read-only, so no mutexes or other heavier infrastructure are necessary, and work is essentially split into sublists of size 1, which keeps threads occupied in the most efficient way. I will check #857 to see how it performs.
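The atomic-counter scheme described here can be sketched with only the standard library (std::thread::scope, Rust 1.63+). This is a hypothetical helper, atomic_counter_fold, written for illustration rather than taken from blst or rayon: each worker creates exactly one accumulator, repeatedly claims the next index from a shared AtomicUsize, and the per-worker results are reduced at the end. Which items land in which accumulator is non-deterministic, so fold and reduce are assumed to be commutative as well as associative.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

/// Hypothetical helper: folds `items` on `threads` workers, one accumulator each.
pub fn atomic_counter_fold<T, A, I, F, R>(
    items: &[T],
    threads: usize,
    identity: I,
    fold: F,
    reduce: R,
) -> A
where
    T: Sync,
    A: Send,
    I: Fn() -> A + Sync,
    F: Fn(A, &T) -> A + Sync,
    R: Fn(A, A) -> A,
{
    // Shared "next item" index; every worker claims one item at a time.
    let next = AtomicUsize::new(0);
    let partials: Vec<A> = thread::scope(|s| {
        let handles: Vec<_> = (0..threads.max(1))
            .map(|_| {
                s.spawn(|| {
                    // identity() runs once per worker, not once per sublist.
                    let mut acc = identity();
                    loop {
                        // Relaxed suffices: the counter only hands out indices,
                        // and the items themselves are read-only.
                        let i = next.fetch_add(1, Ordering::Relaxed);
                        if i >= items.len() {
                            break acc;
                        }
                        acc = fold(acc, &items[i]);
                    }
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });
    // Combine the per-worker results; which items each worker folded is
    // non-deterministic, hence the commutativity assumption.
    partials
        .into_iter()
        .reduce(reduce)
        .expect("at least one worker")
}
```

With 128 items and 32 threads, this creates 32 accumulators while still balancing at single-item granularity.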
With indexed iterators like your .par_iter().zip(), you can add .with_min_len(N) before your fold to set a lower bound on how far it will split.
Was not aware of it, will give it a try as well, thanks!
Rayon's fold assumes associativity but not commutativity
Hm, is that documented somewhere? To me, the fact that things can be processed as sublists of undefined size means that both should hold, and the user can't really rely on any guarantees about the order in which input is processed, especially if that unlocks better performance.
Interesting: I just tested the example code I linked above, and a.par_iter().zip(b)... ended up being slower than a.iter().zip(b).par_bridge(), which seems counter-intuitive. I always assumed that parallel indexed iterators have the most knowledge about what is being processed and should therefore be the most efficient.
.with_min_len(1) doesn't really help because it doesn't fix the root problem in this case. On my 32-thread CPU (13900K), for 128 elements of input rayon creates 128 sublists by default, which is excessive. If I raise the minimum, it does create fewer sublists, but it also loses granularity of balancing between threads in the process.
I'd imagine a simple atomic counter, like blst uses, to split work efficiently with single-item granularity, while creating at most as many results to be folded as there are threads in the thread pool.
https://github.com/rayon-rs/rayon/pull/857 is based on a very old version of rayon; could you maybe rebase it? It is a very small change, so that should be fine.
So, I logged the algorithm (I took the code from the benches) and there is virtually no overhead I can see for splitting and merging. However, I then ran the code on a larger machine with more cores. There are still no visible overheads for the computations, but there are some overheads inside rayon. My guess is that, due to the very low number of tasks available (compared to threads), the atomics used by the sleep algorithm come under pressure.
I rewrote the code with rayon::scope(|scope| scope.spawn_broadcast()) in a way similar to the original code in blst and was able to beat it in terms of performance, but the code is far less idiomatic. Interestingly, rayon::broadcast() wasn't anywhere near as fast for some reason.
I don't think atomics or mutexes make any difference here; the bottleneck in my case is the math, which gets slower when more elements need to be aggregated unnecessarily.
I'm still not too sure about that. Here are the logs. Each file is for the corresponding number of threads. The area of each box corresponds to the time it takes. Overheads inside rayon are the boxes at the bottom left. As you can see:
The visualization looks cool, but to be completely honest I'm not sure how to read it.
My point was that doing 128 folds is unnecessary. On my machine with 32 logical CPU cores there is no point in doing 128 folds if 32 are sufficient. Whether it is slower due to memory pressure or something else is kind of irrelevant if it is possible to avoid doing 128 folds in the first place. And it seems to me that if order doesn't matter at all (which is very often the case), the behavior (and performance) should be similar to rayon::broadcast() with an atomic counter.
Rayon's fold assumes associativity but not commutativity
Hm, is that documented somewhere? To me, the fact that things can be processed as sublists of undefined size means that both should hold, and the user can't really rely on any guarantees about the order in which input is processed, especially if that unlocks better performance.
There are a few places in the docs that talk about associativity and parallel non-determinism, but we do generally keep the relative order of items. E.g. a list of 4 items could be folded/reduced like any of these ordered partitions:
We don't promise anything about the order in which any inner part is called on their own -- e.g. the individual items could be produced in any order, and the reduction (a, b) could happen before or after (c, d) in the second one -- but we do keep them in order on the way out.
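A small std-only model of what keeping items "in order on the way out" means. It is not rayon's implementation: ordered_concat is a hypothetical name, and plain scoped threads stand in for join. String concatenation is associative but not commutative, so however the input is split, the result only comes out right if every reduction keeps the left part in front of the right part.

```rust
use std::thread;

/// Toy model of an order-preserving parallel fold + reduce (hypothetical, not
/// rayon's code). Leaves fold into a fresh String; halves are reduced left+right.
pub fn ordered_concat(items: &[&str], min_len: usize) -> String {
    let min_len = min_len.max(1);
    if items.len() / 2 >= min_len {
        let (left, right) = items.split_at(items.len() / 2);
        // Process the left half on another thread, like the two sides of a join.
        let (l, r) = thread::scope(|s| {
            let handle = s.spawn(|| ordered_concat(left, min_len));
            let r = ordered_concat(right, min_len);
            (handle.join().unwrap(), r)
        });
        // Reduce: concatenation is associative but not commutative, so the
        // left result must stay in front of the right one.
        l + &r
    } else {
        // Fold: one identity value (an empty String) per leaf.
        items.iter().fold(String::new(), |acc, word| acc + *word)
    }
}
```

The halves may finish in any order, yet every choice of min_len yields the same string, because only the shape of the partition changes, never the relative order.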
Interesting: I just tested the example code I linked above, and a.par_iter().zip(b)... ended up being slower than a.iter().zip(b).par_bridge(), which seems counter-intuitive. I always assumed that parallel indexed iterators have the most knowledge about what is being processed and should therefore be the most efficient.
The devil is in trying to preserve that semblance of order on the way out, which par_bridge() doesn't attempt at all. I'll have to think about how to make something like unordered_fold that could do a better job of reusing the accumulator, for when you really don't care about the relative order.
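For comparison, an unordered fold over an arbitrary sequential iterator can be sketched in the spirit of par_bridge(): workers take one item at a time from a shared Mutex-protected iterator and keep a single accumulator each. This is a hypothetical std-only helper (mutex_bridge_fold), not rayon's par_bridge or the proposed unordered_fold; it assumes a commutative fold/reduce and per-item work heavy enough that the lock is not the bottleneck.

```rust
use std::sync::Mutex;
use std::thread;

/// Hypothetical helper: unordered fold over any iterator, one accumulator per
/// worker, so `identity` runs exactly `threads` times.
pub fn mutex_bridge_fold<I, A, Id, F, R>(iter: I, threads: usize, identity: Id, fold: F, reduce: R) -> A
where
    I: Iterator + Send,
    A: Send,
    Id: Fn() -> A + Sync,
    F: Fn(A, I::Item) -> A + Sync,
    R: Fn(A, A) -> A,
{
    let shared = Mutex::new(iter);
    thread::scope(|s| {
        let handles: Vec<_> = (0..threads.max(1))
            .map(|_| {
                s.spawn(|| {
                    let mut acc = identity();
                    loop {
                        // The guard is a temporary dropped at the end of this
                        // statement, so the lock is not held while folding.
                        let next = shared.lock().unwrap().next();
                        match next {
                            Some(item) => acc = fold(acc, item),
                            None => break acc,
                        }
                    }
                })
            })
            .collect();
        // Items reached the accumulators in a non-deterministic order.
        handles.into_iter().map(|h| h.join().unwrap()).reduce(reduce)
    })
    .expect("at least one worker thread")
}
```

Unlike the order-preserving split, nothing here records where an item came from, which is what lets each worker keep reusing one accumulator.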
Even with ordered fold, in theory we should be able to tell when a join split didn't get its latter half stolen, so it can continue in the same accumulator. We do have some of that visibility in join_context, but right now that's too far abstracted from the parallel iterator implementation, and I think there would be borrowck challenges as well.
.with_min_len(1) doesn't really help because it doesn't fix the root problem in this case. On my 32-thread CPU (13900K), for 128 elements of input rayon creates 128 sublists by default, which is excessive. If I raise the minimum, it does create fewer sublists, but it also loses granularity of balancing between threads in the process.
If you set min 1, I don't expect any behavioral change at all! The idea with that is to set a level that amortizes the cost of creating your accumulator, but as you say that's a balancing act.
#857 is based on a very old version of rayon; could you maybe rebase it? It is a very small change, so that should be fine.
Sure, I pushed a rebase.
But when we're only talking about 128 items for 32 threads, I fear it will still overestimate even the initial level splits, regardless of this change about re-splitting stolen work.
I rewrote the code with rayon::scope(|scope| scope.spawn_broadcast()) in a way similar to the original code in blst and was able to beat it in terms of performance, but the code is far less idiomatic. Interestingly, rayon::broadcast() wasn't anywhere near as fast for some reason.
IMO, it's totally fine to reach for different primitives when the generic ("idiomatic") APIs don't meet your needs!
I'm not sure why broadcast would be much slower. It does have a little more synchronization to collect its return values, but that should be tiny compared to your real work.
And it seems to me that if order doesn't matter at all (which is very often the case), the behavior (and performance) should be similar to rayon::broadcast() with an atomic counter.
The generic API doesn't know whether the order of reduction matters in your code, but we could explore more explicit APIs for you to communicate that, like the unordered_fold I mentioned above.
If you set min 1, I don't expect any behavioral change at all! The idea with that is to set a level that amortizes the cost of creating your accumulator, but as you say that's a balancing act.
I'm sorry, I did try various (higher) values and posted 1 by accident.
I'm not sure why broadcast would be much slower. It does have a little more synchronization to collect its return values, but that should be tiny compared to your real work.
I think there might be an optimization opportunity here due to the use of zero-sized types. I am returning () from each thread, so essentially there is nothing to keep track of.
The generic API doesn't know whether the order of reduction matters in your code, but we could explore more explicit APIs for you to communicate that, like an unordered_fold I mentioned above.
The approach with unordered_fold makes sense to me, though I'm still wondering how many users rely on the order right now.
I'm not sure why broadcast would be much slower. It does have a little more synchronization to collect its return values, but that should be tiny compared to your real work.
I think there might be an optimization opportunity here due to the use of zero-sized types. I am returning () from each thread, so essentially there is nothing to keep track of.
It propagates panics through the same mechanism as the return value, enum JobResult. We also do this for scope.spawn_broadcast, but in a different way.
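The point about panics can be illustrated with a hypothetical result slot, loosely modeled on the JobResult idea mentioned here (rayon's real type is internal and not reproduced). Even when a job returns (), the slot still needs room for a panic payload that the waiting side can re-raise, so the type is not zero-sized even for T = ().

```rust
use std::any::Any;
use std::panic::{self, AssertUnwindSafe};

/// Hypothetical result slot; the names are illustrative, not rayon's.
pub enum ResultSlot<T> {
    /// The job has not run yet.
    Pending,
    /// The job returned normally.
    Value(T),
    /// The job panicked; the payload is kept so it can be re-raised later.
    Panic(Box<dyn Any + Send>),
}

/// Runs `job`, catching a panic instead of letting it unwind past this frame.
pub fn run_job<T>(job: impl FnOnce() -> T) -> ResultSlot<T> {
    match panic::catch_unwind(AssertUnwindSafe(job)) {
        Ok(value) => ResultSlot::Value(value),
        Err(payload) => ResultSlot::Panic(payload),
    }
}

/// Called by the waiting side: return the value or resume the original panic.
pub fn into_value<T>(slot: ResultSlot<T>) -> T {
    match slot {
        ResultSlot::Pending => panic!("job never ran"),
        ResultSlot::Value(value) => value,
        ResultSlot::Panic(payload) => panic::resume_unwind(payload),
    }
}
```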
Hi, I've recently been working on improving the BPE trainer in tokenizers, and I find that it would be great if we could get unordered_fold.
The concrete usage scenario is multi-threaded word frequency counting over the results of pre-tokenization. If we use into_par_iter().fold directly, merging all the hashmaps is extremely slow, especially for languages like Chinese, where we can't split on spaces during pre-tokenization, so we get a large number of unique entries in each hashmap.
Creating one hashmap per thread and accessing them directly is one solution, but the code is not elegant, and it can't be optimized further that way: we can't start the reduce step while still waiting for the last counting thread to finish.
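The per-thread workaround described here looks roughly like this with the standard library: one HashMap per worker thread, then a single merge. This is a hypothetical sketch (count_words is not from tokenizers), and it uses contiguous chunks for simplicity, which balances load worse than pulling work from a shared counter. Each merge step iterates over the smaller of the two maps and inserts into the larger one.

```rust
use std::collections::HashMap;
use std::thread;

/// Hypothetical helper: counts words with one HashMap per worker thread,
/// then merges the per-thread maps once at the end.
pub fn count_words(words: &[&str], threads: usize) -> HashMap<String, usize> {
    let threads = threads.max(1);
    // At most `threads` contiguous chunks (chunks() panics on a length of 0).
    let chunk_len = ((words.len() + threads - 1) / threads).max(1);
    let partials: Vec<HashMap<String, usize>> = thread::scope(|s| {
        let handles: Vec<_> = words
            .chunks(chunk_len)
            .map(|chunk| {
                s.spawn(move || {
                    // One "identity" map per thread.
                    let mut counts: HashMap<String, usize> = HashMap::new();
                    for word in chunk {
                        *counts.entry(word.to_string()).or_insert(0) += 1;
                    }
                    counts
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });
    // Merge: always iterate over the smaller map and insert into the larger.
    partials.into_iter().fold(HashMap::new(), |mut acc, mut other| {
        if acc.len() < other.len() {
            std::mem::swap(&mut acc, &mut other);
        }
        for (word, n) in other {
            *acc.entry(word).or_insert(0) += n;
        }
        acc
    })
}
```

As the comment notes, this still waits for every thread before merging; it only fixes the number of maps, not the timing of the reduction.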
So I think it's important to provide the unordered_fold API for this type of task.
Here's a quick attempt: https://github.com/rayon-rs/rayon/compare/main...cuviper:rayon:fold_unordered
I flipped the name to fold_unordered, if only so it will group with the other folds in the docs.
If you try out that branch, please let me know how it goes!
That is, we can't just start the reduce step while waiting for the last thread for counting to finish executing.
My implementation also does not start any reduction until all the folds are done. I think it would require significantly more internal changes to facilitate that, but I don't really have an idea of what that would look like.
Wow, thanks for your prompt reply, I'll try that later.
I think it would require significantly more internal changes to facilitate that, but I don't really have an idea of what that would look like.
Well, I haven't dived into rayon's implementation, but from my understanding of the high-level API, achieving this kind of functionality would mean pushing a worker's result into the result iterator as soon as that worker has no new work. That way, even though some workers might not have finished, we could already consume part of the result iterator.
For example, the second job could be much slower than the others, so three results should already be available in the iterator while we wait for the slow one. When the slowest job finishes, we could do the last reduce directly.
This improves overall CPU utilization instead of letting the CPU sit idle waiting for some tasks.
use rayon::prelude::*;
let ans: i32 = vec![1, 10000, 1, 1, 1, 1]
    .into_par_iter()
    .fold(|| 0, |mut count, num| {
        for _ in 0..num {
            count += 1;
        }
        count
    })
    .reduce(|| 0, |a, b| a + b);
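The requested behavior, reducing early results while a slow job is still running, can be approximated outside rayon with a channel. This is a hypothetical std-only sketch, not a rayon API: fold_then_streaming_reduce is an illustrative name, and it spawns one thread per job. Each job sends its folded count as soon as it finishes, and the receiving thread reduces results in completion order, which again requires a commutative reduction.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: each job folds its own count on its own thread and
/// sends it as soon as it is done; the calling thread reduces in arrival order.
pub fn fold_then_streaming_reduce(jobs: Vec<u64>) -> u64 {
    let (tx, rx) = mpsc::channel();
    thread::scope(|s| {
        for num in jobs {
            let tx = tx.clone();
            s.spawn(move || {
                // Fold step: the same counting loop as the example above.
                let mut count = 0u64;
                for _ in 0..num {
                    count += 1;
                }
                // Publish this partial result immediately.
                tx.send(count).unwrap();
            });
        }
        // Drop the original sender so the receiver stops after the last job.
        drop(tx);
        // Reduce step: starts on whichever partial results arrive first.
        rx.iter().fold(0, |acc, partial| acc + partial)
    })
}
```

With the input from the example, the five fast jobs can be reduced while the 10000-iteration job is still counting; the final sum is 10005 either way.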
Update:
I have tried your implementation, and it works correctly.
The concept of that request is clear, but the actual implementation is harder, especially in such a generic and composable framework as rayon.
I have tried your implementation, and it works correctly.
Did it also help performance in any noticeable way?
I added some brief usage in the rayon-demo benchmarks for map-collect strategies. It worked well in one case, but was rather poor in another. So I don't think we should jump to start using this in any existing generic code, but having it available for particular situations may be worthwhile.
test map_collect::i_mod_10_to_i::with_fold ... bench: 599,495.59 ns/iter (+/- 162,917.00)
test map_collect::i_mod_10_to_i::with_fold_unordered ... bench: 424,433.58 ns/iter (+/- 115,927.55)
test map_collect::i_to_i::with_fold ... bench: 18,600,808.80 ns/iter (+/- 3,209,686.58)
test map_collect::i_to_i::with_fold_unordered ... bench: 28,964,659.40 ns/iter (+/- 20,052,510.20)
(Plus, collecting a map does have some semantic ordering effects if there are any duplicate keys.)
I just noticed that fold calls identity many more times than the number of threads in the thread pool. If identity is a non-trivial operation, it might be quite expensive to call it unnecessarily. It would be great if its returned value could be reused for folding multiple sublists.
This discovery resulted from rewriting https://github.com/supranational/blst/blob/0d46eefa45fc1e57aceb42bba0e84eab3a7a9725/bindings/rust/src/lib.rs#L1110-L1147 with rayon (the code there essentially does work stealing manually) and seeing significantly slower benchmark results.
Here is what I tried to do: https://github.com/nazar-pc/blst/blob/7d1074e7df3f31dc8b8acfde3260ac9ceac8f0d9/bindings/rust/src/lib.rs#L1092-L1128
The reason I believe it is slower is that more identity elements are created, and due to the cryptography involved, reducing more elements takes more time. This is probably a duplicate of https://github.com/rayon-rs/rayon/issues/840, which the original author decided to close, but I believe there is value in being able to do something like this in rayon (or maybe there is a way and I just don't see it).