dask / dask-mpi

Deploy Dask using MPI4Py
BSD 3-Clause "New" or "Revised" License

MPICluster #9

Open kmpaul opened 5 years ago

kmpaul commented 5 years ago

Now that the first implementation of the initialize function makes it possible to launch a Dask cluster from within the client script using mpi4py, the natural next step seems to be implementing an MPICluster object so that the canonical client-script operational model of:

cluster = MPICluster(...args...)
client = Client(cluster)

will work the same way that calling initialize works.

kmpaul commented 5 years ago

Recording some more thoughts on this:

I think that the MPICluster.__init__ method could effectively perform what the initialize function already performs. In fact, as a first pass, you could just have MPICluster.__init__ call initialize. Naively, I would like the send_close_signal function that is currently registered with atexit to be moved to an MPICluster.close method. If we expand the initialize function out, this class might look like:

import sys
import dask
from mpi4py import MPI
COMM = MPI.COMM_WORLD
RANK = COMM.Get_rank()

SCHEDULER_RANK = 0
CLIENT_RANK = 1

class MPICluster(object):
    def __init__(self, ...):
        if RANK == SCHEDULER_RANK:
            scheduler = create_scheduler(...)
            addr = scheduler.address
        else:
            addr = None

        self.scheduler_address = COMM.bcast(addr)
        dask.config.set(scheduler_address=self.scheduler_address)
        COMM.Barrier()

        if RANK == SCHEDULER_RANK:
            run_scheduler(scheduler)
            sys.exit()
        elif RANK == CLIENT_RANK:
            pass
        else:
            create_and_run_worker(...)
            sys.exit()

    def close(self):
        send_close_signal()

But there is a problem with this: the initialize function (the __init__ method above) works by stopping the Scheduler and Worker ranks with a sys.exit() call after the close signal is sent to the Scheduler (i.e., after the Scheduler and Worker IOLoops have closed). Without the sys.exit() calls on the Scheduler and Worker ranks, those ranks continue from their last point of execution (in an MPI environment). And there is no clever way (that I can think of, at least) to "fast forward" these processes to the place in the code where the Client rank calls MPICluster.close().
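
For reference, this is roughly the initialize-based pattern that such a class would have to reproduce (per the current dask-mpi usage); only the Client rank ever returns from initialize():

from dask_mpi import initialize
from distributed import Client

# Ranks 0 and 2+ never come back from this call: they run the scheduler
# and the workers and call sys.exit() when the cluster shuts down.
initialize()

# Only rank 1 (the Client rank) reaches this point.
client = Client()  # connects to the scheduler address set by initialize()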

In summary, I cannot think of a way of encapsulating the cluster start/stop procedure on the Scheduler and Worker ranks in an object-oriented way. The closest way I can think of is, frankly, a "cheat". That way is to have a dask_mpi.cluster module that calls initialize at the module level (which, of course, is not ideal because it either requires something like a dask_mpi.config object to set the initialize arguments before the dask_mpi.cluster module is imported, or it requires fixing the initialize arguments for the user). Everything else in this module would, therefore, only be executed on the Client rank, which would include the MPICluster class definition. Something like this:

import dask

from dask_mpi.core import initialize
initialize(...)

class MPICluster(object):
    def __init__(self):
        self.scheduler_address = dask.config.get('scheduler_address')

Hence, the MPICluster object would exist only as a holder for the scheduler_address attribute (which, in the initialize-function mode of operation, is done by dask.config). This is fake object-oriented programming, and the "clean coder" in me wants to say: if you don't need an object, why create it?
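
For concreteness, usage of that hypothetical dask_mpi.cluster module would look something like this (the module name and the Client wiring are assumptions, not existing API):

# Because dask_mpi.cluster would call initialize() at import time, the
# scheduler and worker ranks exit inside the import; only the Client
# rank continues past it.
from dask_mpi.cluster import MPICluster  # hypothetical module
from distributed import Client

cluster = MPICluster()                      # just holds scheduler_address
client = Client(cluster.scheduler_address)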

Maybe someone out there has a better idea...

mrocklin commented 5 years ago

Is there a particular motivation behind having an MPICluster class? The initialize method seems sufficient to me.

kmpaul commented 5 years ago

The only reason I wanted to consider this option is that I like the pattern:

from dask_something import SomethingCluster
cluster = SomethingCluster()

from distributed import Client
client = Client(cluster)

...with the Client initializing from the Cluster object. I realize that the dask_mpi.initialize method is very similar...but it's not OO. And I like OO.

I'll leave this issue open, just in case some development makes this possible, but I agree that an MPICluster class is not actually necessary.

guillaumeeb commented 5 years ago

As I am probably still not Pythonic enough: what is the problem with the solution you propose above, @kmpaul? Would using sys.exit() in __init__() not work?

kmpaul commented 5 years ago

Hey, @guillaumeeb!

Yes. Using sys.exit() in __init__ would not behave the way you would expect.

The Cluster objects allow you to start and stop a cluster (i.e., the scheduler and workers) from the Client process. And, strictly speaking, you could start a Dask cluster, stop it, and then restart it. With the sys.exit() in the MPICluster.__init__ method, the scheduler and worker processes will be killed...which would cause an MPI_Abort in most MPI implementations. That would then kill your Client process, meaning that you could not restart your cluster.

If you removed the sys.exit() calls from __init__, then once the scheduler and workers were closed, those processes would "pick up where they left off." Namely, they would return from the __init__ call and then run your Client code again...except in serial on multiple ranks.

guillaumeeb commented 5 years ago

And, strictly speaking, you could start a dask cluster, stop it, and then restart it.

But do you really want to do this in an MPI job? Couldn't this be a documented limitation? Once your cluster is closed, that is the end of the MPI run?

But maybe you wanted to mix Dask and other MPI workloads, as in http://blog.dask.org/2019/01/31/dask-mpi-experiment?

mrocklin commented 5 years ago

Short term I like the current initialize approach. I think that it does what we want.

Long term I think it would be interesting to dynamically launch and destroy MPI jobs from within Python, perhaps by calling something like mpirun directly. I'm not sure how valuable this would be in practice (you all would know more), but it could be fun to experiment with.
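
A rough sketch of that idea, using the existing dask-mpi command-line interface and a scheduler file (the wiring below is speculative and untested):

import os
import subprocess
import time

from distributed import Client

# Launch a Dask-MPI cluster as a subprocess.
proc = subprocess.Popen(
    ["mpirun", "-np", "4", "dask-mpi", "--scheduler-file", "scheduler.json"]
)
while not os.path.exists("scheduler.json"):  # wait for the scheduler to start
    time.sleep(1)

client = Client(scheduler_file="scheduler.json")
# ... do some Dask work ...
client.shutdown()  # closes the scheduler and workers
proc.wait()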

kmpaul commented 5 years ago

@guillaumeeb Yes. I agree with you that for an MPI run, you probably only want to shut down the cluster at the end of your job in most (all?) cases. However, I think that if you do this, then the only part of the MPICluster that you need is the __init__ method (and the close method that gets called when the object is destroyed). And that is exactly what the initialize function already does, so there is no need for a class when just one function does the job.

@mrocklin I think that what you are describing could be accomplished by using Dask-jobqueue from within a Dask-MPI job. Although that would only work on a system with a job scheduler.
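
Something along these lines, using the existing dask-jobqueue API (launching it from inside a Dask-MPI job is untested speculation on my part):

from dask_jobqueue import PBSCluster
from distributed import Client

# Ask the batch scheduler for additional worker jobs from within a
# running job; PBSCluster is one of several dask-jobqueue cluster classes.
cluster = PBSCluster(cores=24, memory="100GB", queue="regular")
cluster.scale(jobs=4)  # submit 4 worker jobs
client = Client(cluster)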

That does suggest some other configurable options, though. What if you only want to use a fraction of the available MPI ranks? And then use more ranks later? On a system with a job scheduler, this could be wasteful because the scheduled MPI job would have fixed/reserved resources for the duration of the job, and only a fraction would be used (unless you were doing something like the dask-mpi-experiment). However, on a system without a job scheduler, you could elastically scale your resources with additional mpirun invocations. As you say, Matt, I'm not sure how valuable this would be in practice, but it would be interesting to experiment with.

guillaumeeb commented 5 years ago

OK, thank you both for your answers!

kmpaul commented 5 years ago

You are welcome!