rayon-rs / rayon

Rayon: A data parallelism library for Rust
Apache License 2.0

Spawning tasks to multiple thread pools from a single scope #782

Open pkolaczk opened 4 years ago

pkolaczk commented 4 years ago

I've got a bunch of tasks to process on many thread pools. E.g. some tasks are CPU bound so should go to the CPU thread pool but some other tasks are I/O bound and I want them processed in a different pool dedicated to I/O. Additionally all tasks need to use a common parent object with lifetime shorter than 'static, so I need a single scope wrapped around all that processing.

Is it possible to do with rayon?

cuviper commented 4 years ago

We don't have a single scope tied to multiple pools, but you could use nested scopes for this, something like:

let common = common_init();

io_pool.scope(|io_scope| {
    io_scope.spawn(|_| early_io_work(&common, 1));
    io_scope.spawn(|_| early_io_work(&common, 2));

    cpu_pool.scope(|cpu_scope| {
        cpu_scope.spawn(|_| cpu_work(&common, 1));
        io_scope.spawn(|_| io_work(&common, 1));
        cpu_scope.spawn(|_| cpu_work(&common, 2));
        io_scope.spawn(|_| io_work(&common, 2));
        // ...
    }); // waits for cpu_scope to complete

    io_scope.spawn(|_| late_io_work(&common, 1));
    io_scope.spawn(|_| late_io_work(&common, 2));
}); // waits for io_scope to complete

drop(common);
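For readers who want to try the nesting shape without setting up two rayon pools, here is a minimal sketch that swaps in `std::thread::scope` as a stand-in. It is not rayon's API: there are no pools here, every spawn starts a fresh thread, and the function name `nested_scopes` and the counters are made up for illustration. What it does show is the same ordering: the inner scope joins only its own work, while work spawned on the outer scope may still be running.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

/// Hypothetical helper: returns (CPU units finished when the inner scope
/// closed, number of I/O tasks finished overall).
fn nested_scopes(common: &[u32]) -> (usize, usize) {
    let cpu_done = AtomicUsize::new(0);
    let io_done = AtomicUsize::new(0);

    let cpu_seen = thread::scope(|io_scope| {
        // Early I/O work, started before the inner scope exists.
        io_scope.spawn(|| {
            io_done.fetch_add(1, Ordering::Relaxed);
        });

        thread::scope(|cpu_scope| {
            // Work spawned on either scope may borrow `common`,
            // because it outlives both scopes.
            cpu_scope.spawn(|| {
                cpu_done.fetch_add(common.len(), Ordering::Relaxed);
            });
            io_scope.spawn(|| {
                io_done.fetch_add(1, Ordering::Relaxed);
            });
            cpu_scope.spawn(|| {
                cpu_done.fetch_add(common.len(), Ordering::Relaxed);
            });
        }); // waits for the cpu_scope threads only

        // Both CPU tasks have been joined at this point.
        let seen = cpu_done.load(Ordering::Relaxed);

        // Late I/O work, spawned after the inner scope has closed.
        io_scope.spawn(|| {
            io_done.fetch_add(1, Ordering::Relaxed);
        });
        seen
    }); // waits for the io_scope threads

    (cpu_seen, io_done.load(Ordering::Relaxed))
}
```

Calling `nested_scopes(&[1, 2, 3])` returns `(6, 3)`: both CPU tasks are always finished by the time the inner scope closes, and all three I/O tasks are finished once the outer scope closes.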
pkolaczk commented 4 years ago

Ok, now what if the number of thread pools is known only at runtime? Recursion?

cuviper commented 4 years ago

Yeah, recursive nesting should be possible.

I guess we might be able to control that better within rayon, since we can do it iteratively and still enforce the 'scope lifetime. That might look something like:

pub fn scopes<'scope, OP, R>(pools: &[ThreadPool], op: OP) -> R
where
    OP: for<'s> FnOnce(&'s [Scope<'scope>]) -> R + 'scope

(or pools could be any IntoIterator<Item = &ThreadPool>)

But this seems like an unusual thing to want. I can understand separating CPU/IO pools, although something like tokio might be better for the IO part. What is a situation where you'd want a dynamic number of thread pools?

pkolaczk commented 4 years ago

I have a collection of objects, and processing SOME of them requires I/O, but I/O performance is bad if multiple threads try to access the same hard drive. However, it is perfectly fine to access two or more drives independently. So I need one thread pool per device, with the number of threads in each pool set by the capabilities of the device (which differ between SSD and HDD).
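To make the requirement concrete, here is a rough std-only sketch of "a bounded group of threads per device". It uses `std::thread::scope` instead of rayon pools, and everything in it is an assumption for illustration: the `DeviceKind` enum, the `threads_for` limits (4 for SSD, 1 for HDD), and the summing "work" that stands in for real I/O.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum DeviceKind {
    Ssd,
    Hdd,
}

/// Hypothetical per-device concurrency limits.
fn threads_for(kind: DeviceKind) -> usize {
    match kind {
        DeviceKind::Ssd => 4,
        DeviceKind::Hdd => 1,
    }
}

/// Processes each device's items on at most `threads_for(kind)` threads
/// and returns the sum of all items (a placeholder for real I/O work).
fn process_per_device(work: &HashMap<DeviceKind, Vec<u64>>) -> u64 {
    let total = AtomicU64::new(0);
    thread::scope(|s| {
        for (&kind, items) in work {
            let n = threads_for(kind).max(1);
            // Ceiling division so that at most `n` chunks are produced;
            // `.max(1)` avoids a zero chunk length for empty inputs.
            let chunk_len = ((items.len() + n - 1) / n).max(1);
            for chunk in items.chunks(chunk_len) {
                let total = &total;
                s.spawn(move || {
                    let sum: u64 = chunk.iter().sum();
                    total.fetch_add(sum, Ordering::Relaxed);
                });
            }
        }
    });
    total.into_inner()
}
```

With rayon, the per-device thread group would instead be a `ThreadPool` built with the matching `num_threads`, which is what raises the question of one scope spanning several pools.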

The first thing I searched for was a map_async method that could accept a lambda returning a Future. I found something like that in the parallel-stream crate, but unfortunately that crate doesn't do the other useful things that rayon does.

As for Tokio, I don't want to add yet another big crate to my dependencies, because rayon is fine in the other parts.

pkolaczk commented 4 years ago

Ok, I'm "almost" there. Narrowed down to only one error:

fn nest<'scope, 'pool, 'vec, OP, R>(pools: &'pool[ThreadPool], scopes: Vec<&'vec Scope<'scope>>, op: OP) -> R
where
    OP: for<'s> FnOnce(&'s [&'s Scope<'scope>]) -> R + 'scope + Send,
    R: Send,
    'pool: 'scope,
{
    if pools.len() > 0 {
        let new_len = pools.len() - 1;
        pools[new_len].scope(move |s| {
            let mut scopes: Vec<&Scope> = scopes;
            scopes.push(s);
            nest(&pools[0..new_len], scopes, |scopes| op(scopes))
        })
    } else {
        (op)(&scopes)
    }
}

pub fn scopes<'scope, 'pool, OP, R>(pools: &'pool[ThreadPool], op: OP) -> R
where
    OP: for<'s> FnOnce(&'s [&'s Scope<'scope>]) -> R + 'scope + Send,
    R: Send,
    'pool: 'scope
{
    nest(pools, Vec::with_capacity(pools.len()), |scopes| op(scopes))
}
error[E0623]: lifetime mismatch
  --> src/pools.rs:12:43
   |
3  | fn nest<'scope, 'pool, 'vec, OP, R>(pools: &'pool[ThreadPool], scopes: Vec<&'vec Scope<'scope>>, op: OP) -> R
   |                                                                            -------------------
   |                                                                            |
   |                                                                            these two types are declared with different lifetimes...
...
12 |             let mut scopes: Vec<&Scope> = scopes;
   |                                           ^^^^^^ ...but data from `scopes` flows into `scopes` here

error: aborting due to previous error

For more information about this error, try `rustc --explain E0623`.

I guess this has something to do with the fact that the scope reference added to the vector has a shorter lifetime than the previous scope references in it (encoded in the type of the vector), but I don't know how to change the type of the vector so it can hold the new reference - at the end, I expect the 'vec to match the most nested scope...

I need a Rust lifetime wizard now ;)

pkolaczk commented 4 years ago

I managed to minimize the problem to the following snippet (took out the recursion):

fn nest<'scope, 'vec>(pool: &ThreadPool, scopes: Vec<&'vec Scope<'scope>>)
{
    pool.scope(move |s: &Scope| {
        let mut v = Vec::new();
        v.push(s);
        v.push(scopes[0])
    })
}
error[E0623]: lifetime mismatch
 --> src/pools.rs:8:16
  |
3 | fn nest<'scope, 'vec>(pool: &ThreadPool, scopes: Vec<&'vec Scope<'scope>>)
  |                                                      -------------------
  |                                                      |
  |                                                      these two types are declared with different lifetimes...
...
8 |         v.push(scopes[0])
  |                ^^^^^^^^^ ...but data from `scopes` flows into `scopes` here

Commenting out either of the push lines makes the compiler happy. This is very weird to me and honestly I got stuck. Why is the compiler complaining here? What does it mean by "these two types" when it is pointing to one... ?

pkolaczk commented 4 years ago

I tried to reproduce this problem without Rayon and now I'm puzzled even more.

This compiles fine:

struct FooScope<'a> { _unused: &'a u32 }

fn with_foo_scope<'scope, F>(op: F)
    where F: for <'s> FnOnce(&'s FooScope<'scope>) + 'scope + Send {
}

fn lifetimes<'scope, 'vec>(foos: &'vec [FooScope<'scope>])
{
    with_foo_scope(move |s: &FooScope| {
        let x = [s, &foos[0]];
    })
}

But very similar code with rayon doesn't:

fn nest<'scope, 'vec>(pool: &ThreadPool, scopes: &'vec [Scope<'scope>])
{
    pool.scope(move |s: &Scope| {
        let x = [s, &scopes[0]];
    })
}
   |
17 | fn nest<'scope, 'vec>(pool: &ThreadPool, scopes: &'vec [Scope<'scope>])
   |                                                  --------------------- these two types are declared with different lifetimes...
...
20 |         let x = [s, &scopes[0]];
   |                     ^^^^^^^^^^ ...but data from `scopes` flows into `scopes` here

What is the difference?

nickkuk commented 4 years ago

@pkolaczk maybe this is due to invariance of Scope<'scope> in 'scope.

pkolaczk commented 4 years ago

That looks likely. So is there no way of keeping more than one scope in a collection together?

pkolaczk commented 4 years ago

Indeed, this is a problem with invariance. The offending line is the marker:


pub struct FooScope<'scope> {
    marker: PhantomData<Box<dyn FnOnce(&FooScope<'scope>) + Send + Sync + 'scope>>,
}

fn with_foo_scope<'scope, F, R>(op: F)
    where F: for <'s> FnOnce(&'s FooScope<'scope>) -> R + 'scope + Send,
          R: Send
{
}

fn lifetimes<'scope, 'vec>(scopes: &'vec [FooScope<'scope>])
{
    with_foo_scope(move |s: &FooScope| {
        let x = [s, &scopes[0]];   // error:  data from `scopes` flows into `scopes` here
    })
}
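As a side note for anyone unfamiliar with variance, here is a small sketch without rayon of how a marker type changes what the compiler accepts. The type names `Covariant` and `Invariant` and the helper functions are made up for this example, and the marker is a simplified one, not the marker rayon actually uses.

```rust
use std::marker::PhantomData;

/// Covariant in 'a: it only holds a shared reference.
struct Covariant<'a> {
    value: &'a u32,
}

/// Invariant in 'a: the marker mentions 'a in both argument and
/// return position of a function pointer.
struct Invariant<'a> {
    value: u32,
    marker: PhantomData<fn(&'a u32) -> &'a u32>,
}

impl<'a> Invariant<'a> {
    fn new(value: u32) -> Self {
        Invariant { value, marker: PhantomData }
    }
}

/// Accepted: a covariant type with a long lifetime can be used where a
/// shorter one is expected.
fn shorten<'short, 'long: 'short>(c: Covariant<'long>) -> Covariant<'short> {
    c
}

/// Accepted: both elements are narrowed to a common, shorter lifetime.
fn sum_mixed<'a, 'b>(x: &Covariant<'a>, y: &Covariant<'b>) -> u32 {
    let both = [x, y];
    both.iter().map(|c| *c.value).sum()
}

/// Accepted: with an invariant type, both elements must already share
/// exactly the same lifetime parameter.
fn sum_same<'a>(x: &Invariant<'a>, y: &Invariant<'a>) -> u32 {
    let both = [x, y];
    both.iter().map(|i| i.value).sum()
}

// Rejected if uncommented: 'a and 'b are unrelated, and invariance
// forbids narrowing either of them to a common lifetime.
// fn sum_mixed_invariant<'a, 'b>(x: &Invariant<'a>, y: &Invariant<'b>) -> u32 {
//     let both = [x, y];
//     both.iter().map(|i| i.value).sum()
// }
```

The array in `sum_mixed` is the same shape as `[s, &scopes[0]]` above: it only type-checks when the element types can be unified, which an invariant lifetime parameter prevents unless the lifetimes are identical.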
cuviper commented 4 years ago

Because of the invariance, you would need all the scopes to have the same outer 'scope lifetime. But 'vec is necessarily a smaller lifetime, shrinking in each recursive nest, which means the closure can't outlive 'scope.

So this seems to be something we could only provide from rayon internals, where we can build a Vec<Scope<'scope>> by value.


In case this is a question, the scope invariance is indeed necessary. Otherwise it would be possible to spawn closures with smaller lifetimes, but the scope would only enforce that they complete before the original scope lifetime, which is too late.
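The rule being protected here is that a spawned closure may only borrow data that outlives the whole scope. The standard library's scoped threads enforce the same rule, so a std-only sketch can illustrate it. This uses `std::thread::scope` as a stand-in, not rayon, and `sum_halves` is a made-up function name.

```rust
use std::thread;

/// Sums a slice using two scoped threads that borrow from the caller's data.
fn sum_halves(data: &[u32]) -> u32 {
    let (left, right) = data.split_at(data.len() / 2);
    thread::scope(|s| {
        // Accepted: `left` and `right` were created before the scope
        // started, so they outlive every thread spawned in it.
        let l = s.spawn(|| left.iter().sum::<u32>());
        let r = s.spawn(|| right.iter().sum::<u32>());

        // Rejected by the borrow checker if uncommented: `local` lives
        // only inside this closure, but the spawned thread may still be
        // running after the closure returns.
        // let local = vec![1u32, 2, 3];
        // s.spawn(|| local.len());

        l.join().unwrap() + r.join().unwrap()
    })
}
```

If a scope's lifetime parameter could be silently shortened, a borrow like the commented-out one could slip through, and the scope would wait for the spawned work too late.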

pkolaczk commented 4 years ago

Ok, makes sense. Can you give me any hints on where to start from inside rayon?

cuviper commented 4 years ago

See rayon-core/src/scope/mod.rs

fn scope basically just calls Scope::new, then complete which executes the closure (catching panics) and then steal_till_jobs_complete for the spawns. At a glance this doesn't seem too hard to extend to multiple scopes, although scope creation wants to be in_worker for the target pool first. I was working on a branch to avoid that for #562, which would probably make it easier here too...

pkolaczk commented 4 years ago

Thank you. Before I dive into Rayon internals, I managed to unblock myself with a little bit of unsafe. FYI, this is my code:

unsafe fn adjust_lifetime<'s, 'a, 'b>(scope: &'s Scope<'a>) -> &'s Scope<'b> {
    std::mem::transmute::<&'s Scope<'a>, &'s Scope<'b>>(scope)
}

fn nest<'scope, OP, R>(pools: &[&ThreadPool], scopes: Vec<&Scope<'scope>>, op: OP) -> R
where
    OP: for<'s> FnOnce(&'s [&'s Scope<'scope>]) -> R + 'scope + Send,
    R: Send,
{
    if pools.len() > 0 {
        pools[0].scope(move |s: &Scope| {
            let mut vec = scopes;
            vec.push(unsafe { adjust_lifetime(s) });
            nest(&pools[1..], vec, op)
        })
    } else {
        (op)(&scopes)
    }
}

pub fn scopes<'scope, OP, R>(pools: &[&ThreadPool], op: OP) -> R
where
    OP: for<'s> FnOnce(&'s [&'s Scope<'scope>]) -> R + 'scope + Send,
    R: Send,
{
    nest(pools, Vec::with_capacity(pools.len()), op)
}

Is this use of unsafe sound in this case? Seen from the "outside" world, all scopes then have the same lifetime as the op lambda. I think it is OK to make the compiler forget that they are nested and created/dropped sequentially. WDYT?

pkolaczk commented 4 years ago

And this is how to use it (for anybody who is interested):

#[test]
fn test_two_pools() {
    let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
    let pool2 = ThreadPoolBuilder::new().num_threads(2).build().unwrap();
    scopes(&[&pool1, &pool2], |scopes| {
        let s1 = scopes[0];
        let s2 = scopes[1];
        for _ in 1..10000 {
            s1.spawn(|_| println!("thread-pool 1: {:?}", std::thread::current().id()));
        }
        for _ in 1..10000 {
            s2.spawn(|_| println!("thread-pool 2: {:?}", std::thread::current().id()));
        }
    });
}
cuviper commented 4 years ago

I think your unsafe code is OK, in that nothing will actually observe the transmuted lifetimes in a way that could cause unsoundness, but I hope we can do better...

I'm actually wondering now -- I don't think rayon::scope really needs the OP: 'scope constraint, because it's executed synchronously. We need 'scope for spawns because they run asynchronously, so must not borrow from some inner scope. But something like this should be fine:

let vec: Vec<u32> = (0..100).collect();
let slice = &vec[..];
rayon::scope(|s: &rayon::Scope<'static>| {
    // We should be able to borrow here, but the spawns have to be static
    for &i in slice {
        s.spawn(move |_| println!("hello {}!", i));
    }
});

If we relax OP, then I think your original nesting code would just work, creating outermost 'scope scopes all the way down, while variance happily narrows the 'vec reference lifetimes.

cuviper commented 4 years ago

See #785 -- I hope you don't mind that I adapted your nesting code into a test case.

cuviper commented 4 years ago

#785 is merged and published in rayon-core 1.8.0 and rayon 1.4.0. Is there anything else you need?

stevenengler commented 2 years ago

If anyone comes across this, here's a code snippet. It uses in_place_scope which is much faster when you have many pools, ~and llvm seems to apply a tail-call optimization which is nice~ (edit: nvm, no tail-call optimization here).

fn scope_all<'a, 'scope>(
    pools: impl ExactSizeIterator<Item = &'a rayon::ThreadPool> + Send,
    f: impl FnOnce(Vec<&rayon::Scope<'scope>>) + Send + 'scope,
) {
    #[inline]
    fn recursive_scope<'a, 'scope>(
        mut pools: impl Iterator<Item = &'a rayon::ThreadPool> + Send,
        scopes: Vec<&rayon::Scope<'scope>>,
        f: impl FnOnce(Vec<&rayon::Scope<'scope>>) + Send + 'scope,
    ) {
        match pools.next() {
            None => return f(scopes),
            Some(pool) => {
                pool.in_place_scope(move |s| {
                    let mut scopes = scopes;
                    scopes.push(s);
                    recursive_scope(pools, scopes, f);
                });
            }
        }
    }

    let vec = Vec::with_capacity(pools.len());
    recursive_scope(pools, vec, f)
}
cuviper commented 2 years ago

@stevenengler Nice! But yeah, I don't expect tail calls, because each scope has cleanup (waiting for spawns) as they return.
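A small sketch of why cleanup rules out tail calls, using a plain `Drop` guard in place of a rayon scope. The `ScopeCleanup` type and the `nest` and `run` functions are invented for this illustration: each level must run its cleanup after the recursive call returns, so the recursive call is not the last thing the frame does.

```rust
use std::cell::RefCell;

/// Stand-in for the per-scope cleanup (waiting for spawned work).
struct ScopeCleanup<'a> {
    depth: usize,
    log: &'a RefCell<Vec<String>>,
}

impl Drop for ScopeCleanup<'_> {
    fn drop(&mut self) {
        self.log.borrow_mut().push(format!("cleanup {}", self.depth));
    }
}

fn nest(depth: usize, levels: usize, log: &RefCell<Vec<String>>) {
    if depth == levels {
        // Innermost level: run the user's operation.
        log.borrow_mut().push("op".to_string());
        return;
    }
    let _cleanup = ScopeCleanup { depth, log };
    nest(depth + 1, levels, log);
    // `_cleanup` is dropped here, after the recursive call has returned,
    // so this frame has to stay alive across the call.
}

fn run(levels: usize) -> Vec<String> {
    let log = RefCell::new(Vec::new());
    nest(0, levels, &log);
    log.into_inner()
}
```

`run(2)` yields `["op", "cleanup 1", "cleanup 0"]`: the cleanups run innermost first, each one after the call below it has finished.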