executors / executors-impl

A prototype implementation of the executors proposal

How to generically implement transform_reduce #5

Open crtrott opened 6 years ago

crtrott commented 6 years ago

Hi

We are looking into putting together a std::executors backend for Kokkos (https://github.com/kokkos/kokkos/tree/std-executors-backend), and I am puzzling over how best to implement a parallel_reduce (or, in STL terms, a transform_reduce).

One straightforward way seems to be to use bulk_twoway_execute and have the ResultFactory provide a handle to a std::atomic. While that would probably work no matter what the executor type is (as long as it supports bulk_twoway_execute), it is also an exceptionally slow way of implementing a reduction.

// Insert this into the for_each example
template<class ExecutionPolicy, class RandomAccessIterator, class T, class Function>
T transform_reduce(ExecutionPolicy&& policy, RandomAccessIterator first, RandomAccessIterator last, T init, std::plus<> /*op: only plus is handled here*/, Function f)
{
  auto n = last - first;

  std::atomic<T> result{init}; // shared accumulator: every agent contends on this
  auto twoway_bulk_exec = execution::require(policy.executor(), execution::bulk, execution::twoway);

  twoway_bulk_exec.bulk_twoway_execute([=](size_t idx, std::atomic<T>* result, impl::ignored&)
  {
    // every agent adds its element's contribution to the one shared atomic
    *result += f(first[idx]);
  },
  n,
  [&]()-> std::atomic<T>* {return &result;},
  []{ return impl::ignore; }
  ).get();
  return result;
}

void foo() {
  std::vector<int> vec(10);
  for(int i=0; i<10; i++) vec[i] = i;
  int result = transform_reduce(par, vec.begin(), vec.end(), 0, std::plus<>(), [](int& x) -> int {
    return x;
  });
  assert(result == 45);
}

Now, if I knew that the executor maps to std::thread, I could probably use a map from the thread id to a scratch-space location in the range 0 to concurrency, so that each thread's contribution is stored separately and a serial reduction happens afterwards (or the last thread to finish does the final sum, or something like that). But that requires knowing that the executor maps to std::thread. Is there something I am overlooking? Or is implementing this generically simply outside of what executors are supposed to address for now?
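For concreteness, the thread-id-to-scratch idea might look like the sketch below, written against plain std::thread rather than the executors API (the class name and its interface are made up for illustration):

#include <cstddef>
#include <mutex>
#include <numeric>
#include <thread>
#include <unordered_map>
#include <vector>

// Sketch of the thread-id -> scratch-slot idea. Each worker looks up its
// std::thread::id in a map to claim a private slot, accumulates there
// without atomics, and the caller reduces the slots serially afterwards.
// The capacity must be at least the number of distinct worker threads.
class scratch_slots {
public:
    explicit scratch_slots(std::size_t capacity) : partials_(capacity, 0) {}

    // Map the calling thread's id to a stable slot; first call claims one.
    long long& local() {
        std::lock_guard<std::mutex> lock(mutex_);
        auto it = slot_of_.try_emplace(std::this_thread::get_id(),
                                       slot_of_.size()).first;
        return partials_[it->second];
    }

    // Serial reduction over the per-thread partial sums.
    long long reduce() const {
        return std::accumulate(partials_.begin(), partials_.end(), 0LL);
    }

private:
    std::mutex mutex_;
    std::unordered_map<std::thread::id, std::size_t> slot_of_;
    std::vector<long long> partials_;
};

Each worker would call local() once, keep the reference, and accumulate into it lock-free; only the slot lookup is serialized.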

jaredhoberock commented 6 years ago

One way to avoid having all agents accumulate into a single atomic would be to use two passes. The first would accumulate partial sums, and the second would sum those partial sums into a single result. Here's a possible implementation.
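The linked implementation is not reproduced in this thread; below is a rough sketch of the two-pass structure described, with plain std::thread standing in for a bulk executor (the function name, the strided partitioning, and the num_partials parameter are assumptions):

#include <cstddef>
#include <numeric>
#include <thread>
#include <vector>

// Two-pass reduction sketch. Pass 1: num_partials agents each accumulate
// a private partial sum over a strided chunk of the input. Pass 2: the
// partials are summed into a single result (serial here, but the second
// pass could itself be a bulk launch).
template <class It, class T, class F>
T transform_reduce_twopass(It first, It last, T init, F f,
                           std::size_t num_partials) {
    std::size_t n = static_cast<std::size_t>(last - first);
    std::vector<T> partials(num_partials, T{});
    std::vector<std::thread> agents;
    for (std::size_t idx = 0; idx < num_partials; ++idx) {
        agents.emplace_back([=, &partials] {
            T local{};  // no sharing: each agent owns partials[idx]
            for (std::size_t i = idx; i < n; i += num_partials)
                local = local + f(first[i]);
            partials[idx] = local;
        });
    }
    for (auto& a : agents) a.join();
    // Pass 2: reduce the per-agent partial sums.
    return std::accumulate(partials.begin(), partials.end(), init);
}

The key point is that no agent touches shared state during pass 1; contention is confined to the (much smaller) second pass.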

This approach is portable and generic, but I would not expect it to be optimally efficient across all platforms.

crtrott commented 6 years ago

Thanks for putting that example together, but looking at it, you ran into exactly the issue I mentioned: you need something equivalent to std::thread::hardware_concurrency() to implement this in any meaningfully performant way. If I run this with 16 GPU threads, it will probably be slower than the atomic implementation above.

jaredhoberock commented 6 years ago

I agree; an additional property that gives some idea of scale or size would be helpful. My approach has been to implement such a property as an extension.
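For illustration only, such an extension might take the shape of a query-only property in the style of the proposal. Everything in this sketch, including the name occupancy_t, is an assumption and not part of any published revision:

#include <cstddef>
#include <thread>

// Hypothetical extension property: a query-only property whose value
// hints at how many agents the executor can usefully run concurrently.
struct occupancy_t {
    static constexpr bool is_requirable = false;  // query-only
    static constexpr bool is_preferable = false;
    using polymorphic_query_result_type = std::size_t;
};
inline constexpr occupancy_t occupancy{};

// A toy executor whose occupancy is the host's hardware concurrency.
struct toy_thread_executor {
    std::size_t query(occupancy_t) const noexcept {
        std::size_t hc = std::thread::hardware_concurrency();
        return hc != 0 ? hc : 1;  // hardware_concurrency() may report 0
    }
};

A generic reduction could then size its partials array with ex.query(occupancy) instead of hard-coding std::thread::hardware_concurrency(), keeping the algorithm portable across thread-pool and GPU executors.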