stlab / libraries

ASL libraries will be migrated here in the stlab namespace; new libraries will be created here.
https://stlab.cc
Boost Software License 1.0

Experimental batcher #258

Open aaronalbers opened 5 years ago

aaronalbers commented 5 years ago

I created a batcher using what I call a serial series. A serial series is like a serial queue but can be canceled. Think of a serial queue as the Y axis and fully parallel execution as the X axis; batching covers everything in between.

You would use a batch when you know ahead of time how many units of work you have but don't want to completely occupy your executor with them. Or perhaps the work can be parallelized but is constrained in how many units can run at once.

Perhaps something like this would also be useful in the library.

Note: Of course having a batch size larger than the pool size of your executor is somewhat meaningless.

#include <algorithm> // std::for_each, std::min
#include <cstddef>   // std::size_t
#include <iterator>
#include <utility>
#include <vector>

#include <stlab/concurrency/future.hpp>
#include <stlab/concurrency/utility.hpp>

/**************************************************************************************************/

namespace stlab {

/**************************************************************************************************/

inline namespace v1 {

/**************************************************************************************************/

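// Chains one continuation per element onto `task`, so the elements of `range`
// run strictly one after another. Note that each element is moved out of the
// source range when it is handed to `func`.
template<class T, class I, class F>
future<T> serial_series(future<T> task, const std::pair<I, I>& range, F func) {
   std::for_each(range.first, range.second, [&](auto&& entity){
      task = task.then(func(std::move(entity)));
   });
   return task;
}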

// https://codereview.stackexchange.com/questions/106773/dividing-a-range-into-n-sub-ranges
template <class I>
std::vector<std::pair<I, I>> divide_work(std::pair<I, I> range, std::size_t n) {
   if (n == 0) return {};
   if (n == 1) return {std::move(range)};

   std::vector<std::pair<I, I>> ranges;
   ranges.reserve(n);

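   auto dist = std::distance(range.first, range.second);
   if (dist <= 0) return {}; // empty range: nothing to divide, and avoids dividing by zero below
   n = std::min<std::size_t>(n, dist);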
   auto chunk = dist / n;
   auto remainder = dist % n;

   for (std::size_t i = 0; i < n-1; ++i) {
      auto next_end = std::next(range.first, chunk + (remainder ? 1 : 0));
      ranges.emplace_back(range.first, next_end);

      range.first = next_end;
      if (remainder) remainder -= 1;
   }

   ranges.emplace_back(range.first, range.second);
   return ranges;
}

template<class T, class E, class I, class F>
std::vector<stlab::future<T>> batch(E executor, std::size_t n, T initial, std::pair<I, I> range, F func) {
   auto sub_ranges = divide_work(std::move(range), n);

   std::vector<future<T>> series;
   series.reserve(sub_ranges.size());

   for (const auto& sub_range : sub_ranges) {
      series.push_back(serial_series(make_ready_future(initial, executor), sub_range, func));
   }

   return series;
}

template<class E, class I, class F>
std::vector<future<void>> batch(E executor, std::size_t n, std::pair<I, I> range, F func) {
   auto sub_ranges = divide_work(std::move(range), n);

   std::vector<future<void>> series;
   series.reserve(sub_ranges.size());

   for (const auto& sub_range : sub_ranges) {
      series.push_back(serial_series(make_ready_future(executor), sub_range, func));
   }

   return series;
}

/**************************************************************************************************/

} // namespace v1

/**************************************************************************************************/

} // namespace stlab

/**************************************************************************************************/

#include <iostream>
#include <iterator> // std::begin, std::end
#include <string>   // std::to_string
#include <utility>
#include <vector>

#include <stlab/concurrency/default_executor.hpp>

int main() {
   #if 1
   std::vector<int> tasks;
   for (int i = 0; i < 100; ++i) {
      tasks.push_back(i);
   }
   std::vector<stlab::future<void>> series = stlab::batch(stlab::default_executor, 10, std::make_pair(std::begin(tasks), std::end(tasks)), [](int i){
      return [i]{
         std::cout << std::to_string(i) + "\n";
      };
   });
   auto result_token = stlab::when_all(stlab::default_executor, [](){
      std::cout << "done\n";
   }, std::make_pair(std::begin(series), std::end(series)));
   #else
   std::vector<int> tasks;
   for (int i = 0; i < 100; ++i) {
      tasks.push_back(i);
   }
   std::vector<stlab::future<int>> series = stlab::batch(stlab::default_executor, 10, -1, std::make_pair(std::begin(tasks), std::end(tasks)), [](int i){
      return [i](int j) -> int {
         std::cout << std::to_string(i) + ":" + std::to_string(j) + "\n";
         return i;
      };
   });
   auto result_token = stlab::when_all(stlab::default_executor, [](std::vector<int> results){
      std::cout << "done\n";
   }, std::make_pair(std::begin(series), std::end(series)));
   #endif
   stlab::blocking_get(result_token);
   return 0;
}
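
To make the splitting rule used by divide_work concrete, here is a minimal standalone sketch that depends only on the standard library. The names `split_range` and `chunk_sizes` are hypothetical and not part of stlab. It assumes the same policy as the code above: at most n contiguous sub-ranges, with any remainder spread one element at a time over the leading chunks.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <iterator>
#include <utility>
#include <vector>

// Hypothetical helper (not stlab API): split [first, last) into at most n
// contiguous sub-ranges whose sizes differ by at most one element.
template <class I>
std::vector<std::pair<I, I>> split_range(I first, I last, std::size_t n) {
    const auto dist = static_cast<std::size_t>(std::distance(first, last));
    n = std::min(n, dist);  // never create more chunks than elements
    if (n == 0) return {};  // no chunks requested, or an empty range

    std::vector<std::pair<I, I>> ranges;
    ranges.reserve(n);
    const std::size_t chunk = dist / n;
    std::size_t remainder = dist % n;
    for (std::size_t i = 0; i < n; ++i) {
        // The first `dist % n` chunks absorb one extra element each.
        const std::size_t len = chunk + (remainder ? 1 : 0);
        if (remainder) --remainder;
        I next = std::next(first, static_cast<std::ptrdiff_t>(len));
        ranges.emplace_back(first, next);
        first = next;
    }
    assert(first == last);  // the chunks cover the whole range exactly
    return ranges;
}

// Sizes of the sub-ranges, convenient for inspecting a split.
template <class I>
std::vector<std::size_t> chunk_sizes(I first, I last, std::size_t n) {
    std::vector<std::size_t> sizes;
    for (const auto& r : split_range(first, last, n))
        sizes.push_back(static_cast<std::size_t>(std::distance(r.first, r.second)));
    return sizes;
}
```

Under these assumptions, 10 items split 3 ways yield chunks of 4, 3 and 3 items, and asking for 5 chunks of a 2-item range yields just 2 chunks of 1 item each.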

dabrahams commented 2 years ago

I think we'd like to hear more detail about possible use cases for this feature in order to evaluate it for the library.

IIUC, this implementation uses .then to chain together units of serial work, which is extremely inefficient.
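
One way to read this objection is that each sub-range could run as a single task that loops over its items, instead of one continuation per item. The following is only a rough sketch of that idea using the standard library. `make_serial_task` and `demo_serial_task` are hypothetical names, not stlab API, and the step of handing the resulting callable to an executor (for example through something like stlab::async) is assumed rather than shown.

```cpp
#include <vector>

// Hypothetical sketch (not stlab API): package a whole sub-range as ONE
// callable that applies `func` to each element in order. Scheduling this
// callable once replaces one chained continuation per element.
template <class I, class F>
auto make_serial_task(I first, I last, F func) {
    return [first, last, func]() mutable {
        for (auto it = first; it != last; ++it) func(*it);
    };
}

// Usage sketch: the whole chunk runs as one call, preserving element order.
inline std::vector<int> demo_serial_task() {
    const std::vector<int> items{1, 2, 3, 4};
    std::vector<int> seen;
    auto task = make_serial_task(items.begin(), items.end(),
                                 [&seen](int i) { seen.push_back(i); });
    task();  // in real use this call would happen on an executor
    return seen;
}
```

The trade-off in this sketch is granularity: a looped chunk is cheaper to schedule, but it can no longer be canceled between items unless the loop itself checks for cancellation.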