kokkos / kokkos-comm

Experimental MPI Wrapper for Kokkos
https://kokkos.org/kokkos-comm/
Other
14 stars 9 forks source link

Non-contiguous data handling, restructure for multiple transports #100

Open cwpearson opened 3 months ago

cwpearson commented 3 months ago

This is a big PR that does a lot of things that I felt were interrelated

Prepare for non-MPI "transports"

This is all modeled after Kokkos Core. Each transport will have its own subdirectory under src (like Kokkos' backends). The interface that each transport needs to implement is a struct. For example, Irecv, from KokkosComm_fwd.hpp:

template <KokkosView RecvView, KokkosExecutionSpace ExecSpace = Kokkos::DefaultExecutionSpace,
          Transport TRANSPORT = DefaultTransport>
struct Irecv;

Why a struct? Because in practice what this means is that the transport will have a partial specialization (not allowed for functions) that looks like this (from mpi/KokkosComm_mpi_irecv.hpp):

template <KokkosExecutionSpace ExecSpace, KokkosView RecvView>
struct Irecv<RecvView, ExecSpace, Mpi> { ... };

Where Mpi is a struct in the KokkosComm namespace, analogous to e.g. Kokkos::Cuda. A Serial transport would have a corresponding

template <KokkosExecutionSpace ExecSpace, KokkosView RecvView>
struct Irecv<RecvView, ExecSpace, Serial> { ... };

NCCL would have

template <KokkosExecutionSpace ExecSpace, KokkosView RecvView>
struct Irecv<RecvView, ExecSpace, NCCL> { ... };

and so on.

To support this, a lot of the include structure needs to be refined and adjusted to more closely match how Kokkos Core does it.

Future work:

Non-contiguous view handling in MPI

Now that we have a place to implement MPI-specific things, this is where we can do non-contiguous data handling strategies. Originally I thought this was orthogonal to the transport, but it is not - consider if we want to use MPI Derived Datatypes to handle non-contiguous data.

The current implementation basically defines a sequence of seven "phases" to coordinate Kokkos execution space instances, MPI communicator, and associated non-contiguous data handling operations:

Five phases to get the communication posted

  1. init_fence (optional fence before host-side calls)
  2. allocations (optional allocations)
  3. pre_copies (optional data copies)
  4. pre_comm_fence (optional fence before host-side calls)
  5. comm (actually make MPI calls)

Plus two more that happen after the communication has posted

  1. wait (optional wait for MPI calls)
  2. post-wait (optional operations after waits)

All MPI operations + non-contiguous data handling must be fit into these 7 phases.

Future work:

Reduce fencing due to host-side MPI calls

One problem with a higher-level interface as we've started defining it today is that people want to do this:

Kokkos::parallel_for("fill send buffers", space, ...);
space.fence(); // just one fence
MPI_Isend(...);
MPI_Isend
MPI_Isend
...

However, our semantics say that the communication is ordered in the space, which means for non-host spaces we have to fence internally in our e.g. isend before we actually call MPI_Isend. So for us, we get some pointless fences

Kokkos::parallel_for("fill send buffers", space, ...);
KokkosComm::isend(space, ...); // implicit fence
KokkosComm::isend(space, ...); // implicit fence
KokkosComm::isend(space, ...); // implicit fence
...

This introduces a "plan" construct like this:

std::vector<KokkosComm::Req<>> reqs = KokkosComm::plan(space, comm, [=](KokkosComm::Handle<Space> &handle) {
    KokkosComm::isend(handle, xp1_s, get_rank(xp1, ry), 0);
    KokkosComm::isend(handle, xm1_s, get_rank(xm1, ry), 1);
    KokkosComm::isend(handle, yp1_s, get_rank(rx, yp1), 2);
    KokkosComm::isend(handle, ym1_s, get_rank(rx, ym1), 3);
  });

Now our pure KokkosComm APIs would take this handle argument, though which the implementation tells the plan function whether it needs fences, and how its operation is implemented in terms of the 7 phases. The plan looks at all of those and issues the minimal number of fences. This returns one KokkosCom::Req for each async operation in the lambda.

We can still have a lower-level search-and-replace style API for interop as well. It can be implemented in terms of this plan, or more directly.

KokkosComm::wait, wait_any, wait_all

Free-standing functions to wait on KokkosComm::Req

Replaces https://github.com/kokkos/kokkos-comm/pull/64 and https://github.com/kokkos/kokkos-comm/pull/32

aprokop commented 3 months ago

How are you going to implement a loop over (waitany, process) using handles? For example, if receiving halos from multiple processors, wait for any to arrive, process its data, then wait for the next to arrive. To overlap communication and computation.

cwpearson commented 3 months ago

How are you going to implement a loop over (waitany, process) using handles? For example, if receiving halos from multiple processors, wait for any to arrive, process its data, then wait for the next to arrive. To overlap communication and computation.

I made the plan thing return an array of Req instead, implemented wait_any, and I made a big update to the PR comment.

dutkalex commented 3 months ago

How are you going to implement a loop over (waitany, process) using handles? For example, if receiving halos from multiple processors, wait for any to arrive, process its data, then wait for the next to arrive. To overlap communication and computation.

I made the plan thing return an array of Req instead, implemented wait_any, and I made a big update to the PR comment.

Do you think it would make sense to allow other kinds of statements inside of the plan lambda? Like computations which could be done on the fly upon reception of some data while waiting for other communications?

cwpearson commented 3 months ago

Do you think it would make sense to allow other kinds of statements inside of the plan lambda? Like computations which could be done on the fly upon reception of some data while waiting for other communications?

I think this is an interesting idea as a follow-on...users may be tempted to do something like this:

std::vector<KokkosComm::Req<>> reqs = KokkosComm::plan(space, comm, [=](KokkosComm::Handle<Space> &handle) {
    KokkosComm::isend(handle, xp1_s, get_rank(xp1, ry), 0);
    Kokkos::parallel_for(space, ...) // roughly...
    KokkosComm::isend(handle, xm1_s, get_rank(xm1, ry), 1);
  });

But this actually won't work properly because the KokkosComm::isend(handle, ...) call doesn't immediately inject the communication into the stream, so we'd probably have to offer some kind of KokkosComm::parallel_for(handle, ...) that could be called inside this region and make it visible to the plan.

dutkalex commented 3 months ago

we'd probably have to offer some kind of KokkosComm::parallel_for(handle, ...) that could be called inside this region and make it visible to the plan

Yeah I was thinking of something along these lines, but I think we can avoid redefining new parallel_for functions by having the isend return a new handle:

auto reqs = KokkosComm::plan( space, comm, [=]( KokkosComm::Handle<Space>& handle ){
    KokkosComm::isend(handle, xp1_s, get_rank(xp1, ry), 0).and_then( 
        [=]{ Kokkos::parallel_for( space, ... ); }
    );
    KokkosComm::isend( handle, xm1_s, get_rank(xm1, ry), 1 );
} );