rayon-rs / rayon

Rayon: A data parallelism library for Rust
Apache License 2.0

Rayon not engaging more than one core at a time? #736

Open davideps opened 4 years ago

davideps commented 4 years ago

I expected the code below to engage more than one core at a time, but it shifts load from one core to another until the work is done. On a machine with six cores that idles at 4% load, the total load never rises above 20%.

I've set the environment variable RAYON_NUM_THREADS to 6 and also set num_threads(6) on the ThreadPoolBuilder. Am I doing anything wrong?

use rayon::prelude::*;
use rand::Rng;

// Runs serially on the calling thread (rand 0.7 `gen_range(low, high)` API).
fn make_vec(n: i32) -> Vec<i32> {
    let mut rng = rand::thread_rng();
    let mut result = Vec::new();
    for _n in 0..n {
        result.push(rng.gen_range(0, 100));
    }
    result
}

fn main() {
    rayon::ThreadPoolBuilder::new().num_threads(6).build_global().unwrap();
    let mut v = make_vec(100_000_000);
    // Only this decrement runs on the rayon pool.
    v.par_iter_mut().for_each(|p| *p -= 1);
}
cuviper commented 4 years ago

I suspect the serial make_vec is dominating your time, because the RNG is more complicated than the simple decrement you're doing in parallel.

cuviper commented 4 years ago

Tiny operations like this are also going to be memory bound. When there's so little to do, your time is spent waiting for loads and stores, and parallelism can't help that much. At a high level, though, this kind of waiting still registers as CPU usage.

davideps commented 4 years ago

Ok. Two questions:

1) How might I build the Vec in parallel?

2) Inside the closure, can I call a more complex function? Can that function call RNG? Can it iterate over structs? I ask because the real function I want to use takes a hashmap stored inside a struct and changes it in place. It begins like this:

fn process(s: &mut AreaStimulus, wind: &Vec<Dir>) {
    let mut done: bool = false;
    let mut new: HashMap<Point, f32> = HashMap::new();

What is the syntax to call this function within a Rayon closure? Like this?

v_of_structures.par_iter_mut().for_each(|structure| fn_name(structure.stimuli, &global_wind));

Then I'll be able to test Rayon with a task that is CPU bound.

cuviper commented 4 years ago

1. How might I build the Vec in parallel?

It's a little challenging with thread_rng(), since a ThreadRng can't be shared across threads, but the map_init method can help, like:

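use rand::Rng;
use rayon::prelude::*;
fn make_vec(n: i32) -> Vec<i32> {
    (0..n).into_par_iter().map_init(
        || rand::thread_rng(), // called once per rayon job, not once per item
        |rng, _n| rng.gen_range(0, 100), // reuses the job's RNG for each item
    ).collect::<Vec<i32>>()
}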

2. Inside the closure, can I call a more complex function? Can that function call RNG? Can it iterate over structs? I ask because the real function I want to use takes a hashmap stored inside a struct and changes it in place.

You can call any function or arbitrary Fn closure that is Send + Sync. Rayon doesn't really know anything about what you do as long as it meets those constraints, enforced by the compiler.

At first glance, changing a map would make it a FnMut operation, which can't be shared among threads. You would have to wrap that map in some synchronization, probably a Mutex, to make it shareable. For effective parallelism, you would want to do most of your computation outside of that lock, because the time you are holding the lock is a serializing choke-point between threads.

davideps commented 4 years ago

Thank you @cuviper for the example code and for your work on Rayon. This is what I've been using in main(). It seems effective at instantiating agents in parallel and inserting them into a DashMap (called "agents").

(1..=NUM_AGENTS).into_par_iter().for_each(|n| {
    agents.insert(akey(n), create_agent(n));
});

I'll have to study your use of map_init(). My create_agent() function starts with:

let mut rng = rand::thread_rng();
cuviper commented 4 years ago

There's also for_each_init which should fit your example. It might not make a big difference to avoid calling thread_rng every time, but it's worth a try.

lakwet commented 4 years ago

Replace the serial make_vec from the original post with a parallel one.

In Cargo.toml:

rand = "0.7.3"
rayon = "1"

Your code:

use rand::{thread_rng, Rng};
use rayon::prelude::*;

fn make_vec(n: i32) -> Vec<i32> {
    // Each item draws a value in [0, 100) from the RNG of whichever thread runs it.
    (0..n)
        .into_par_iter()
        .map(|_| thread_rng().gen_range(0, 100))
        .collect::<Vec<i32>>()
}