dask / dask-jobqueue

Deploy Dask on job schedulers like PBS, SLURM, and SGE
https://jobqueue.dask.org
BSD 3-Clause "New" or "Revised" License

Should we merge Dask HPC Runners in here? #638

Open jacobtomlinson opened 6 months ago

jacobtomlinson commented 6 months ago

For a while I've been playing around with this prototype repo which implements Dask Runners for HPC systems. I'm motivated to reduce the fragmentation and confusion around tooling in the Dask HPC community, so perhaps this new code should live here.

In https://github.com/dask/community/issues/346 I wrote up the difference between Dask Clusters and Dask Runners. The TL;DR is that a Cluster creates the scheduler and worker tasks directly; for example, dask_jobqueue.SLURMCluster submits a job to SLURM for each worker. A Dask Runner is different because it is invoked from within an existing allocation and populates that job with Dask processes. This is the same way Dask MPI works.
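For contrast, the existing Cluster model looks roughly like the following (a minimal sketch; the cores, memory and queue values are placeholders, not recommendations):

from dask.distributed import Client
from dask_jobqueue import SLURMCluster

# Each worker is launched as its own Slurm job, submitted by the cluster object.
cluster = SLURMCluster(cores=4, memory="8GB", queue="regular")
cluster.scale(jobs=6)  # ask Slurm for 6 worker jobs

with Client(cluster) as client:
    assert client.submit(lambda x: x + 1, 10).result() == 11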

SlurmRunner Example

If I write a Python script and call it with srun -n 6 python myscript.py, the script will be invoked by Slurm 6 times in parallel on 6 different nodes/cores on the HPC system. The Dask Runner class then uses the Slurm process ID environment variable to decide what role each process should play, and uses the shared filesystem to bootstrap communications with a scheduler file.

# myscript.py
from dask.distributed import Client
from dask_hpc_runner import SlurmRunner

# When entering the SlurmRunner context manager processes will decide if they should be
# the client, scheduler or a worker.
# Only process ID 1 executes the contents of the context manager.
# All other processes start the Dask components and then block here forever.
with SlurmRunner(scheduler_file="/path/to/shared/filesystem/scheduler-{job_id}.json") as runner:

    # The runner object contains the scheduler address info and can be used to construct a client.
    with Client(runner) as client:

        # Wait for all the workers to be ready before continuing.
        client.wait_for_workers(runner.n_workers)

        # Then we can submit some work to the Dask scheduler.
        assert client.submit(lambda x: x + 1, 10).result() == 11
        assert client.submit(lambda x: x + 1, 20, workers=2).result() == 21

# When process ID 1 exits the SlurmRunner context manager it sends a graceful shutdown to the Dask processes.
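For reference, the role assignment described above boils down to something like the following. This is a simplified sketch, not the actual implementation; SLURM_PROCID is the rank Slurm assigns to each launched process, and the exact rank-to-role mapping shown here is an assumption based on the comments in the example.

import os

# Each of the 6 processes started by srun inspects its Slurm rank and picks a role.
rank = int(os.environ.get("SLURM_PROCID", "0"))

if rank == 0:
    role = "scheduler"  # starts the scheduler and writes the scheduler file
elif rank == 1:
    role = "client"     # runs the body of the `with SlurmRunner(...)` block
else:
    role = "worker"     # starts a worker that connects via the scheduler file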

Should this live in dask-jobqueue?

I'm at the point of trying to decide where this code should live within the Dask ecosystem. So far I have implemented MPIRunner and SlurmRunner as a proof of concept. It would be very straightforward to write runners for other batch systems, provided it is possible to detect the process ID/rank from the environment.

I can imagine users choosing between SLURMCluster and SlurmRunner depending on their use case and how they want to deploy Dask. There are pros and cons to each deployment model: for example, the cluster can scale adaptively, while the runner only requires a single job submission, which guarantees better node locality. So perhaps it makes sense for SlurmRunner to live here in dask-jobqueue, and we can use documentation to help users choose the right one for them? (We can make the name casing more consistent.)

The MPIRunner and SlurmRunner share a common base class, so I'm not sure if that means MPIRunner should also live here, or whether we should accept some code duplication and put it in dask-mpi?

Alternatively my prototype repo could just move to dask-contrib and become a separate project?

Or we could roll all of dask-jobqueue, dask-mpi and the new dask-hpc-runners into a single dask-hpc package? Or pull everything into dask-jobqueue?

The Dask HPC tooling is currently very fragmented and I'm keen to make things better, not worse. I'd very much like to hear opinions from folks like @guillaumeeb @lesteve @jrbourbeau @kmpaul @mrocklin on what we should do here.

guillaumeeb commented 6 months ago

Thanks @jacobtomlinson for sharing this and opening this discussion!

The Dask HPC tooling is currently very fragmented and I'm keen to make things better, not worse.

I totally agree with this statement. I would love to see a single dask-hpc package with all of this, but it would require a non-negligible amount of work, especially on the documentation side.

kmpaul commented 6 months ago

This is fantastic, @jacobtomlinson! Very cool stuff.

I agree with @guillaumeeb that it would be really nice to have a Dask HPC package with everything in one place. I would volunteer to help maintain that package, since Dask MPI would get rolled into it.

This reminds me that I have had a major PR in Dask MPI waiting for quite a while now. I should finish that and merge it before moving Dask MPI into any new package.

jacobtomlinson commented 6 months ago

Awesome, thanks @kmpaul. Your new implementation looks interesting; feel free to ping me again for review once the conflicts are worked out.

I think my plan will look like the following:

I've submitted an abstract to EuroSciPy to talk about Dask and HPC (and GPUs/UCX) quite broadly; if it gets accepted, that could make a nice deadline to work towards for announcing these changes.