kokkos / kokkos-comm

Experimental MPI Wrapper for Kokkos
https://kokkos.org/kokkos-comm/
Other
12 stars 9 forks source link

Clarify semantics of `ExecSpace` parameter for communication interfaces #108

Open dssgabriel opened 2 months ago

dssgabriel commented 2 months ago

KokkosComm currently exposes communication functions with an ExecSpace parameter whose purpose is not semantically clear, neither in the code nor in the documentation, and that doesn't map to the behavior we expect.

The documentation for, e.g. KokkosComm::isend, states:

template<CommunicationMode SendMode, KokkosExecutionSpace ExecSpace, KokkosView SendView>
Req isend(const ExecSpace &space, const SendView &sv, int dest, int tag, MPI_Comm comm)

Parameters:

  • space: The execution space to operate in ...

Template Parameters:

  • ExecSpace: A Kokkos execution space to operate ...

This lets users rightfully think that the space parameter is used for specifying in which execution space the communication has to happen, not the one they must sync with for their communication to be correctly processed. However, the actual implementation of KokkosComm::isend (for contiguous views that don't require packing) does something like:

  space.fence();  // can't issue isend until work in space is complete
  MPI_Isend(/*...*/);

Why can't we submit more work in the space? What does this fence has to do with the execution space in which the communication operates?

Let's demonstrate these unclear semantics with a code example:

// Partition the execution space into two
auto instances = Kokkos::partition_space(space, 1, 1);
auto compute_space = instances[0];
auto comm_space = instances[1];

// Dispatch some parallel work that prepares our data `v` on the compute exec space
Kokkos::parallel_for(Kokkos::RangePolicy(compute_space, 0, v.extent(0)), [=](int const i) {
  v(i) = i;
});

// Send the prepared data using the communication exec space
KokkosComm::isend(comm_space, v, dest, tag, comm).wait(); // WARNING: `v` may not have been ready!

In this snippet, we split an execution space into two. Because we're specifying the compute_space execution space to the parallel_for, it is asynchronous and we must fence on it so that v is done initializing before calling KokkosComm::isend. However, the implementation of the latter fences on the given execution space (here, comm_space) before doing the actual send. In this example, this is the wrong space to fence on, and the fence is useless.

Passing an execution space can be useful — e.g. for specifying where to pack a non-contiguous view, or use a particular CUDA stream once we have the NCCL backend — but it shouldn't be just for us to manually fence on an execution space that may have nothing to do with the view we operate on.

I propose that we:

  1. Clarify the purpose and semantics of the execution space parameter;
  2. Explain to users that is up to them to guarantee that their data is ready to be used by KokkosComm (i.e. in the example above, they must call compute_space.fence() themselves before calling us);
  3. Remove the call to space.fence() from our functions that take an execution space (in the contiguous case).

Given that we would not fence on the execution space anymore, the plans proposed in #100 won't be needed, as we remove all pointless fences from our implementations.

cwpearson commented 1 month ago

I think our semantics should basically be "the operation is inserted into the provided execution space". It's consistent with Kokkos, easy to understand, and is the simplest to use. To the extent our docs do not reflect that, they should be fixed.

parallel_for(space, ...); // (1)
// since this is stream-ordered, okay for this isend to use views produced in (1), user does not need to fence
auto req = KokkosComm::isend(space, ...);
parallel_for(space, ...); // guaranteed to execute after isend "happens"
KokkosComm::wait(space, req); // enqueue wait in the execution space instance, (not currently in KokkosComm)
parallel_for(space, ...); // okay to re-use views from the communication, since it is definitely done by now

The primary downside is that for CommunicationSpaces that use host functions, the implementation has to insert fences. This is kind of a bummer because it kills any possibility of communication/computation overlap, not to mention the fences themselves may have a cost. The "Plan" thing proposed in #100 is one way to solve this, but I think we could do something even simpler which groups up a few communications and just does a single fence, and tells the communications the space is already fenced through an extra param or something.

Other points

  1. If you want to coordinate two spaces, you have to fence manually, as always in Kokkos.
  2. For the APIs when an execution space is not provided, it is implicitly DefaultExecutionSpace{}.
  3. If you want to explicitly order with respect to host, then you provide DefaultHostExecutionSpace.
  4. For stream-triggered MPI, these semantics are a natural fit, and we won't have to add fences.
  5. The operation is free to put work in that execution space (or any other), so long as the semantics are maintained.
dssgabriel commented 1 month ago

I think our semantics should basically be "the operation is inserted into the provided execution space".

I agree, the stream-ordered approach aligns well with both Kokkos and NCCL's semantics. :+1:

The primary downside is that for CommunicationSpaces that use host functions, the implementation has to insert fences. This is kind of a bummer because it kills any possibility of communication/computation overlap, not to mention the fences themselves may have a cost.

IMHO, it is premature optimization to try and avoid having multiple fences (at least for now). If users properly design their code they should be able to overlap compute/comms and not need synchronization.

In your example, if the comm operations were independent of the parallel_fors, they could be enqueued on a different exec space and thus overlap with the compute work:

parallel_for(spaceA, ...);                 // prepare view for comm 
auto req = KokkosComm::isend(spaceA, ...); // send view once done preparing
parallel_for(spaceB, ...);                 // independent work in spaceB that overlaps with comm in spaceA
KokkosComm::wait(spaceA, req);             // wait for comm to finish

I think, it's better to keep things simple for now and wait to see if this has any performance impact in our target applications.

For MPI specifically, @cedricchevalier19 proposed that we could call the MPI function within a 1-iteration Kokkos::parallel_for (with a non Kokkos-compliant reference capture?) so that we "simulate" enqueing the operation on an execution space (thus avoiding the need for a systematic fence?).

Lastly (and this ties in with #109), we may want to initializes KokkosComm handles with an associated execution space (e.g., a Handle<NCCL> would be initialized from a Kokkos::Cuda exec space). This would let us statically assert that the execution space passed to KokkosComm functions matches the one from the handle.

I think we're on the right track! :slightly_smiling_face: