dask / dask-mpi

Deploy Dask using MPI4Py
BSD 3-Clause "New" or "Revised" License

Launching dask-mpi clusters #44

Open rcthomas opened 4 years ago

rcthomas commented 4 years ago

Is this supported? We have the ability (through Slurm) to srun ... dask-mpi..., which will launch the cluster on a queue set aside for interactive work. From the command line it's a single command. Other facilities may not have this capability (it may require a batch script).

We're willing to look into how to make this work if it's needed. cc @shreddd

mrocklin commented 4 years ago

Moving this to dask/dask-mpi .

This isn't currently supported, but it could be. In principle we need to create a class that meets some of the interface in https://github.com/dask/distributed/blob/master/distributed/deploy/cluster.py

If you wanted to get a start on this, I would recommend subclassing from Cluster and implementing _start and _close. I suspect that will get you most of the way there.

mrocklin commented 4 years ago

I think also that @andersy005 and @kmpaul may have had something like this in the past. I'm not seeing it now though, so I'm curious if maybe I removed it when we redid things for 2.0. (Although the old version probably wouldn't have met the current-day constraints of the dask.distributed.Cluster interface.)

kmpaul commented 4 years ago

I feel like some context was lost in the transfer of this issue to dask-mpi...? Not sure what exactly is being asked.

@rcthomas I hate to ask you to repeat yourself, but can you describe what kind of functionality (exactly) you are asking about?

kmpaul commented 4 years ago

Is this related to #9 in any way?

mrocklin commented 4 years ago

I think that the ask here is for a Python class that follows the Cluster interface so that it can be integrated into the JupyterLab "new cluster" button, as is depicted here: https://youtu.be/EX_voquHdk0

kmpaul commented 4 years ago

Ah! Yes. This would be very cool. ...Let's talk about what this would look like, then, because I'm actually wondering if this is more of a fit for Dask-Jobqueue.

rcthomas commented 4 years ago

Thanks for moving this to the right place. We're looking to enable both dask-jobqueue and dask-mpi style interactive HPC. We have a configuration that allows interactive use via either salloc or srun. It looks like dask-jobqueue sets up a job and submits it; here we'd want to do the same, but there's no batch script involved. I'm sure this can be accommodated, it's just a question of where...

kmpaul commented 4 years ago

@rcthomas Is there a difference between the two approaches, other than that one uses a batch script and the other doesn't? In other words, would a user notice a difference if one of these approaches or the other was implemented behind the "new cluster" button?

rcthomas commented 4 years ago

That is one difference, and it's not clear to me whether it needs to be exposed or whether it can be handled via abstraction. Another difference is that vanilla dask-jobqueue has a scheduler running somewhere already, and workers are added or come and go; with vanilla dask-mpi the scheduler and workers all start up at once. This latter difference may not matter, but I'm not familiar enough with the dask-labextension to know what matters.

mrocklin commented 4 years ago

A minimal implementation will probably look like this ...

import subprocess

import dask.distributed

class MPICluster(dask.distributed.Cluster):
    async def _start(self):
        # Launch dask-mpi (scheduler + workers) as a single external process
        self.proc = subprocess.Popen(["dask-mpi", ...])
        # Placeholder: wait until the scheduler is reachable
        await scheduler_has_started_up()

    async def _close(self):
        # Popen has no close(); terminate the launched process instead
        self.proc.terminate()

And then you'll add a config file like the following:

labextension:
  factory:
    module: 'dask_mpi'
    class: 'MPICluster'
    args: []
    kwargs: {}

I'm sure that it will be more complex than that, but in theory that should work.
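For what it's worth, usage might then look something like this (a minimal sketch; the dask_mpi.MPICluster import path is assumed here and does not exist yet):

from dask.distributed import Client
from dask_mpi import MPICluster  # hypothetical import path

cluster = MPICluster()    # _start would launch dask-mpi via subprocess
client = Client(cluster)  # connect a client to the running scheduler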

rcthomas commented 4 years ago

I've been studying the code a bit and I have a few questions and comments. To make things more concrete, here is what I'm looking to encapsulate. Many examples will be less complicated, but we want a more user-friendly way to accomplish something like the following real (working!) example:

salloc \
    -N 40 \
    -n 1280 \
    -c 2 \
    -t 60 \
    -C haswell \
    -q interactive \
    -A m3384 \
    --image=registry.services.nersc.gov/rthomas/nersc-dask-example:v0.1.0 \
    srun -u shifter \
        /opt/conda/bin/python -u \
        /opt/conda/bin/dask-mpi \
            --scheduler-file=scheduler.json \
            --dashboard-address=0 \
            --nthreads=1 \
            --memory-limit=0 \
            --no-nanny \
            --local-directory=/tmp

So @kmpaul, if dask-jobqueue can be made to do that, then OK (there is just no concept of a batch script here).

The next thing I wondered: @mrocklin, did you really mean SpecCluster as the base class, rather than Cluster?

Taking a step back, though, all I want to do is launch something like that command line (which starts a cluster) via subprocess; it could also be started with straight-up srun if, say, Jupyter is already running on a compute node. Starting a cluster via subprocess is probably happening in quite a few different Cluster implementations, so I wonder if that ought to just be a base class itself. My plan was to make something that does that first, and then move on to getting the MPI and job-launcher details right.
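To make that concrete, here's a rough sketch of what I have in mind. SubprocessCluster is a hypothetical name, the readiness check assumes the --scheduler-file flag used above, and the exact Cluster hooks may vary between distributed versions:

import asyncio
import json
import os
import shlex
import subprocess

from distributed.core import rpc
from distributed.deploy.cluster import Cluster

class SubprocessCluster(Cluster):  # hypothetical base class
    def __init__(self, launch_command, scheduler_file="scheduler.json", **kwargs):
        self.launch_command = launch_command
        self.scheduler_file = scheduler_file
        self.proc = None
        super().__init__(**kwargs)

    async def _start(self):
        # Launch the whole scheduler/worker group as one external process
        self.proc = subprocess.Popen(shlex.split(self.launch_command))
        # Wait for dask-mpi to write the scheduler file, then read the address
        while not os.path.exists(self.scheduler_file):
            await asyncio.sleep(1)
        with open(self.scheduler_file) as f:
            address = json.load(f)["address"]
        # Hand the scheduler address to the Cluster machinery
        self.scheduler_comm = rpc(address)
        await super()._start()

    async def _close(self):
        # Tear down the launched process along with the cluster
        if self.proc is not None:
            self.proc.terminate()
        await super()._close()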

mrocklin commented 4 years ago

I meant Cluster. SpecCluster is based around running many individual worker objects, which you won't have. You have one big job, rather than a configuration of many jobs.

kmpaul commented 4 years ago

@rcthomas Yes. I think you are correct that this doesn't fit into the Dask-Jobqueue paradigm. This cluster object should probably be defined in Dask-MPI, using a subprocess approach like the one @mrocklin suggests.

kmpaul commented 4 years ago

@rcthomas Ok. A question for you about what you actually need.

Is the command that you quoted above what you want to run when you click the Dask Labextension button?

salloc \
    -N 40 \
    -n 1280 \
    -c 2 \
    -t 60 \
    -C haswell \
    -q interactive \
    -A m3384 \
    --image=registry.services.nersc.gov/rthomas/nersc-dask-example:v0.1.0 \
    srun -u shifter \
        /opt/conda/bin/python -u \
        /opt/conda/bin/dask-mpi \
            --scheduler-file=scheduler.json \
            --dashboard-address=0 \
            --nthreads=1 \
            --memory-limit=0 \
            --no-nanny \
            --local-directory=/tmp

Or is this what you want to run when you click the Dask Labextension button?

    srun -u shifter \
        /opt/conda/bin/python -u \
        /opt/conda/bin/dask-mpi \
            --scheduler-file=scheduler.json \
            --dashboard-address=0 \
            --nthreads=1 \
            --memory-limit=0 \
            --no-nanny \
            --local-directory=/tmp

If you want to run the former command, then it falls within the purview of Dask-Jobqueue. And if you really need to avoid creating a batch script and submitting it via SLURM (i.e., you really want to run the single command without a batch script), then I am sure a modification to Dask-Jobqueue's SLURMCluster can be made to accommodate this.

However, if you want to run the latter command, then it falls within the purview of Dask-MPI, and all we would need to do is create the MPICluster class as @mrocklin recommends.

What makes the most sense to you?

rcthomas commented 4 years ago

@kmpaul I am so sorry it's taken me 2 months to come back to this.

You've got a point. Backing up just a bit: we use salloc here because -q interactive doesn't accept submission via sbatch. I'm not sure how typical this is for everyone using Slurm. If it's OK, then let's go with putting it into Dask-Jobqueue.

I'll note that in principle the two versions of the command (salloc ... srun ... and plain srun ...) will do the same thing from our login nodes (one difference is that you have to put --image on the srun in the second case). But the Shifter Slurm integration (as you get with salloc) ensures the images are in place before the command runs.

I'm in the same room as @lesteve at the Dask Developer workshop and so we'll talk about this tomorrow.

rcthomas commented 4 years ago

We took a look at adapting dask-jobqueue for salloc, and it won't work. We're able to submit the job, but dask-jobqueue looks for the job ID when the submit command returns, and of course here the command never returns until the job times out. I'm looking at options, which now look like either the original course I wanted to follow or an internal conversation about queue policy here.

kmpaul commented 4 years ago

@rcthomas No worries about getting busy with other things! I completely understand.

The fact that salloc does not return immediately is definitely a complication, but I think there's a way of making it work. It's just a little more complicated.

In standard Slurm Dask-Jobqueue, the sbatch command is run with Dask-Jobqueue's Job._call(...) function. Job._call uses subprocess.Popen to run the sbatch command and then calls process.communicate() to get the stdout and stderr of the process. My guess is that this is where everything hangs when you run salloc, because salloc doesn't complete!

So, instead, I tried to see if you could get the Slurm job ID from stderr before the salloc process completes. I was able to do it, and here's the sample script that demonstrates the process in Python (running on NCAR's Casper, which uses Slurm):

from subprocess import Popen, PIPE
from shlex import split

cmd = "salloc --account=NIOW0001 --ntasks=1 --time=00:02:00 --partition=dav sleep 10"

print(f'Running command: "{cmd}"')
p = Popen(split(cmd), stdout=PIPE, stderr=PIPE)

print('Looping until first output to stderr (or command completes)...')
while True:
    err = p.stderr.readline().decode().strip()
    if err or p.poll() is not None:
        break

if err:
    print(f'First output to stderr: "{err}"')
else:
    print('No output found on stderr.')

print('Waiting until command finishes...')
out, err = p.communicate()
print('Command finished.')
print()
print('STDOUT:')
print(out.decode())
print()
print('STDERR:')
print(err.decode())

On NCAR's Casper system, the output I get from this script looks like this:

Running command: "salloc --account=NIOW0001 --ntasks=1 --time=00:02:00 --partition=dav sleep 10"
Looping until first output to stderr (or command completes)...
First output to stderr: "salloc: Granted job allocation 4375428"
Waiting until command finishes...
Command finished.

STDOUT:

STDERR:
salloc: Waiting for resource configuration
salloc: Nodes casper21 are ready for job
salloc: Relinquishing job allocation 4375428

Would something like this work? You'd have to create a new Dask-Jobqueue class that holds on to the subprocess.Popen object, instead of discarding it like the current Job._call() method does. But it seems to me that something like the above would let you get the Slurm job ID by processing the first line returned on stderr from the salloc command. Then most of the rest of the Dask-Jobqueue code should work... ?
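As an aside, a small helper along these lines (parse_salloc_job_id is a hypothetical name) could extract the job ID from that first stderr line, matching the "Granted job allocation" message shown above:

import re

def parse_salloc_job_id(stderr_line):
    # Matches e.g. "salloc: Granted job allocation 4375428"
    match = re.search(r"Granted job allocation (\d+)", stderr_line)
    if match is None:
        raise ValueError(f"No Slurm job ID found in: {stderr_line!r}")
    return match.group(1)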

kmpaul commented 4 years ago

@rcthomas That said, I still think that getting the srun ... dask-mpi ... command to work with something like an MPICluster object is a great idea, too. I think both options are worth doing. Unfortunately, I've got some timely issues taking up my free cycles right now, and I don't think I could look at this until mid-April.

jshleap commented 4 years ago

Hello all! Any progress on this issue?

kmpaul commented 4 years ago

No progress yet. Is this a priority for you? I can try to accelerate this, but many things have come up over the last few months that have gotten in the way.

rcthomas commented 4 years ago

We're heading in a different direction from salloc in the long run, FWIW.

kmpaul commented 4 years ago

@rcthomas Because of the issues above? Or because of another reason?

jshleap commented 4 years ago

@kmpaul I guess I can use mpirun in the meantime, but srun is the recommended command for Slurm. @rcthomas you mean moving away from running srun inside salloc? If so, why? Would that also apply to sbatch?

rcthomas commented 4 years ago

@kmpaul the queue we're putting these clusters into isn't really the right place for them. We've got another plan that's more in line with vanilla dask-jobqueue. Some kind of MPICluster object would still be something we'd want to work on.

@jshleap yes, away from the salloc-srun setup (see above) and towards just sbatch.