I'm opening this draft PR to facilitate reviewing/comments on our efforts to integrate support for multiple communicators in Horovod. This is heavily inspired from your code @kimchitsigai and @mypey (in branches comms-idris-MP and comms-idris-JS), I've made a few clean ups and proposed a few simplifications, trying to follow the recommendations in https://github.com/horovod/horovod/issues/2139 .
Here is a summary of the approach:
We create an array of MPI contexts, each MPI context will be based on one user-provided communicator (just like in the current horovod_init_comm(MPI_Comm comm))
For each context/communicator, we create a specific controller, tensor queue, and operations manager. When these are created, we give them the corresponding mpi_context.
In the horovod loop, we construct the message list, and run communication operations, for each context/communicator one by one.
Most of the code modifications is to turn references to global_state.controller to global_state.controller[communicator_id] where the communicator_id is usually accessible through the message or request in internal functions. I've also assumed that we'll try to use global_state.controller[0] for COMM_WORLD, but nothing enforces it so far.
Note that the implementation is not yet complete, for now I haven't bothered with the adasum operations, and they are ignoring the multiple potential communicators, only using the one with index 0. Also, only a few tensorflow python operations support specifying which communicator to use for now.
Open questions
Mostly about whether to duplicate some global state variables. Remember that each controller produces a response list that is then processed BEFORE running the next controller. So, if we assume that one horovod loop leave these variables in a clean state, it should be ok not to duplicate them:
[ ] : Do we need to be careful about the global_state.fusion_buffer or can it be shared between different controllers?
[ ] : Do we need to be careful about the global_state.shared_buffer or can it be shared between different controllers?
[ ] : Do we need to be careful about the global_state.joined_size or can it be shared between different controllers?
[ ] : And check all other shared global variables.
Next questions are about how optimized this approach is:
[ ] : Should we compute all response lists first, then merge them, and run the Execute once? Or is the current approach of processing jointly each controller's response list and its execution ok?
Example
Here is how one would use this:
from mpi4py import MPI
# This will be our baseline world communicator
comm = MPI.COMM_WORLD
# Split COMM_WORLD into subcommunicators
subcomm = MPI.COMM_WORLD.Split(color=MPI.COMM_WORLD.rank % 2,
key=MPI.COMM_WORLD.rank)
# And here is our array of communicators
comms = [comm, subcomm]
import tensorflow as tf
import horovod.tensorflow as hvd
# Initialize Horovod
hvd.init(comm=comms)
# Let's try to operate on some tensors
a = (r+1)*tf.ones(10)
print("AlltoAll on WORLD", hvd.alltoall(a, communicator_id=0))
print("AlltoAll on sub communicator", hvd.alltoall(a, communicator_id=1))
To run this for instance on my little machine with 2 GPUs:
I'm opening this draft PR to facilitate reviewing/comments on our efforts to integrate support for multiple communicators in Horovod. This is heavily inspired from your code @kimchitsigai and @mypey (in branches
comms-idris-MP
andcomms-idris-JS
), I've made a few clean ups and proposed a few simplifications, trying to follow the recommendations in https://github.com/horovod/horovod/issues/2139 .Here is a summary of the approach:
horovod_init_comm(MPI_Comm comm)
)Most of the code modifications is to turn references to
global_state.controller
toglobal_state.controller[communicator_id]
where thecommunicator_id
is usually accessible through the message or request in internal functions. I've also assumed that we'll try to useglobal_state.controller[0]
for COMM_WORLD, but nothing enforces it so far.Note that the implementation is not yet complete, for now I haven't bothered with the adasum operations, and they are ignoring the multiple potential communicators, only using the one with index 0. Also, only a few tensorflow python operations support specifying which communicator to use for now.
Open questions
Mostly about whether to duplicate some global state variables. Remember that each controller produces a response list that is then processed BEFORE running the next controller. So, if we assume that one horovod loop leave these variables in a clean state, it should be ok not to duplicate them:
global_state.fusion_buffer
or can it be shared between different controllers?global_state.shared_buffer
or can it be shared between different controllers?global_state.joined_size
or can it be shared between different controllers?Next questions are about how optimized this approach is:
Example
Here is how one would use this:
To run this for instance on my little machine with 2 GPUs:
Note this outputs a timeline, where you can see if it goes through the NCCL ops or not (more info here ).
To compile with NCCL and MPI support I'm using the following command line: