rayon-rs / rayon

Rayon: A data parallelism library for Rust
Apache License 2.0

Extra CPU usage when not enough work in par_bridge #795

Closed · hniksic closed this issue 1 year ago

hniksic commented 3 years ago

On a particular workload I noticed that Rayon uses all available CPU but doesn't perform at its best, i.e. it actually does better when the number of threads is reduced. The workload involves work that is easy to parallelize (essentially order-insensitive stream processing using par_bridge() with almost no mutable shared state) running on a 64-CPU server. On my laptop it nicely parallelizes to 4 cores, running approximately 4x faster than sequentially. But on the 64-CPU server the work doesn't proceed anywhere near 64x faster than the sequential version on the same machine, while all the CPUs are churning at 100% the entire time.

Experimenting with RAYON_NUM_THREADS shows that performance peaks at around 8-16 threads and gets worse with 32 and 64:

Measurements were done with a release build and Rayon 1.4.0. The interesting thing is that in all tests all cores were at 100%, as measured by htop.

I was unable to reproduce the issue with simpler minimal examples, where Rayon seemed to distribute the load nicely. So I rewrote the code to use a crossbeam_channel to distribute the work to 64 manually created threads instead. Using that, it took 3.8s to process a million records, but all cores were only at 15-20% of CPU usage. I then added some batching (a thousand records in a batch) and sent the batches over the channel to reduce the overhead. With this change I got the speed of the channel version to 0.65s/million records, with CPUs averaging about 25% usage.
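Roughly, the batched channel version looked something like this (a simplified sketch rather than the exact code; read_jsonl_channel, the channel capacity and the worker count are illustrative, only the 1,000-record batch size comes from the description above):

use std::fs::File;
use std::io::{BufRead, BufReader};

use crossbeam_channel::bounded;
use serde::de::DeserializeOwned;

fn read_jsonl_channel<Obj: DeserializeOwned>(path: &str, process: impl Fn(Obj) + Sync) {
    const BATCH: usize = 1_000;
    let workers = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(4);
    let (tx, rx) = bounded::<Vec<String>>(workers * 2);

    std::thread::scope(|s| {
        // Worker threads block on the channel instead of spinning, so idle
        // cores can actually sleep.
        for _ in 0..workers {
            let rx = rx.clone();
            let process = &process;
            s.spawn(move || {
                for batch in rx.iter() {
                    for line in batch {
                        process(serde_json::from_str(&line).unwrap());
                    }
                }
            });
        }

        // The current thread plays the IO/reader role and sends whole batches
        // to amortize the per-item channel overhead.
        let reader = BufReader::new(File::open(path).unwrap());
        let mut batch = Vec::with_capacity(BATCH);
        for line in reader.lines() {
            batch.push(line.unwrap());
            if batch.len() == BATCH {
                tx.send(std::mem::take(&mut batch)).unwrap();
            }
        }
        if !batch.is_empty() {
            tx.send(batch).unwrap();
        }
        drop(tx); // close the channel so the workers' loops terminate
    });
}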

Of course, now I had to implement similar batching in Rayon, which I did, bringing the numbers to:

With batching Rayon is no longer able to outperform channel-based dispatch, although it comes close at its best numbers. Note that Rayon always uses 100% of the cores it runs its threads on, and in the case of 64 threads it uses 100% of all cores, despite being slower than the channel variant, which only uses about 25% of the cores.
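The batched Rayon version was along these lines (again a simplified sketch, not the exact code; grouping the lines with iter::from_fn is just one way to form the 1,000-record batches):

use std::io::{BufRead, BufReader, Read};

use rayon::prelude::*;
use serde::de::DeserializeOwned;

fn read_jsonl_batched<Obj: DeserializeOwned>(
    input: impl Read + Send,
    process: impl Fn(Obj) + Sync,
) {
    const BATCH: usize = 1_000;
    let mut lines = BufReader::new(input).lines().map(Result::unwrap);

    // An iterator of batches: each next() pulls up to BATCH lines off the reader.
    let batches = std::iter::from_fn(move || {
        let batch: Vec<String> = lines.by_ref().take(BATCH).collect();
        if batch.is_empty() { None } else { Some(batch) }
    });

    // Each stolen work item now carries a whole batch instead of a single line.
    batches.par_bridge().for_each(|batch| {
        for line in batch {
            process(serde_json::from_str(&line).unwrap());
        }
    });
}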

Given the large number of CPUs and the low utilization in the channel variant, I believe that the work-giving thread (the one that reads the stream and uses par_bridge()) is limited by IO and is not providing work fast enough to saturate all 64 CPUs. This means that this workload will never utilize all 64 CPUs - and that's fine. The thing that bugged me was that Rayon seemed to be using all cores while getting less done than with a reduced number of threads - this seemed like a bug. Googling similar issues I found #642 and thought that maybe I had stumbled upon that bug. I reasoned that the IO thread was providing just enough work to keep the Rayon threads in busy-wait much of the time, making all cores report 100% usage, but wasting enough time in busy-wait to hurt their ability to do useful work. Reducing the number of threads would improve performance because #642 doesn't appear when there is enough work. The bug wouldn't appear on my laptop for the same reason - it doesn't have enough CPU horsepower to outmatch the IO bandwidth of the reader thread.

But alas, upgrading to Rayon 1.4.0 didn't fix things, so I guess it must be a different problem.

A slightly simplified version of the main loop of the code looks like this:

// Deserialize lines from FILE into Obj and call PROCESS on the object.
// Deserialization and processing are performed in parallel when possible.

use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::Path;

use log::{info, log_enabled, warn, Level};
use rayon::prelude::*;
use serde::de::DeserializeOwned;

fn read_jsonl<Obj: DeserializeOwned>(
    file: impl AsRef<Path>,
    process: impl Fn(Obj) -> () + Sync,
) {
    let file_disp = file.as_ref().display().to_string();
    let reader = BufReader::with_capacity(
        1024 * 1024,
        File::open(file.as_ref()).unwrap_or_else(|e| panic!("opening {}: {}", file_disp, e)),
    );
    reader
        .lines()
        .map(Result::unwrap)
        .enumerate()
        .par_bridge()
        .for_each(|(lineno, line)| {
            if log_enabled!(Level::Info) {
                if lineno != 0 && lineno % 1_000_000 == 0 {
                    info!("{}: processed {} records", file_disp, lineno);
                }
            }
            match serde_json::from_str(&line) {
                Ok(rec) => {
                    process(rec);
                }
                Err(e) => warn!("{}: {}: {}", file_disp, lineno + 1, e),
            }
        });
}

The input file is a multi-gigabyte JSONL file (one JSON object per line), with lines about 2K long. In the production code there are additional minor optimizations to iterate over Vec<u8> instead of strings to speed up the IO thread, but they make no difference regarding parallelism.

The process closure passed to the function does some processing of the deserialized object, re-serializes it into JSON, and stores it in a shared vector. The mutations of the vector (assignment and the extremely rare resize) are guarded by a parking_lot::Mutex, but the vast majority of the time is spent in processing, so the mutex is uncontended. I experimented with finer-grained locking, using a mutex for each row and an upgradeable rwlock for the vector, but it had no effect on performance whatsoever.
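To give an idea of the shape of process (a sketch only: the real processing logic is more involved, the record index is plumbed through differently, and the "processed" field inserted here is purely illustrative):

use parking_lot::Mutex;
use serde_json::Value;

fn main() {
    // Shared output, indexed by record number.
    let out: Mutex<Vec<String>> = Mutex::new(Vec::new());

    let process = |lineno: usize, mut obj: Value| {
        // ... the CPU-bound transformation of `obj` would go here ...
        if let Some(m) = obj.as_object_mut() {
            m.insert("processed".to_string(), Value::Bool(true));
        }
        let line = serde_json::to_string(&obj).unwrap();

        let mut v = out.lock();
        if lineno >= v.len() {
            v.resize_with(lineno + 1, String::new); // the extremely rare resize
        }
        v[lineno] = line; // the common case: a plain assignment under the mutex
    };

    process(0, serde_json::json!({"field0": 1}));
    assert_eq!(out.lock().len(), 1);
}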

If someone is curious, I can show the channel version and the chunked version of the code.

hniksic commented 3 years ago

Note that in this case utmost processing efficiency is not a requirement, so I can afford to retain more elegant code (e.g. avoiding chunking) even if it is somewhat slower. The reason I investigated the issue was that I was curious why a 64-CPU server didn't bring a greater speedup compared to my 4-core laptop on what I considered a highly parallelizable workload.

cuviper commented 3 years ago

One immediate problem I know of with par_bridge() is that it doesn't let threads sleep while waiting for iterator items. They just keep spinning on thread::yield_now() until an item comes. To some extent that's just wasted CPU load, but at high thread counts I can imagine this would have negative effects, like a lot of memory bus traffic getting in the way of real work.
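Conceptually, that spinning looks something like this (an illustration of the pattern, not rayon's actual source):

use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

// Illustration only: each worker polls a shared queue and yields when it is
// empty, so it reports 100% CPU even when the producer can't keep up.
fn consume(queue: &Mutex<VecDeque<String>>, producer_done: &AtomicBool, work: impl Fn(String)) {
    loop {
        let item = queue.lock().unwrap().pop_front();
        match item {
            Some(line) => work(line),
            None if producer_done.load(Ordering::Acquire) => return,
            None => std::thread::yield_now(), // spin instead of sleeping
        }
    }
}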

There's a comment that perhaps we could use a threadpool-specific yield (#548), which would let a thread participate in work-stealing at least. They still couldn't sleep unless we work out a means to wake them back up when items are available. Some of this is hampered by the abstraction boundary between rayon-core and rayon.

It's possible there are entirely different designs that could work better, like batching items and treating it more like a flat_map.

lakwet commented 3 years ago

Hello, I also have a problem with a high number of threads (more than 16): https://github.com/rayon-rs/rayon/issues/762

@hniksic I would be very curious to know how you manage to improve performance with more than 16 threads, and whether I can apply what you did to my project.

hniksic commented 3 years ago

@lakwet Using Rayon I wasn't yet able to improve performance at 16+ threads. I did manage to make a version that outperformed Rayon by combining batching and crossbeam_channel for work distribution. The performance improvement is slight, but the CPUs use much less power. (This may be specific to my use case, where IO performance is the bottleneck.)

hniksic commented 3 years ago

I managed to reproduce the problem on a small example, at least on the 32-core (num_cpus::get() reporting 64 due to hyperthreading) server I have access to. I expect the example could be simplified further, but I prefer to keep the workload similar to what I work with, so that optimizations or workarounds still apply to my problem.

Running the program below results in peak performance at around 16 threads:

$ ./target/release/parallel-bench
done 1000000 in 2.583879913 s
$ RAYON_NUM_THREADS=1 ./target/release/parallel-bench
done 1000000 in 17.34046454 s
$ RAYON_NUM_THREADS=2 ./target/release/parallel-bench
done 1000000 in 9.265604026 s
$ RAYON_NUM_THREADS=4 ./target/release/parallel-bench
done 1000000 in 4.717856639 s
$ RAYON_NUM_THREADS=8 ./target/release/parallel-bench
done 1000000 in 2.565539363 s
$ RAYON_NUM_THREADS=16 ./target/release/parallel-bench
done 1000000 in 1.926309717 s
$ RAYON_NUM_THREADS=32 ./target/release/parallel-bench
done 1000000 in 2.213428796 s
$ RAYON_NUM_THREADS=64 ./target/release/parallel-bench
done 1000000 in 2.625233038 s

In all cases all cores, as reported by htop, are at 100% utilization. The program:

use serde::de::DeserializeOwned;
use std::io::{BufRead, BufReader, Cursor, Read};
use std::sync::atomic::{AtomicUsize, Ordering};

const TEST_DATA: &str = r#"{"field0":0,"field1":2,"field2":2,"field3":0,"field4":2,"field5":7,"field6":2,"field7":2,"field8":7,"field9":0,"field10":5,"field11":1,"field12":[0],"field13":0,"field14":0,"field15":0,"field16":0,"field17":0,"field18":0,"field19":[9],"field20":2,"field21":17,"field22":1,"field23":2,"field24":1,"field25":1,"field26":4,"field27":0,"field28":1,"field29":1,"field30":1,"field31":3,"field32":"25Cs/5s09fCaccJ1KEkflg==","field33":2,"field34":1,"field35":0,"field36":[0],"field37":1,"field38":0,"field39":0,"field40":0,"field41":0,"field42":1,"field43":1,"field44":0,"field45":3,"field46":1,"field47":0,"field48":2,"field49":1,"field50":1,"field51":1,"field52":0,"field53":1,"field54":9,"field55":2,"field56":2,"field57":[0],"field58":2,"field59":14412114,"field60":0,"field61":1,"field62":2,"field63":0,"field64":2,"field65":0,"field66":0,"field67":0,"field68":0,"field69":1,"field70":0,"field71":0,"field72":1,"field73":7,"field74":0,"field75":0,"field76":7,"field77":2,"field78":0,"field79":5,"field80":0,"field81":0,"field82":0,"field83":6,"field84":2,"field85":2,"field86":2,"field87":2}
{"field17":0,"field18":0,"field59":14412114,"field60":0,"field82":0,"field57":[0],"field66":0,"field67":0,"field68":0,"field32":"25Cs/5s09fCaccJ1KEkflg==","field36":[0],"field39":0,"field70":0,"field41":0,"field74":0,"field75":0,"field12":[0],"field49":1,"field52":0,"field13":0,"field80":0,"field15":0,"field51":1,"field25":1,"field16":0,"field53":1,"field3":0,"ts":0}
{"field0":0,"field1":2,"field2":2,"field3":0,"field4":2,"field5":7,"field6":2,"field7":2,"field8":7,"field9":0,"field10":5,"field11":1,"field12":[0],"field13":0,"field14":0,"field15":0,"field16":0,"field17":0,"field18":0,"field19":[9],"field20":2,"field21":17,"field22":1,"field23":2,"field24":1,"field25":1,"field26":4,"field27":0,"field28":1,"field29":1,"field30":1,"field31":3,"field32":"Iw9ohtZ7t0+hBKUKhViqG50yjv3uSlnXtWSIi0G1cJgDAxg16+4Ekie3WieUPOZp","field33":2,"field34":1,"field35":0,"field36":[0],"field37":1,"field38":0,"field39":0,"field40":0,"field41":0,"field42":1,"field43":1,"field44":0,"field45":3,"field46":1,"field47":0,"field48":2,"field49":1,"field50":1,"field51":1,"field52":0,"field53":1,"field54":9,"field55":2,"field56":2,"field57":[0],"field58":2,"field59":8880866,"field60":0,"field61":1,"field62":2,"field63":0,"field64":2,"field65":0,"field66":0,"field67":0,"field68":0,"field69":1,"field70":0,"field71":0,"field72":1,"field73":7,"field74":0,"field75":0,"field76":7,"field77":2,"field78":0,"field79":5,"field80":0,"field81":0,"field82":0,"field83":6,"field84":2,"field85":2,"field86":2,"field87":2}
{"field17":0,"field18":0,"field59":8880866,"field60":0,"field82":0,"field57":[0],"field66":0,"field67":0,"field68":0,"field32":"Iw9ohtZ7t0+hBKUKhViqG50yjv3uSlnXtWSIi0G1cJgDAxg16+4Ekie3WieUPOZp","field36":[0],"field39":0,"field70":0,"field41":0,"field74":0,"field75":0,"field12":[0],"field49":1,"field52":0,"field13":0,"field80":0,"field15":0,"field51":1,"field25":1,"field16":0,"field53":1,"field3":0,"ts":0}
"#;

fn full_test_data() -> Vec<u8> {
    let mut full = vec![];
    for _ in 0..250_000 {
        full.extend_from_slice(TEST_DATA.as_bytes());
    }
    full
}

pub fn read_jsonl_rayon<Obj: DeserializeOwned>(
    input: impl Read + Send + Sync,
    process: impl Fn(Obj) -> () + Sync,
) {
    use rayon::prelude::*;
    let r = BufReader::new(input);
    r.lines()
        .map(Result::unwrap)
        .par_bridge()
        .for_each(|line| process(serde_json::from_str(&line).unwrap()));
}

fn main() {
    let data = full_test_data();
    let t0 = std::time::Instant::now();
    let cnt = AtomicUsize::new(0);
    read_jsonl_rayon(Cursor::new(&data), |obj: serde_json::Value| {
        serde_json::to_string(&obj).unwrap();
        cnt.fetch_add(1, Ordering::Relaxed);
    });
    println!("done {} in {} s", cnt.into_inner(), t0.elapsed().as_secs_f64());
}

Cargo.toml:

[package]
name = "parallel-bench"
version = "0.1.0"
authors = ["Hrvoje Niksic <hniksic@gmail.com>"]
edition = "2018"

[dependencies]
serde = "1.0.115"
serde_json = "1.0.57"
rayon = "1.4.0"

excerpt from /proc/cpuinfo:

processor       : 0
vendor_id       : AuthenticAMD
cpu family      : 23
model           : 49
model name      : AMD EPYC 7502P 32-Core Processor
stepping        : 0
microcode       : 0x8301034
cpu MHz         : 1890.943
cache size      : 512 KB
physical id     : 0
siblings        : 64
core id         : 0
cpu cores       : 32
apicid          : 0
initial apicid  : 0
fpu             : yes
fpu_exception   : yes
cpuid level     : 16
wp              : yes
flags           : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ht syscall nx mmxext fxsr_opt pdpe1gb rdtscp lm constant_tsc rep_good nopl nonstop_tsc cpuid extd_apicid aperfmperf pni pclmulqdq monitor ssse3 fma cx16 sse4_1 sse4_2 movbe popcnt aes xsave avx f16c rdrand lahf_lm cmp_legacy svm extapic cr8_legacy abm sse4a misalignsse 3dnowprefetch osvw ibs skinit wdt tce topoext perfctr_core perfctr_nb bpext perfctr_llc mwaitx cpb cat_l3 cdp_l3 hw_pstate sme ssbd mba sev ibrs ibpb stibp vmmcall fsgsbase bmi1 avx2 smep bmi2 cqm rdt_a rdseed adx smap clflushopt clwb sha_ni xsaveopt xsavec xgetbv1 xsaves cqm_llc cqm_occup_llc cqm_mbm_total cqm_mbm_local clzero irperf xsaveerptr wbnoinvd arat npt lbrv svm_lock nrip_save tsc_scale vmcb_clean flushbyasid decodeassists pausefilter pfthreshold avic v_vmsave_vmload vgif umip rdpid overflow_recov succor smca
bugs            : sysret_ss_attrs spectre_v1 spectre_v2 spec_store_bypass
bogomips        : 4990.77
TLB size        : 3072 4K pages
clflush size    : 64
cache_alignment : 64
address sizes   : 43 bits physical, 48 bits virtual
power management: ts ttp tm hwpstate cpb eff_freq_ro [13] [14]

lakwet commented 3 years ago

Thanks a lot for your answer.

I also notice that with more than 16 threads, performance decreases. And I also notice that when I set my radix sort to 10 threads, it seems to use all 32 threads of my Ryzen 9. At first I believed it was because my CPU has 16 physical cores, but apparently the same performance threshold appears with 32 physical cores. There might indeed be a bug. Apparently, when there is not much to compute but a lot of data to move, we have this problem. In my radix sort there is not much to compute; most of the work is moving elements from one place to another.

I tried to look at the code in rayon, but I am not familiar enough with Rust to fully understand everything. I may need to spend more time on it.

hniksic commented 3 years ago

And I also notice that when I set my radix sort to 10 threads, it seems to use all 32 threads of my Ryzen 9.

@lakwet Can you elaborate on this? In my investigations Rayon always respects the number of threads specified through RAYON_NUM_THREADS.

lakwet commented 3 years ago

I don't use RAYON_NUM_THREADS but the builder:

let pool = ThreadPoolBuilder::new().num_threads(thread_n).build().unwrap();

I have a CPU with 16 physical cores and 32 threads, and I wrote this crate: https://crates.io/crates/voracious_radix_sort The peak performance is at 16 threads. If I use more than 16 threads, performance decreases. But if I set anywhere between 10 and 16 threads, I get the same results, which is weird. Clearly, it should be faster with 16 threads than with 10. When I look at htop (which is not very rigorous), I can see that my multithreaded sort uses all 32 threads.

hniksic commented 3 years ago

@lakwet I see, thanks for the explanation. I haven't needed to limit the number of threads like that in my code yet; I just used the env var for testing. Ideally I'd like Rayon to utilize all cores by creating the appropriate number of threads, which is why I reported this issue.

Is it possible that you are not using the API correctly (not a criticism, just thinking out loud), or that you accidentally create multiple thread pools? In either case, I think the issue of Rayon not respecting ThreadPoolBuilder::num_threads() should probably be discussed in a separate issue to avoid spreading this one across too many topics.
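For what it's worth, one way to end up with all 32 cores busy despite num_threads(10) is to build a pool but run the parallel iterator outside of it; a custom pool is only used for work started inside install(), otherwise the global pool (one thread per logical CPU by default) is used. A small illustration with a dummy workload:

use rayon::prelude::*;
use rayon::ThreadPoolBuilder;

fn main() {
    let pool = ThreadPoolBuilder::new().num_threads(10).build().unwrap();

    // Runs on the 10-thread custom pool:
    pool.install(|| {
        (0..1_000_000u64).into_par_iter().for_each(|x| { std::hint::black_box(x); });
    });

    // Runs on the global pool, which defaults to one thread per logical CPU:
    (0..1_000_000u64).into_par_iter().for_each(|x| { std::hint::black_box(x); });
}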

lakwet commented 3 years ago

I tested using RAYON_NUM_THREADS instead of the builder. RAYON_NUM_THREADS and the builder give the same runtimes for my radix sort depending on the number of threads I set.

So the "htop" method is not accurate at all.

hniksic commented 3 years ago

Apparently a similar problem was faced by bevy:

Bevy uses multi-threading throughout the engine: ECS scheduling, asset loading, rendering, etc. Before this release it used Rayon for almost all of these tasks. Rayon is nice because it is generally as simple as calling some_list.par_iter().for_each(|x| do_something(x)). Rayon then automatically breaks the for_each into tasks and runs them on as many cores as it can. Rayon is a great choice if you want to easily parallelize code, but it has the downside of being pretty cpu-hungry.

Bevy (and a number of other rust game engines and ecs frameworks using rayon) have received feedback that they were overly cpu hungry / usage was not proportional to "real" work done.

We decided to resolve this problem by building a custom async-friendly task system, which enables the creation of context-specific task pools. For example, you might have separate pools for compute, IO, networking, etc. This also gives us the flexibility to load balance work appropriately according to work type and/or priority. The cpu usage wins have been huge[...]

hans-helmut commented 2 years ago

Some citations on the issue:

njaard commented 1 year ago

I feel that this problem is a real footgun.

For example, if my Iterator is blocking or slow, par_bridge is basically contraindicated. This is unfortunately not documented.

A stopgap would be to document on par_bridge that it is only appropriate when the user is assured that the processing of items is slower than the rate at which they can be received.

njaard commented 1 year ago

@cuviper I guess you can mark this as closed!

cuviper commented 1 year ago

Ah, yes, I didn't notice that GitHub hadn't properly linked the issue to the PR. Thanks again for your change, it was published in rayon 1.6.1!

hniksic commented 1 year ago

Thanks for fixing this! I checked my minimal example from the comment above on a 32/64-core machine, and it indeed seems to fix the issue. With Rayon 1.6.0 I get the following numbers:

$ RAYON_NUM_THREADS=8 ./target/release/test 
done 1000000 in 2.219801246 s
$ RAYON_NUM_THREADS=16 ./target/release/test
done 1000000 in 1.931417055 s
$ RAYON_NUM_THREADS=32 ./target/release/test
done 1000000 in 2.395053222 s
$ RAYON_NUM_THREADS=64 ./target/release/test
done 1000000 in 2.844460551 s
$ ./target/release/test                     
done 1000000 in 2.753685914 s

With 1.6.1 I get better times across the board and, most importantly, not setting RAYON_NUM_THREADS simply gives optimal performance:

$ RAYON_NUM_THREADS=8 ./target/release/test 
done 1000000 in 1.8929853859999999 s
$ RAYON_NUM_THREADS=16 ./target/release/test
done 1000000 in 1.772770773 s
$ RAYON_NUM_THREADS=32 ./target/release/test
done 1000000 in 1.765792873 s
$ RAYON_NUM_THREADS=64 ./target/release/test
done 1000000 in 1.593089524 s
$ ./target/release/test                     
done 1000000 in 1.599067426 s

Also, checking htop shows that with Rayon 1.6.1 the CPUs can go to sleep when there is nothing to do, as the machine is no longer wildly spinning.