Open · mrocklin opened this issue 5 years ago
I do not know how to initialize an MPI environment without starting a new process. Every MPI implementation is different, and so every mpirun/mpiexec does something different when executed. It's a strange thing that the launching process is not part of the MPI specification, which means there is no standardization.
The closest thing I can think of to launching an MPI job from within Python is my personal mpirical package (https://github.com/NCAR/mpirical), which I developed for running MPI tests. However, it still uses subprocess to launch the MPI process with mpirun.
If starting another process is a non-starter (bad pun?), then the only thing I can think of is tailoring a solution to a particular implementation of MPI, such as MPICH or OpenMPI.
OK, so let's say we pin ourselves to something like OpenMPI (or whatever is most common). Does this become possible? If so, what is the path to doing this?
I think it becomes possible, but I think we would need some OpenMPI or MPICH developers to chime in. Maybe an issue here: https://github.com/open-mpi/ompi?
I don't suggest limiting yourself to only a single MPI implementation.
@kmpaul I can't speak for every MPI implementation, but usually mpirun just does the work of setting up your MPI_COMM_WORLD for you.
It is possible, but ugly, to do this manually. I've done this before in a previous life.
It requires using the MPI_Open_port, MPI_Comm_accept, MPI_Comm_connect, and MPI_Intercomm_merge APIs documented here: https://www.mpi-forum.org/docs/mpi-2.2/mpi22-report/node212.htm#Node212
You end up having to build up your communicator one process at a time.
However, you should note that some MPI implementations flat out do not implement these APIs. I believe OpenMPI and Intel MPI both do, but MVAPICH does not.
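To make "one process at a time" concrete, here is a toy C model that shows which rank each joining process ends up with. It makes no MPI calls. It assumes the existing group passes high = 0 to MPI_Intercomm_merge and the newcomer passes high = 1, so existing ranks keep their order and the newcomer is appended. The names toy_comm, toy_comm_init and toy_comm_merge_one are hypothetical.

```c
#include <assert.h>

/* Toy model (no MPI calls) of the rank bookkeeping when a communicator is
 * grown one process at a time via MPI_Comm_accept / MPI_Comm_connect /
 * MPI_Intercomm_merge. Assumption: the existing group passes high = 0 to
 * MPI_Intercomm_merge and the joining process passes high = 1, so existing
 * ranks keep their order and the newcomer is appended at the end. */

#define TOY_MAX_PROCS 1024

typedef struct {
    int worker_of_rank[TOY_MAX_PROCS]; /* worker_of_rank[r] = id of the process holding rank r */
    int size;                          /* current communicator size */
} toy_comm;

/* The root starts alone, like a singleton after MPI_Init without mpirun. */
static void toy_comm_init(toy_comm *c, int root_worker) {
    c->worker_of_rank[0] = root_worker;
    c->size = 1;
}

/* One accept/connect/merge step. In real MPI the accept and the merge are
 * collective over the whole existing group, so steps run one after another.
 * Returns the rank the newcomer receives in the merged communicator. */
static int toy_comm_merge_one(toy_comm *c, int worker) {
    assert(c->size < TOY_MAX_PROCS);
    c->worker_of_rank[c->size] = worker;
    return c->size++;
}
```

In a real program, each step would pair an MPI_Open_port plus MPI_Comm_accept on the existing communicator with an MPI_Comm_connect from the newcomer, followed by the merge.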
MPI_COMM_JOIN is an additional API for this, which assumes you already have a socket connection between your two processes. Last I heard, the MPI Forum was trying to deprecate this API, so I definitely don't suggest using it. https://www.mpi-forum.org/docs/mpi-2.0/mpi-20-html/node115.htm
@jrhemstad I agree that we shouldn't limit ourselves to a single MPI implementation, but you also point out a problem that finding a solution that works for all implementations may not be possible...easily. (This API is supported in the MPI 3.1 spec, too.)
I was suggesting that one avoids doing anything implementation specific. The APIs I suggest are part of the MPI standard, they're just so infrequently used that some implementations just neglect to implement them.
MPI libraries use an interface called Process Management Interface (PMI) to interact with the job launcher. Different MPI libraries use different versions of PMI that are incompatible with one another. There have been efforts toward standardization and compatibility, but AFAIK this is far from complete.
In the current state, the job launcher/manager has to provide the same PMI version that the MPI library is using. Hence the tie-in between the MPICH library + Hydra launcher and the OpenMPI library + ORTE launcher. Slurm provides its own variant of PMI, and both MPICH and OpenMPI have configuration options to use a PMI interface compatible with Slurm.
It is a non-trivial effort for DASK to replace the MPI launcher. It would have to:
1) provide an implementation of the PMI interface used by a particular MPI library, and
2) provide the backend for exchanging control information that happens through the launcher/daemon processes.
And it would have to redo 1) for each MPI library it needs to be compatible with.
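The "control information" in point 2 is mostly a key-value exchange. Each process publishes its contact details, everyone synchronizes, and then each process can look up its peers. Below is a toy, single-address-space C model of that put/fence/get pattern. It is loosely modeled on PMIx calls such as PMIx_Put, PMIx_Commit, PMIx_Fence and PMIx_Get, but every name in it is hypothetical, and a real launcher also moves this data between nodes.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Toy, in-memory model of the key-value exchange a PMI-style launcher
 * provides: each rank "puts" its contact info, a "fence" makes everything
 * visible, then any rank can "get" a peer's info. Names are hypothetical. */

#define TOY_KVS_MAX 64
#define TOY_VAL_LEN 64

typedef struct {
    char pending[TOY_KVS_MAX][TOY_VAL_LEN]; /* written but not yet fenced */
    char visible[TOY_KVS_MAX][TOY_VAL_LEN]; /* published by the last fence */
    int nranks;
} toy_kvs;

static void toy_kvs_init(toy_kvs *k, int nranks) {
    assert(nranks > 0 && nranks <= TOY_KVS_MAX);
    memset(k, 0, sizeof *k);
    k->nranks = nranks;
}

/* Rank `rank` publishes its contact string (e.g. a network address). */
static void toy_kvs_put(toy_kvs *k, int rank, const char *value) {
    assert(rank >= 0 && rank < k->nranks);
    snprintf(k->pending[rank], TOY_VAL_LEN, "%s", value);
}

/* Collective in a real launcher; here it simply publishes all pending puts. */
static void toy_kvs_fence(toy_kvs *k) {
    memcpy(k->visible, k->pending, sizeof k->visible);
}

/* Returns the peer's contact string, or NULL if nothing was fenced for it. */
static const char *toy_kvs_get(const toy_kvs *k, int peer) {
    assert(peer >= 0 && peer < k->nranks);
    return k->visible[peer][0] != '\0' ? k->visible[peer] : NULL;
}
```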
@jrhemstad, the dynamic process model (MPI_Comm_spawn, MPI_Comm_accept, MPI_Comm_connect) is a way for independent MPI applications to launch, discover, and connect with each other. The assumption is that you are already in an MPI environment. So it does not obviate the need for the above, especially if you want to use existing MPI libraries.
This is not correct.
It is certainly possible to launch a process w/o mpirun and then bring it into an MPI communicator (I have done it). You just need to call MPI_Init in the process.
See SO post here: https://stackoverflow.com/questions/15578009/difference-between-running-a-program-with-and-without-mpirun
@jrhemstad, my bad. I agree with you. At least according to the specification, one should be able to call these from a singleton process that was not launched through a process manager.
I am skeptical about the current state of implementation for these in MPI libraries though. Like you said, not all libraries support these APIs and even if they do, probably not on all system configs.
Also, this is not the most performant way of initializing MPI. And it can be quite ugly like you point out.
Given that this seems ugly in general (and I hate to suggest creating yet another standard), should we start with OpenMPI and then slowly add other implementations over time that follow the same standard?
> I am skeptical about the current state of implementation for these in MPI libraries though. Like you said, not all libraries support these APIs and even if they do, probably not on all system configs.
> Also, this is not the most performant way of initializing MPI. And it can be quite ugly like you point out.
Absolutely! It's definitely not going to be as easy as just doing mpirun -n N ./a.out, and definitely not going to be as performant, but does that performance really matter? It's a one-time, upfront cost.
I think a good place to start is to create a simple client/server C++ code that exercises the MPI_Open_port, MPI_Comm_accept, MPI_Comm_connect, and MPI_Intercomm_merge APIs, which can be tested across various MPIs to confirm which implementations actually support them.
> Given this seems ugly in general, and I hate to say let's create yet another standard, but should we start with OpenMPI and then slowly add other variants as time goes on that follow the same standard?
To be clear, the APIs I described are part of the MPI standard that all MPI implementations should implement, it's not specific to OpenMPI.
From what I recall when I've played with this in the past, OpenMPI is one implementation that does support these APIs, so it's a good place to start.
Understood. So should we start with OpenMPI to see if the standard that's already supposed to be there is universally used :)
mpi4py.futures may be useful for this: https://mpi4py.readthedocs.io/en/stable/mpi4py.futures.html
> Absolutely! It's definitely not going to be as easy as just doing mpirun -n N ./a.out and definitely not going to be as performant, but does that performance really matter? It's a one-time, upfront cost.
What's the upfront cost like? Microseconds, milliseconds, seconds?
> I think a good place to start is to create a simple client/server C++ code that exercises the MPI_Open_port, MPI_Comm_accept, MPI_Comm_connect, and MPI_Intercomm_merge APIs and it can be tested across various MPIs to get confirmation of which implementations actually support it.
+1
I do not think using MPI Dynamic Process Management (DPM) (accept, connect, etc.) is just about the upfront cost. The DPM features, though supported in an MPI library, may not work with all transports/networks. In that case, one would be limited to a lower-performance network (say, sockets) for all communication. Something to keep in mind.
Also, the support in OpenMPI appears broken right now; see this open bug, whose comments say it is low priority: https://github.com/open-mpi/ompi/issues/3458
I prefer the approach of DASK providing a PMI implementation compatible with MPI libraries though it is a much larger effort compared to using DPM.
Thanks for the input @spotluri! Some follow-up questions, if I may:
> provide an implementation of the PMI interface used by a particular MPI library
Do you have a sense for what this looks like? Can you recommend a good reference here for people to check out?
> provide the backend for exchanging control information that happens through the launcher/daemon processes
Dask is more than happy to move around control information. My hope is that this would be done once at startup and then MPI would take over for most communication. Is this the case? For context, Dask communications will have latencies of at least a millisecond, which, as I'm sure you're aware, can be a long time for tightly-coupled MPI computations.
> MPI libraries use an interface called Process Management Interface (PMI) to interact with the job launcher
I want to verify here that ideally we should be able to start MPI from the existing dask-worker processes, not from starting new processes as would be done with SLURM/etc.. Not a hard requirement, but definitely a nice-to-have. Is this goal consistent with PMI?
> Also, the support in OpenMPI appears broken right now, see this open bug and comments say it is low on priority: open-mpi/ompi#3458
Ah yes! I remember stumbling across this back when I was originally exploring this stuff ~2 years ago. So my memory of OpenMPI was incorrect; in that case, it was Intel MPI where these APIs do work.
From what I remember, MVAPICH doesn't support these APIs either (I think you actually get an "We don't support this function!" message if you try and use them).
This is indeed problematic because you don't want to pin yourself to a non-free MPI.
> I prefer the approach of DASK providing a PMI implementation compatible with MPI libraries though it is a much larger effort compared to using DPM.
This is very dangerous territory. You would be programming to implementation-specific details that may or may not be portable across MPI implementations.
The "right" solution here is to make use of the APIs in the standard. Unfortunately, support for these APIs is sparse. Therefore, though it may not be fast, the best long-term solution is to complain (loudly) to the various implementations that this feature is important to us, and they are out of compliance with the MPI standard by not supporting these features.
I agree with @jrhemstad, here. It is unfortunate that the MPI standard is...well...not very standard. However, this is the only appropriate way forward. We need to stick to the MPI specification. ...and for MPI implementations that claim to be open source, perhaps we can suggest (or even go so far as providing) implementations of the missing API.
A set of slides from an MPI BoF at SC. Link from @jrhemstad
So based on outside conversation and further investigation, it seems like it is worth the effort of exploring using PMI or PMIx directly.
I only recently learned of this effort, but it looks to have wide support among the MPI implementations and contributors that matter. Process start-up is outside the MPI standard, so I believe it is fine to explore options beyond the MPI_Comm_accept/connect APIs (though we should still push on getting that fixed).
It may be worth having a conversation with the PMIx devs about our use case and how it maps to their design goals. @rhc54 is probably the guy to talk to.
I think you'll find that the PMIx Groups work has what you need: https://github.com/pmix/pmix-standard/pull/139
The implementation is already in PMIx master branch: https://github.com/pmix/pmix
The MPI Sessions WG is exploring it for what you have described - dynamic async construction of MPI communicators.
@rhc54 Thanks Ralph! We'll take a look.
For transparency, it appears that NVIDIA MPI folks have started conversations about this topic internally. We'll see what surfaces...
@mrocklin Out of curiosity, can you say more about this? Are they looking at using the PMIx Group capability, or some other approach?
Honestly I've stopped tracking the internal chatter, I'll ask someone there to summarize the situation and report up though.
Restating Dask's problem statement for clarity:
- Dask has N workers started independently of MPI (i.e., no mpirun).
- N can change for whatever reason.
- An MPI job runs on M ranks, where 1 <= M <= N.
- N is fixed for the duration of the MPI job.
- After the MPI job, N can change again until the next MPI job.
The fundamental issue with the above problem statement is that, if we do not want to spawn new processes for the MPI job, each Dask worker must call MPI_Init and then MPI_Finalize once the MPI job is complete, i.e., it must call MPI_Init and MPI_Finalize many times throughout its lifetime.
However, this is expressly forbidden by the MPI Standard. Some have suggested we just "do it anyways" and exploit implementation specific behavior, but I think this is a non-starter.
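The conflict can be modeled as two small state machines, sketched here in toy C with no MPI calls. In the world model, initialization may happen only once, and MPI cannot be re-initialized after finalization. A sessions-style model would let a process open and close sessions repeatedly. The function and type names are hypothetical and only illustrate the rule being discussed.

```c
#include <assert.h>
#include <stdbool.h>

/* World model: MPI_Init at most once; no re-init after MPI_Finalize. */
typedef enum { WORLD_UNINIT, WORLD_ACTIVE, WORLD_FINALIZED } world_state;

static bool world_init(world_state *s) {
    assert(s);
    if (*s != WORLD_UNINIT) return false; /* second init, or init after finalize */
    *s = WORLD_ACTIVE;
    return true;
}

static bool world_finalize(world_state *s) {
    assert(s);
    if (*s != WORLD_ACTIVE) return false;
    *s = WORLD_FINALIZED;
    return true;
}

/* Sessions-style model: any number of init/finalize pairs per process. */
typedef struct { int open_sessions; } session_state;

static bool session_init(session_state *s) {
    assert(s);
    s->open_sessions++;
    return true;
}

static bool session_finalize(session_state *s) {
    assert(s);
    if (s->open_sessions == 0) return false; /* nothing to close */
    s->open_sessions--;
    return true;
}
```

The second MPI job on a Dask worker corresponds to calling world_init again after world_finalize, which the world model rejects.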
I had a lovely conversation with @rhc54 @jsquyres @hppritcha (and others whose GitHub handles I don't know) where I introduced them to the above problem statement and we discussed how we can solve it. For those of you who were on the call, please correct or clarify anything I've said that doesn't sound right.
In short, the result of the conversation is that MPI Sessions and https://github.com/mpi-forum/mpi-issues/issues/103 look to be our best path forward for a long-term solution. Sessions came about in part due to frustrations with the single init/finalize requirement. They are targeted for the MPI 4.0 standard, which is slated for early 2020. OpenMPI currently has an experimental prototype implementation of Sessions that we could begin experimenting with in the short term.
I'm not going to try and give a functionally complete summary of MPI Sessions because I am not qualified to do so. However, this is my limited understanding:
An MPI session allows you to query the "resource manager" (RM; think Dask scheduler here) for a list of "process sets" that can participate in an MPI job. For example, Dask workers [0, N) can be a process set, and workers [0, N/2) and [N/2, N) could be additional process sets; a process set can be any arbitrary grouping of processes you want, with a given identifier.
You use one of the available process sets to initialize a Session, then create your MPI group/communicator from it and use that as you would any other MPI communicator* for your MPI job.
You can initialize/finalize a Session many times throughout the lifetime of a process, and MPI job i can use a different process set than MPI job i + 1. Thus, we've satisfied our problem statement: we can use the existing Dask processes for our MPI job, and between MPI jobs, Dask is free to dynamically size the number of workers.
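Here is a toy C sketch of the example process sets over N workers, with no MPI involved: "all" = [0, N), "lower" = [0, N/2), "upper" = [N/2, N). It assumes contiguous worker ranges and that a communicator built from a set ranks its members 0..size-1 in set order. Under that assumption, the same worker can hold different ranks in different MPI jobs. The set names and types are hypothetical.

```c
#include <assert.h>

/* A process set represented as a contiguous range of Dask worker indices. */
typedef struct {
    const char *name;
    int first; /* first worker index in the set (inclusive) */
    int last;  /* one past the last worker index (exclusive) */
} toy_pset;

static int toy_pset_size(toy_pset p) { return p.last - p.first; }

/* Rank of `worker` in a communicator created from `p`, or -1 if absent. */
static int toy_rank_in_pset(toy_pset p, int worker) {
    assert(p.first <= p.last);
    return (worker >= p.first && worker < p.last) ? worker - p.first : -1;
}

/* Build the three example sets for n workers. */
static void toy_make_psets(int n, toy_pset out[3]) {
    out[0] = (toy_pset){ "all",   0,     n     };
    out[1] = (toy_pset){ "lower", 0,     n / 2 };
    out[2] = (toy_pset){ "upper", n / 2, n     };
}
```

For N = 8, worker 5 would be rank 5 in a job started from "all", rank 1 in a job started from "upper", and would not take part in a job started from "lower".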
For OpenMPI, PMIx is the layer below MPI that is used to maintain the "process sets" available for creating a Session. The Dask scheduler would need to be modified to "speak" PMIx in order to set up the process sets that can be consumed by MPI Sessions. Fortunately, I've been told by @rhc54 that Python bindings for PMIx are already in the works.
Furthermore, Dask can also be modified (or extended with an add-on) to do all of the work of setting up the communicator from the process set/Session, and that communicator can then just be handed off to whatever C/C++ library is using MPI. In this way, no modification is necessary in the underlying C/C++ library using MPI**.
Finally, I believe the folks on the OpenMPI call were excited about having a solid use case for Sessions, and they expressed interest in staying in close communication about our efforts here. It was my impression that there could be a fruitful opportunity to collaborate.
* Note that this precludes any library that uses MPI_COMM_WORLD directly, but that's bad practice anyway, and we should push any library that does this to change its implementation to accept a communicator as part of its API.
** Similar to *, the library's API needs to be defined so that it accepts a communicator rather than relying on MPI_COMM_WORLD.
@jrhemstad Thanks for the links on MPI Sessions. Those were useful! As we have already discussed in several places, there are the following cases to consider for putting Dask and MPI together. I don't see how MPI Sessions would help us in any of these cases. Maybe I'm missing something here?
Case1a: If we are going with 'dask-first' approach and further decide to reuse dask-worker processes for MPI tasks, we still need to use the traditional 'MPI_Init' anyways to make these processes become MPI-aware, right? If so, how would mpi-sessions help us in this case? Maybe you meant it'll be useful when we want to regroup those processes differently for cudf part and the cuML part?
Case1b: If we are going with 'dask-first' approach but dask spawning new MPI processes for the current task (eg: running a cuML-algo), we might as well get its communicator and go ahead, no?
Case2: If we are going with 'mpi-first' approach (as in Matt's blog on dask-mpi), we still have to deal with the complexities due to the 2 extra mpi ranks being launched. Even here, we could just create a communicator out of those 'actual' N-2 ranks and use them for the cuML work, right?
I think @hppritcha would be the best source for MPI Sessions questions.
> Case1a: If we are going with 'dask-first' approach and further decide to reuse dask-worker processes for MPI tasks, we still need to use the traditional 'MPI_Init' anyways to make these processes become MPI-aware, right? If so, how would mpi-sessions help us in this case?
@teju85
Case 1a is what we are ideally targeting with Dask.
@rhc54 or @hppritcha correct me if I am wrong, but Sessions help us because we do not have to explicitly call MPI_Init; instead, initializing a Session will initialize the MPI library for us if that has not already been done.
In this way, we avoid the problem of calling MPI_Init/MPI_Finalize multiple times within the same process.
See slide 50+ here: https://raw.githubusercontent.com/wiki/mpiwg-sessions/sessions-issues/2016-12-12-webex/2016-12-12-webex.pptx
@jrhemstad So, IIRC, for Case1a, we want to have the ability to perform separate Init/Finalize calls done once for cudf-part and for the cuml-part. That's where you mention that mpi-sessions can be useful, am I right?
@rhc54 @hppritcha I noticed from this relatively recent presentation that the processes still need to be MPI-aware, before we could use session/group operations on them. Am I right in concluding so?
In other words, as per Case1a above, we are having processes which are not launched via a "parallel application" mechanism. Can we still use mpi-sessions on them and get these processes to communicate with each other?
I believe the answer is "yes", but I think @hppritcha and @dholmes-epcc-ed-ac-uk are the right people to address your questions. Unfortunately, the MPI Forum meeting is being held this week and I suspect they won't be able to answer right away.
Thank you @rhc54
I went ahead and setup a toy-project to simulate this "dask-first" approach. Here's the toy-repo: https://github.com/teju85/dask-first-with-mpi
If you run the Singleton MPI from this repo, it basically calls MPI_Init, then some printfs, followed by MPI_Finalize.
However, all 8 Dask workers get a rank of 0 and a world size of 1 (expected!). So, what should I be doing here to give these processes the appropriate ranks and a world size of 8? If these are mostly RTFM-worthy questions, please pardon me and point me to the relevant docs.
Regards, Thejaswi
> Case1a: If we are going with 'dask-first' approach and further decide to reuse dask-worker processes for MPI tasks, we still need to use the traditional 'MPI_Init' anyways to make these processes become MPI-aware, right? If so, how would mpi-sessions help us in this case?
The intent of MPI Sessions is to provide a different way to initialise MPI, which does not require calling MPI_INIT.
The sequence of calls is:
```
void doMPIstuff(MPI_Comm comm);

int main(void) {
    // start MPI
    MPI_SESSION_INIT(flags, info, errhandler, &session);
    MPI_GROUP_FROM_SESSION_PSET(session, "name_of_pset", &group);
    MPI_COMM_CREATE_FROM_GROUP(group, "unique_id_of_create_op", info, errhandler, &comm);
    MPI_GROUP_FREE(group);
    // do MPI stuff
    doMPIstuff(comm);
    // tidy up & close down MPI
    MPI_COMM_DISCONNECT(comm);
    MPI_SESSION_FINALIZE(session);
    return 0;
}

void doMPIstuff(MPI_Comm comm) {
    MPI_ALLREDUCE(..., comm);
}
```
This code could be put into any function (not just main, e.g. a library or a component or whatever).
This code could be executed many times by the same process (re-initialisation via sessions is permitted, unlike when using MPI_INIT).
The flags and info will probably be combined into info before acceptance.
The flags parameter controls the thread support level, which applies per session (not to the whole MPI process as before).
The info parameter supplies other information to the MPI implementation (not used yet).
The errhandler permits errors related to the session to be handled (rather than such errors being fatal and destroying all connected processes as before).
The name_of_pset can be queried from the resource manager (using new MPI interfaces) or supplied directly (as in this example) if known a priori (e.g., passed in somehow as an input parameter).
The unique_id_of_create_op is a unique string that differentiates this operation from all others that it could otherwise be confused with. We suggest using reverse dot notation; e.g., org.mpiforum.example.mpi_4.ex10_4.simple_case refers to the operation in exercise 10.4, which will be included in MPI-4.0, authored by the MPI Forum.
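The reverse-dot tag is just a dotted join of components, as this small C helper shows. The helper, build_create_tag, is hypothetical and not part of any MPI API. It only builds the string. Making the tag actually unique, for example by including a job identifier as one of the components, stays the caller's responsibility.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: joins tag components with '.', e.g.
 * {"org", "mpiforum", "example"} -> "org.mpiforum.example".
 * Returns the tag length, or -1 if the buffer is too small. */
static int build_create_tag(char *buf, size_t buflen,
                            const char *const parts[], size_t nparts) {
    assert(buf != NULL && buflen > 0);
    size_t used = 0;
    buf[0] = '\0';
    for (size_t i = 0; i < nparts; i++) {
        /* Prefix every component after the first with a dot. */
        int n = snprintf(buf + used, buflen - used, "%s%s",
                         i == 0 ? "" : ".", parts[i]);
        if (n < 0 || (size_t)n >= buflen - used) return -1; /* would truncate */
        used += (size_t)n;
    }
    return (int)used;
}
```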
The current reference implementation has been done (by @hjelmn) within Open MPI and relies internally on services provided by PMIx Groups. There are functional unit tests for the proposed new API. The example source codes proposed for addition to the MPI Standard compile and run against this reference implementation.
The current status within the MPI Forum is "promising". The MPI Sessions WG has a list of to-do items based on feedback from the 12 hour presentation to the MPI Forum this week. The intent is that this new interface will target MPI-4.0, which is currently slated for release during 2020. A solid use case, fully implemented and demonstrated, would definitely bolster the standardisation effort. The MPI Sessions WG is very interested in collaborating on this.
For hardy souls, the most recent version of the proposed MPI Standard can be found attached to the MPI Forum issue #103: https://github.com/mpi-forum/mpi-issues/issues/103
The direct link to the (unofficial draft, for comment only) PDF document is here: https://github.com/mpi-forum/mpi-issues/files/2876679/mpi-report-ticket103-2019-FEB-18.pdf
This is a long document(!), but the important bits are section 10.4.1 (session initialisation and finalisation), section 6.3.2 (final function definition in that section), and section 6.4.2 (final function definition in that section).
@mrocklin forgot to update this thread. Thanks to the discussions in this thread and several chats, I was able to successfully "convert" dask processes into MPI ranks in my repo here: https://github.com/teju85/dask-first-with-mpi.
@cjnolet is also looking into integrating this into the RAPIDS workflow. Along with him, if you can take a look at this repo and provide any feedback, especially from a Dask usage perspective, that'll help.
@mrocklin Does @teju85's work (above) resolve this issue?
I don't know enough about MPI or the implementation to be able to answer that unfortunately. I suspect that you would know more here than I would @kmpaul :)
@kmpaul the usage of MPI connect/accept in the toy repo linked by @teju85 looks correct (although suboptimal) to me.
I say suboptimal because it requires each non-root process to connect/accept with the root process, so scales linearly with the number of processes. Various other patterns could be used, e.g. recursive doubling, to achieve scaling proportional to the log of the number of processes - if that was important, e.g. for P=1024 processes, (P-1)=1023 steps, whereas log_2(P)=10 steps.
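The step counts in the comparison can be checked with two small C functions. With a single root, each accept/connect is collective over the growing group, so P - 1 sequential steps are needed. With pairwise merging (recursive doubling), disjoint pairs of groups merge concurrently, which takes ceil(log2(P)) rounds. The function names are illustrative only.

```c
#include <assert.h>

/* Every non-root process joins the root one at a time: P - 1 steps. */
static int linear_join_steps(int p) {
    assert(p >= 1);
    return p - 1;
}

/* Groups merge pairwise each round, so the largest group can double
 * per round: ceil(log2(P)) rounds. */
static int doubling_join_rounds(int p) {
    assert(p >= 1);
    int rounds = 0;
    for (int size = 1; size < p; size *= 2)
        rounds++;
    return rounds;
}
```

For P = 1024 these give 1023 and 10, matching the figures above. A non-power-of-two count such as P = 1000 also needs 10 rounds.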
For completeness, the MPI Sessions proposal would hope to abstract much of this code away from end-users, but it requires someone do a bit of integration work for MPI+DASK. Ideally, the end-user would use MPI_Session_init (instead of MPI_Init in the current example code), then get the name of the process set that refers to all the DASK worker processes (which is where the integration work comes in), then use MPI_Group_from_pset to create an MPI_Group, then use MPI_Comm_from_group to create a communicator (with comm_size = number of dask workers, and appropriate comm-rank in range [0,P-1]). The necessary integration work would enable MPI to retrieve process set names from DASK for presentation to MPI users and/or to understand process set names given by MPI users that refer to sets of DASK processes. The proposal requires that process names be strings, formatted like URIs. The MPI_Comm_from_group function requires a unique string tag that differentiates between concurrent communicator creation operations (similar to the role of service name in MPI_Publish_name) - it is suggested to use reverse dot notation to achieve uniqueness.
So, it is hoped that, once MPI Sessions is accepted and the implementation/integration work is done, the cpp code could look more like this:
```cpp
MPI_Session mySession;
MPI_Session_init(&mySession);
MPI_Group myGroup;
MPI_Group_from_pset(mySession, "dask://uniqueNameForSetOfWorkers", &myGroup);
MPI_Comm myComm;
const char *myStringTag = "org.dask.examples.usingSessions.Holmes01.uniqueNameForSetOfWorkers.init";
MPI_Comm_from_group(myGroup, myStringTag, ..., &myComm);
```
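One piece of the integration work described above is recognizing process-set names that refer to Dask workers. This C sketch assumes a "dask://" scheme, as in the hoped-for code. It extracts the set name and rejects other schemes, such as MPI's own "mpi://WORLD". The dask_pset_name helper is hypothetical; how Dask would then map the name to actual workers is left open.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical Dask-side helper: returns the set name from a
 * "dask://<name>" process-set URI, or NULL for any other scheme
 * (e.g. "mpi://WORLD") or an empty name. */
static const char *dask_pset_name(const char *uri) {
    static const char scheme[] = "dask://";
    const size_t len = sizeof scheme - 1; /* exclude the terminating NUL */
    assert(uri);
    if (strncmp(uri, scheme, len) != 0 || uri[len] == '\0')
        return NULL;
    return uri + len;
}
```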
In case this is still of interest, the MPI Sessions proposal was accepted into the MPI Standard and appears in the MPI-4.0 version of the MPI Standard (downloadable from https://www.mpi-forum.org/docs/).
The API syntax and operational semantics are broadly the same as described in this thread.
@Wee-Free-Scot! That's very cool! Do you know if this has been implemented in MPICH or OpenMPI?
There is a reference implementation in Open MPI, but it is not yet production code. I expect it will be a matter of (a short amount of) time until all major/popular MPI libraries have implemented this functionality, so that they can claim compliance with MPI-4.0.
👍 Then we'll wait some more. Seems like this is the best solution.
Just dropping into this thread as end-user who'd be really interested in taking advantage of a feature like this.
MPICH supports MPI-4.0 as of v4.0, and a subset of MPI-4.0 is implemented in Open MPI v5.0.x, though that work is still in progress.
Update for 2024 🤯:
Sessions is now fully supported in OpenMPI as of 5.0.0, and has had subsequent bugs squashed in later releases it seems. 5.0.3 is currently the latest release.
Sorry for otherwise being unhelpful here, but I'm very interested in following the development of this capability, since I have a bunch of use cases that would be awesome to enable with this.
I would like to explore the possibility of Dask starting MPI. This is sort of the reverse behavior of what the dask-mpi package does today.
To clarify the situation I'm talking about, consider the situation where we are running Dask with some other system like Kubernetes, Yarn, or SLURM, and are doing Dask's normal dynamic computations. We scale up and down, load data from disk, and do some preprocessing. Then we want to run some MPI code on the data we have in memory in the Dask worker processes. We don't currently have an MPI world set up (our workers were not started with mpirun or anything) but would like to create one. Is this possible?
To do this, Dask will have to go through whatever process mpirun/mpiexec goes through to set up an MPI communicator. What is this process?
Ideally it would be able to do this without launching new processes. Ideally the same processes currently running the Dask workers would initialize some MPI code, be told about each other, then run some MPI program, then shut down MPI and continue on with normal Dask work.
We'll need to become careful about what to do if a worker goes away during this process. We'll probably have to restart the MPI job, which will be fine. I think that I can handle that on the scheduling/resiliency side.
I think that the people who know the answer to these questions will be people who have experience not only in using MPI, but also in deploying it.
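One visible part of what a launcher does is tell each process its rank and the job size. To our understanding, Open MPI's launcher exports OMPI_COMM_WORLD_RANK and OMPI_COMM_WORLD_SIZE, and MPICH's Hydra exports PMI_RANK and PMI_SIZE. A process started without a launcher, such as a Dask worker, sees none of these and behaves as a singleton: rank 0 of a world of size 1. This C sketch reads those variables. The helper names are hypothetical. Exporting the variables by hand would not be enough, since the library still needs the PMI exchange to reach its peers.

```c
#include <assert.h>
#include <stdlib.h>

typedef struct { int rank; int size; } launch_info;

/* Reads a non-negative decimal integer from an environment variable. */
static int env_int(const char *name, int *out) {
    const char *v = getenv(name);
    char *end;
    long x;
    if (v == NULL || *v == '\0') return 0;
    x = strtol(v, &end, 10);
    if (*end != '\0' || x < 0) return 0;
    *out = (int)x;
    return 1;
}

/* Accepts a rank/size pair only if both are present and consistent. */
static int try_pair(const char *rank_var, const char *size_var, launch_info *li) {
    int r, s;
    if (!env_int(rank_var, &r) || !env_int(size_var, &s)) return 0;
    if (s < 1 || r >= s) return 0;
    li->rank = r;
    li->size = s;
    return 1;
}

static launch_info detect_launch_info(void) {
    launch_info li = { 0, 1 }; /* singleton default: rank 0, size 1 */
    if (!try_pair("OMPI_COMM_WORLD_RANK", "OMPI_COMM_WORLD_SIZE", &li))
        try_pair("PMI_RANK", "PMI_SIZE", &li);
    assert(li.size >= 1 && li.rank >= 0 && li.rank < li.size);
    return li;
}
```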