rayon-rs / rayon

Rayon: A data parallelism library for Rust
Apache License 2.0

Clone+Send closures instead of Send+Sync closures? #1005

Open jgarvin opened 1 year ago

jgarvin commented 1 year ago

Say I have a container foo, and for each element in foo I want to look up another container associated with that element, then process all of those contents one by one. I might write something like:

fn yield_inner(foo: &Foo, table: &ContainerTable) -> Iterator<Item=&Bar> {
    foo.iter().flat_map(|x| { table.lookup(x).iter() } )
}

But if the lookup uses a thread local container table this breaks:

thread_local! { static TABLE: ContainerTable = ContainerTable::new(); }
fn yield_inner2(foo: &Foo) -> Iterator<Item=&Bar> {
    foo.iter().flat_map(|x| { 
        TABLE.with(|table| {
            table.lookup(x).iter() 
        })
    })
}

yield_inner2 will fail to compile because table has a lifetime limited to the body of the closure. This is annoying because most of the time you're not doing any of the things that lead thread locals to need an "internal iteration" style design -- you're not accessing it from drop, you're not causing things to come back to life, and the thread is definitely going to outlive your use of the data. I could work around the issue by using Rc and Option:

thread_local! { static TABLE: Rc<ContainerTable> = Rc::new(ContainerTable::new()); }
fn yield_inner3(foo: &Foo) -> Iterator<Item=&Bar> {
    let table = None;
    foo.iter().flat_map(move |x| { // `move` stores `table` on closure
        table = Some(TABLE.with(|table_rc| { table_rc.clone() }));
        table.as_ref().unwrap().lookup(x).iter()
    })
}

Now I think everything is fine as long as this iteration is single threaded. Since the move causes the closure to keep a clone of the Rc, the table will stay alive as long as the closure does, and the closure is stored on the FlatMap returned by flat_map, so it will stay alive as long as the overall iterator does. But since we're using a thread local, obviously our goal is to use multiple threads with rayon and par_iter:

thread_local! { static TABLE: Rc<ContainerTable> = Rc::new(ContainerTable::new()); }
fn yield_inner4(foo: &Foo) -> Iterator<Item=&Bar> {
    let table = None;
    foo.par_iter().flat_map_iter(move |x| { // `move` stores `table` on closure
        table = Some(TABLE.with(|table_rc| { table_rc.clone() }));
        table.as_ref().unwrap().lookup(x).iter()
    })
}

This doesn't compile -- I was expecting that each worker thread would get its own copy of the closure. However, ParallelIterator::flat_map_iter instead requires the closure to be Send+Sync. What I think I want is a flat_map where the closure is instead Clone+Send, and every worker thread gets its own clone that it then reuses as it processes elements. I could try writing a ParallelIteratorExt::flat_map_iter_clone, but is that necessary, or is there a better way? I haven't dug into implementing my own ParallelIterator enough to know whether it's feasible with the current API or whether it internally assumes Sync closures.

Edit: fixed some s/map/flat_map
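
For reference, rayon's existing ParallelIterator::map_init is close to the per-thread-clone idea described above: the init closure runs on whichever worker thread processes a given batch of items, and the state it produces never crosses threads, so it needs neither Send nor Sync (only the final mapped output does). A rough sketch of the thread-local-table pattern written that way, with ContainerTable as a hypothetical stand-in whose lookup is assumed to return owned, Send data rather than borrows:

use rayon::prelude::*;
use std::rc::Rc;

// Hypothetical stand-in for the table type in the question.
struct ContainerTable;

impl ContainerTable {
    fn new() -> Self {
        ContainerTable
    }

    // Assumed here to return owned data; the items that come out of the
    // parallel iterator still have to be Send.
    fn lookup(&self, x: i32) -> Vec<i32> {
        vec![x, x * 10]
    }
}

thread_local! { static TABLE: Rc<ContainerTable> = Rc::new(ContainerTable::new()); }

fn yield_inner_par(foo: &[i32]) -> Vec<i32> {
    foo.par_iter()
        // map_init builds its state on the worker thread itself, so the
        // Rc clone never has to be Send or Sync.
        .map_init(
            || TABLE.with(|rc| rc.clone()),
            |table, &x| table.lookup(x),
        )
        // Flatten the per-element results back into one stream of items.
        .flatten_iter()
        .collect()
}

This covers the "expensive per-thread state" part of the problem, but not the original goal of yielding borrowed &Bar items out of the parallel iterator.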

cuviper commented 1 year ago

This doesn't compile

In general, please include the actual error messages, especially when you haven't provided a complete example. Stuff like -> Iterator<..> should probably be -> impl Iterator<..>, or -> impl ParallelIterator<..> for the rayon one, at the very least.
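
For instance, the first (sequential, two-argument) example's signature would need to name the borrows explicitly, roughly like the sketch below; Foo, Bar, and ContainerTable here are hypothetical stand-ins for the types in the question:

struct Bar;
struct Foo(Vec<i32>);
struct ContainerTable(Vec<Bar>);

impl Foo {
    fn iter(&self) -> impl Iterator<Item = &i32> {
        self.0.iter()
    }
}

impl ContainerTable {
    fn lookup(&self, _key: &i32) -> &[Bar] {
        &self.0
    }
}

// The returned iterator borrows from both arguments, so both borrows have to
// be visible in the signature via a named lifetime.
fn yield_inner<'a>(foo: &'a Foo, table: &'a ContainerTable) -> impl Iterator<Item = &'a Bar> + 'a {
    foo.iter().flat_map(move |x| table.lookup(x).iter())
}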

Otherwise, are you sure your sequential yield_inner3 version works? I wouldn't think so, because a closure can't return a borrow tied to its own lifetime. And more broadly, you're expressing Item = &Bar at the outer level, but what is this borrowing from? The only available answer is that it borrows from foo: &Foo, but there's no visibility to the lifetime of your Rc or closure, even if you could return borrows like that.
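
A minimal standalone illustration of that point: even when a move closure owns the Rc (as yield_inner3's closure would, via its stored clone), the Fn traits give it no way to return a reference into its captured state.

use std::rc::Rc;

fn demo() {
    let table = Rc::new(vec![1, 2, 3]);
    // The move closure takes ownership of the Rc...
    let lend = move || -> &i32 {
        // ...but this does not compile: the reference would borrow from data
        // owned by the closure itself, and references to captured variables
        // can't escape a Fn closure.
        &table[0]
    };
    let _ = lend();
}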

It's possible that you've oversimplified your working code in order to ask a succinct question, but I need something more solid to work with. If you can get your sequential idea working in full, e.g. on play.rust-lang.org, then we can discuss whether it's possible to parallelize that.

jgarvin commented 1 year ago

Sorry, I naively assumed that because all the parts had already been shown to compile separately, they would work together. I think I have a better handle now on what I'm trying to do. Say you have this (which compiles!):

#![allow(dead_code)]
#![allow(unused_variables)]

use rayon::prelude::*;
use std::rc::Rc;

// Pretend we have a type that is neither Send nor Sync.
struct BigObject(*const (), i32);

fn expensive_query(x: i32) -> Rc<BigObject> {
    // pretend we do some real work here, query some global data store
    // or something
    Rc::new(BigObject(std::ptr::null(), x * 2))
}

fn build_iterator(
    visit: &(impl Fn(&BigObject) -> i32 + Sync),
) -> impl ParallelIterator<Item = i32> + '_ {
    let huge_container = vec![1, 2, 3, 4, 5, 6, 7, 8];

    huge_container.into_par_iter().map(|x| {
        let r = expensive_query(x);
        visit(&r)
    })
}

fn user_of_build_iterator_code() {
    let r: Vec<i32> = build_iterator(&|a: &BigObject| -> i32 { a.1 + 1 }).collect();
}

My basic goal is this: as the author of build_iterator, I would like users of my function to be able to use external iteration (so build_iterator should return some kind of iterator instead of taking a closure argument) to process the objects I'm going to produce. In the code above I'm mixing internal and external iteration -- build_iterator returns a ParallelIterator, but it also takes a callback for actually 'visiting' each element. I would rather users be able to consistently use external iteration, since that's considered idiomatic Rust and is more flexible.
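
To make the constraint concrete, here is roughly what the external-iteration version would look like, reusing BigObject and expensive_query from the snippet above; build_iterator_external is a hypothetical name, and the function does not compile because ParallelIterator items must be Send:

// Sketch only -- this does NOT compile: ParallelIterator requires
// Item: Send, and Rc<BigObject> is !Send, so the Rc cannot be yielded
// out of the parallel pipeline.
fn build_iterator_external() -> impl ParallelIterator<Item = Rc<BigObject>> {
    let huge_container = vec![1, 2, 3, 4, 5, 6, 7, 8];
    huge_container
        .into_par_iter()
        .map(expensive_query) // rejected: Rc<BigObject> cannot be sent between threads safely
}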

Because Rc<BigObject>: !Send + !Sync, I can't just have my map closure return r. What I want is a different method instead of map -- call it extendable_map -- that returns a special kind of iterator. If I were to write:

foo.into_par_iter().extendable_map(|x| do_work(x) ).extend_map(|x| do_more_work(x))

I want it to be equivalent to:

foo.into_par_iter().map(|x| do_more_work(do_work(x)))

In other words, it would make it so that subsequent iterator methods describe work that each thread processing the map should do in addition to the work it was already doing. This would mean those iterator methods could take Rc<BigObject> even though it's !Send + !Sync. Then users could call extend_map, extend_filter, etc. That could be bothersome, so ideally they would just call methods named map, filter, etc. But then extendable_map needs to return some sort of ExtendableMap that implements those methods yet can't actually be an Iterator (Iterator::map has a signature that's pretty constraining) and can't be a ParallelIterator (ExtendableMap::map should extend the earlier extendable_map closure, not describe work that can be fanned out to new threads).
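
A rough sketch of that shape, using hypothetical names (ExtendableMap, extendable_map, and extend_map are the proposal above, not existing rayon API). The intermediate type T never crosses a thread boundary because the steps are fused into one closure before rayon ever sees them, so T may be !Send; only the closures themselves and the final output need the usual bounds:

use rayon::prelude::*;

// Hypothetical adapter holding the base parallel iterator plus the closure
// composed so far.
struct ExtendableMap<I, F> {
    base: I,
    op: F,
}

// Entry point corresponding to `foo.into_par_iter().extendable_map(f)`.
fn extendable_map<I, F, T>(base: I, op: F) -> ExtendableMap<I, F>
where
    I: ParallelIterator,
    F: Fn(I::Item) -> T + Sync + Send,
{
    ExtendableMap { base, op }
}

impl<I, F, T> ExtendableMap<I, F>
where
    I: ParallelIterator,
    F: Fn(I::Item) -> T + Sync + Send,
{
    // Fuses another step onto the existing closure. Because `g` runs inside
    // the same fused closure, its input and output may be !Send types such
    // as Rc<BigObject>.
    fn extend_map<G, U>(self, g: G) -> ExtendableMap<I, impl Fn(I::Item) -> U + Sync + Send>
    where
        G: Fn(T) -> U + Sync + Send,
    {
        let ExtendableMap { base, op } = self;
        ExtendableMap { base, op: move |x| g(op(x)) }
    }

    // Hands the fused closure back to rayon once the final output is Send.
    fn into_par_iter(self) -> impl ParallelIterator<Item = T>
    where
        T: Send,
    {
        self.base.map(self.op)
    }
}

Every other adapter (extend_filter, extend_fold, and so on) would need the same treatment, which is where the API-duplication concern in the reply below comes from.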

Is this a sane thing to want? Does something like this already exist? Is there a better way? Should I just accept internal iteration? etc etc. Help appreciated!

cuviper commented 1 year ago

I can't see a way to do that without gigantic duplication of the API to/from extendable intermediates.