mreineck / ducc

Fork of https://gitlab.mpcdf.mpg.de/mtr/ducc to simplify external contributions
GNU General Public License v2.0

Allow custom parallelization functor. #7

Closed cantonios closed 1 year ago

cantonios commented 1 year ago

This is to support the use of a custom threadpool, like Eigen's or TensorFlow's. This will enable JAX/TF to use DUCC0 for FFTs with multithreading.

mreineck commented 1 year ago

This is just a spontaneous idea, but do you think it is possible/sensible to leave the ducc0 code basically untouched and just provide an alternative threading.cc that is based on the TF threadpool? This could make things easier in the future, when I might have to accommodate yet another thread pool from a different package. I'd volunteer to have a go at such a change myself, if I can find the necessary documentation.

cantonios commented 1 year ago

> This is just a spontaneous idea, but do you think it is possible/sensible to leave the ducc0 code basically untouched and just provide an alternative threading.cc that is based on the TF threadpool?

Thanks @mreineck, yeah I thought about trying that first. The problem is that we need to somehow pass in existing threadpools. We don't necessarily use a global threadpool, so this allows us to share the threadpool between ops (reducing thread contention), and avoids overhead of creating a new threadpool with every execParallel call.

> This could make things easier in the future, when I might have to accommodate yet another thread pool from a different package. I'd volunteer to have a go myself at such a change, if I can find the necessary documentation.

As written in this PR, it should be pretty generic. A user with a custom threadpool need only create their own Scheduler implementation and a functor that distributes the work, and it should work. This is what it looks like, starting with a generic Scheduler:

// Generic Scheduler implementation.
class SimpleScheduler : public ducc0::Scheduler {
 private:
  size_t ithread_;
  size_t nthread_;
  bool done_;
 public:
  SimpleScheduler(size_t ithread, size_t nthread) : ithread_{ithread}, nthread_{nthread}, done_{false} {}
  size_t num_threads() const override { return nthread_; }
  size_t thread_num() const override { return ithread_; }
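  // Hands out exactly one unit of work (this task's own index), then
  // returns an empty Range to signal that nothing is left.
  ducc0::Range getNext() override {
    if (done_) {
      return ducc0::Range();
    }
    done_ = true;
    return ducc0::Range(ithread_, ithread_ + 1);
  }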
};
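
For illustration, here is a minimal self-contained sketch of how a work function might drain such a scheduler. The `Range` and `Scheduler` types below are simplified stand-ins whose shape is assumed from the snippet above; they are not the actual ducc0 headers. `drain` and `demo` are hypothetical helpers.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Stand-in for ducc0::Range (assumed shape): half-open interval [lo, hi),
// convertible to false when empty.
struct Range {
  std::size_t lo = 0, hi = 0;
  Range() = default;
  Range(std::size_t lo_, std::size_t hi_) : lo(lo_), hi(hi_) {}
  explicit operator bool() const { return hi > lo; }
};

// Stand-in for ducc0::Scheduler (assumed interface).
class Scheduler {
 public:
  virtual ~Scheduler() = default;
  virtual std::size_t num_threads() const = 0;
  virtual std::size_t thread_num() const = 0;
  virtual Range getNext() = 0;
};

// Same logic as SimpleScheduler above, written against the stand-in types.
class SimpleScheduler : public Scheduler {
 public:
  SimpleScheduler(std::size_t ithread, std::size_t nthread)
      : ithread_{ithread}, nthread_{nthread} {}
  std::size_t num_threads() const override { return nthread_; }
  std::size_t thread_num() const override { return ithread_; }
  Range getNext() override {
    if (done_) return Range();
    done_ = true;
    return Range(ithread_, ithread_ + 1);
  }
 private:
  std::size_t ithread_;
  std::size_t nthread_;
  bool done_ = false;
};

// Hypothetical work function: pulls ranges until the scheduler is exhausted
// and records every index it was handed.
std::vector<std::size_t> drain(Scheduler& sched) {
  std::vector<std::size_t> seen;
  while (auto rng = sched.getNext()) {
    for (std::size_t i = rng.lo; i < rng.hi; ++i) seen.push_back(i);
  }
  return seen;
}

// Task 2 of 4 is handed index 2 exactly once; a second drain yields nothing.
void demo() {
  SimpleScheduler sched(2, 4);
  assert(drain(sched) == std::vector<std::size_t>{2});
  assert(drain(sched).empty());
}
```

The point of the one-shot `getNext()` is that each scheduled task processes exactly its own slot, so the threadpool, not the scheduler, decides where and when each slot runs.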

Eigen ThreadPool:

// Distribute tasks via Eigen::ThreadPool.
class EigenThreadpoolDist {
 public:
  EigenThreadpoolDist(Eigen::ThreadPool* threadpool) : threadpool_{threadpool} {}
  void operator()(size_t nwork, const std::function<void(ducc0::Scheduler&)>& func) const {
    // Adjust number of tasks to use.
    nwork = std::min<size_t>(nwork, 4*threadpool_->NumThreads());
    Eigen::Barrier barrier(nwork);
    for (size_t i=0; i<nwork; ++i) {
      threadpool_->Schedule(
        [&func, &barrier, i, nwork](){
          SimpleScheduler scheduler(i, nwork);
          func(scheduler);
          barrier.Notify();
        }
      );      
    }
    barrier.Wait();
  }
 private:
  Eigen::ThreadPool* threadpool_;
};

// Create an Eigen threadpool and call C2C.
Eigen::ThreadPool pool(/*nthreads=*/16);
EigenThreadpoolDist edist(&pool);
ducc0::c2c(m_cin, m_cout, axes, /*forward=*/true, /*fct=*/1.0f, /*nthreads=*/0, edist);
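
The same functor shape should also work without Eigen. Below is a rough sketch that uses plain `std::thread` in place of a threadpool, mainly to show the contract: call `func` once per task with a scheduler that reports the task index and task count, and return only after all tasks finish. `Range`, `Scheduler`, `StdThreadDist` and `count_visited` are hypothetical stand-ins for illustration, not part of ducc0 or of this PR.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Stand-ins for ducc0::Range / ducc0::Scheduler (assumed shapes).
struct Range {
  std::size_t lo = 0, hi = 0;
  Range() = default;
  Range(std::size_t lo_, std::size_t hi_) : lo(lo_), hi(hi_) {}
  explicit operator bool() const { return hi > lo; }
};

class Scheduler {
 public:
  virtual ~Scheduler() = default;
  virtual std::size_t num_threads() const = 0;
  virtual std::size_t thread_num() const = 0;
  virtual Range getNext() = 0;
};

// One-shot scheduler, same logic as SimpleScheduler in this comment.
class SimpleScheduler : public Scheduler {
 public:
  SimpleScheduler(std::size_t ithread, std::size_t nthread)
      : ithread_{ithread}, nthread_{nthread} {}
  std::size_t num_threads() const override { return nthread_; }
  std::size_t thread_num() const override { return ithread_; }
  Range getNext() override {
    if (done_) return Range();
    done_ = true;
    return Range(ithread_, ithread_ + 1);
  }
 private:
  std::size_t ithread_, nthread_;
  bool done_ = false;
};

// Distributes tasks over freshly spawned std::threads. A real integration
// would reuse an existing pool; spawning per call is only for the sketch.
class StdThreadDist {
 public:
  explicit StdThreadDist(std::size_t max_threads)
      : max_threads_{std::max<std::size_t>(1, max_threads)} {}
  void operator()(std::size_t nwork,
                  const std::function<void(Scheduler&)>& func) const {
    nwork = std::min(nwork, max_threads_);  // cap the number of tasks
    std::vector<std::thread> threads;
    threads.reserve(nwork);
    for (std::size_t i = 0; i < nwork; ++i) {
      threads.emplace_back([&func, i, nwork]() {
        SimpleScheduler scheduler(i, nwork);
        func(scheduler);
      });
    }
    for (auto& t : threads) t.join();  // plays the role of barrier.Wait()
  }
 private:
  std::size_t max_threads_;
};

// Counts how many indices were handed out across all tasks.
std::size_t count_visited(std::size_t nwork, std::size_t max_threads) {
  std::atomic<std::size_t> visited{0};
  StdThreadDist dist(max_threads);
  dist(nwork, [&visited](Scheduler& sched) {
    while (auto rng = sched.getNext()) visited += rng.hi - rng.lo;
  });
  return visited.load();
}
```

Joining the threads before returning matters for the same reason as `barrier.Wait()` above: the lambdas capture `func` by reference, so the call must not return while any task is still running.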

TensorFlow via ParallelFor:

class TensorFlowThreadpoolDist {
 public:
  TensorFlowThreadpoolDist(tensorflow::thread::ThreadPool* threadpool) : threadpool_{threadpool} {}
  void operator()(size_t nwork, const std::function<void(ducc0::Scheduler&)>& func) const {
    // Dummy cost, since we don't know any better.  Ideally we would know
    // the sizes of the FFT inputs to be able to approximate this.
    // Set large so pfor will divide work based on number of available threads.
    constexpr uint64_t kMaxCostPerUnit = std::numeric_limits<uint64_t>::max();
    threadpool_->ParallelFor(nwork, kMaxCostPerUnit, 
      [&func, &nwork](uint64_t low, uint64_t hi) {
        for (uint64_t idx = low; idx < hi; ++idx) {
          SimpleScheduler scheduler(idx, nwork);
          func(scheduler);
        }
      }
    );
  }
 private:
  tensorflow::thread::ThreadPool* threadpool_;
};

// Get TF's worker threadpool and call C2C.
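// tensorflow_cpu_worker_threads() returns a CpuWorkerThreads struct; the pool itself is its `workers` member.
TensorFlowThreadpoolDist tfdist(context->device()->tensorflow_cpu_worker_threads()->workers);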
ducc0::c2c(m_cin, m_cout, axes, /*forward=*/true, /*fct=*/1.0f, /*nthreads=*/0, tfdist);

mreineck commented 1 year ago

OK, this does indeed look promising! I'll try to work on this when I have some more spare time (hopefully in a few weeks), but for now I'd say let's merge the PR as it is, so you can continue with the necessary changes on the TF side. What do you think?

cantonios commented 1 year ago

I'd rather get this right than rush to merge this PR. Given this discussion, I have one more possible idea to try.

I still have to make changes in Eigen to even support calling into third-party FFT libraries for Tensors, so there's no rush. I can start with single-threaded.

Thanks for taking the time to collaborate with us on this!

mreineck commented 1 year ago

OK, if you are not in a rush, then I'm all for thinking this through and getting it as future-proof as possible. I hope to get to work on this soon, but please let me know whenever this becomes more urgent for you.

mreineck commented 1 year ago

Superseded by #9, closing...