
Runners vs Clusters #346

Open jacobtomlinson opened 1 year ago

jacobtomlinson commented 1 year ago

I want to start a bit of a discussion here about the difference between two Dask deployment paradigms that I will refer to as Clusters and Runners. My intention is to build more runners.

Clusters vs Runners

Here's a quick overview of these two paradigms.

Clusters

The most common way I see folks deploying Dask today is to have a single parent process that sets up a cluster by creating subprocesses via some means, not necessarily on the same system. All of the FooCluster implementations follow this pattern: the cluster class spawns a scheduler process, then spawns worker processes.

The identifying features of this paradigm are:

  - A single parent process creates the scheduler and the workers (or the resources that run them).
  - The parent knows, or decides, the scheduler address and hands it to the workers.

For example, LocalCluster (via SpecCluster) creates an instance of Scheduler and n instances of Nanny. It assumes the hostname is 127.0.0.1 and passes that to the workers.

By contrast, the HetznerCluster from dask_cloudprovider creates a VM for the scheduler, waits for the VM to be provisioned so it can get its IP address, and then creates the worker VMs and passes that IP address along.
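
To illustrate the Cluster paradigm from the user's side, here is a minimal example using LocalCluster (the n_workers value is arbitrary):

from dask.distributed import Client, LocalCluster

# The parent process spawns the scheduler and the worker (nanny) processes itself
cluster = LocalCluster(n_workers=4)

# The parent already knows the scheduler address, so the client simply connects to it
client = Client(cluster)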

Runners

In the runner paradigm, many generic processes are created at the same time and all run the same thing. Each process is then responsible for figuring out what it should do (start a scheduler? start a worker? run the client?) and if it isn't the scheduler it needs to figure out what the address of the scheduler is.

The identifying features of this paradigm are:

  - Many identical processes are launched at the same time by some external system.
  - Each process works out its own role (scheduler, worker or client).
  - Processes that are not the scheduler must discover the scheduler's address for themselves.

The most popular implementation of this pattern is dask-mpi. In this case, a script containing the client code is submitted using mpirun (often via some job scheduler like SLURM). N instances of the script are invoked on an existing compute cluster. The first thing the code does is call dask_mpi.initialize(). This function uses the MPI rank (a unique monotonic index assigned to each process) to decide whether it should run a scheduler, a worker or simply continue running this script and progress to the client code. The scheduler and worker processes call sys.exit(0) after they complete, so they never progress beyond the initialize function. MPI also has methods for communicating between processes, so the scheduler broadcasts its IP to all other processes, and the other processes wait to receive this before starting workers or connecting a client.
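
To make the mechanics concrete, here is a simplified sketch of that role-assignment and broadcast logic. This is not the actual dask-mpi source; it assumes mpi4py is installed and that the script is launched with something like mpirun -n 4 python script.py (real dask-mpi also shuts the scheduler down when the client exits, which is omitted here).

import asyncio
import sys

from mpi4py import MPI
from dask.distributed import Client, Scheduler, Worker

comm = MPI.COMM_WORLD
rank = comm.Get_rank()


async def run_scheduler():
    async with Scheduler() as scheduler:
        comm.bcast(scheduler.address, root=0)  # share the address with every other rank
        await scheduler.finished()             # block until the scheduler is shut down


async def run_worker(address):
    async with Worker(address) as worker:
        await worker.finished()


if rank == 0:
    # Rank 0 becomes the scheduler and never reaches the client code
    asyncio.run(run_scheduler())
    sys.exit(0)

# Every other rank blocks here until rank 0 broadcasts the scheduler address
address = comm.bcast(None, root=0)

if rank > 1:
    # Ranks 2 and above become workers and never reach the client code
    asyncio.run(run_worker(address))
    sys.exit(0)

# Only rank 1 continues past this point and runs the client code
client = Client(address)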

Community needs

Recently I've been identifying more deployment needs in the community that would be better served by having more implementations of the Runner paradigm. There are two use cases in particular.

SLURM (without MPI)

dask-mpi barely scratches the surface of what MPI can do, and using MPI just to bootstrap Dask brings a lot of baggage along with it. We are using a sledgehammer to crack a peanut.

Processes spawned by SLURM are also assigned a SLURM_PROCID environment variable, which in most cases is identical to the MPI rank and could be used to infer a role. Clusters that use SLURM also usually have a shared filesystem, so a file could be used to communicate the scheduler address between processes. This would provide the same functionality as dask-mpi without needing MPI at all.
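
As a hypothetical sketch of what that could look like (no such runner exists in Dask today; the scheduler file path and the polling logic are assumptions), the task with SLURM_PROCID 0 could start the scheduler and write a scheduler file to shared storage, and every other task could wait for that file:

import asyncio
import os
import sys
import time

from dask.distributed import Client, Scheduler, Worker

SCHEDULER_FILE = "/shared/scratch/dask-scheduler.json"  # assumed shared filesystem path
rank = int(os.environ["SLURM_PROCID"])  # task index assigned by SLURM, analogous to an MPI rank


async def run_scheduler():
    # The scheduler writes its address into the scheduler file once it has started
    async with Scheduler(scheduler_file=SCHEDULER_FILE) as scheduler:
        await scheduler.finished()


async def run_worker():
    async with Worker(scheduler_file=SCHEDULER_FILE) as worker:
        await worker.finished()


if rank == 0:
    asyncio.run(run_scheduler())
    sys.exit(0)

# Everyone else polls the shared filesystem until the scheduler file appears
# (a real implementation would need something more robust than this)
while not os.path.exists(SCHEDULER_FILE):
    time.sleep(1)

if rank > 1:
    asyncio.run(run_worker())
    sys.exit(0)

# Only rank 1 continues past this point and runs the client code
client = Client(scheduler_file=SCHEDULER_FILE)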

Databricks

When you launch a multi-node cluster on Databricks, one node is created to run the Spark driver and the other nodes run the workers. I occasionally get asked whether Dask can run on Databricks.

Databricks supports configuring init scripts, which run on every node during launch and are generally used for installing plugins or other dependencies. However, the init scripts are also provided with two useful environment variables, IS_DRIVER_NODE and DRIVER_NODE_IP. This gives us everything we need to start a scheduler on the driver node and, on the other nodes, workers that can connect to it.

This would effectively allow us to side-load a Dask cluster onto a Databricks Spark cluster. When using Databricks Notebooks the cells are executed by the driver so they could just connect to the scheduler on localhost. This would give Databricks Spark users a low-effort way to try out Dask in their existing workflow.
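
As a rough sketch, an init script could branch on those environment variables along these lines (the variable names are as described above; the truthy value check and the use of the default scheduler port 8786 are assumptions):

import os
import subprocess

if os.environ.get("IS_DRIVER_NODE", "").lower() in ("true", "1"):
    # Driver node: start the scheduler, which notebook cells can reach on localhost
    subprocess.Popen(["dask-scheduler"])
else:
    # Worker node: start a worker pointing at the scheduler on the driver node
    scheduler_address = f"tcp://{os.environ['DRIVER_NODE_IP']}:8786"
    subprocess.Popen(["dask-worker", scheduler_address])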

API

Today the dask-mpi API uses a single function call to handle Dask bootstrapping, which ultimately subverts the execution of most of the processes.

from dask.distributed import Client
from dask_mpi import initialize

# Only one process will progress beyond this call
# The rest run the Dask components until completion and then exit here
initialize() 

client = Client()  # Scheduler info is set magically via `dask.config`

This can be a little confusing for users. Many new users fear all processes will run everything in the script.

It might be nice to use a more familiar API like a context manager.

from dask.distributed import Client
from dask_mpi import MPIRunner

# Only one process will progress beyond the __enter__ call
# The rest run the Dask components until completion and then exit here
with MPIRunner() as runner:
    client = Client(runner)

This would make it easy to swap in LocalCluster or another cluster class if you want to run the same script in other places.

However, this API is still a bit magic. It's not clear that only one process continues inside the context manager.

Perhaps another alternative is something more explicit, but this idea also adds boilerplate.

import sys

from dask.distributed import Client
from dask_mpi import MPIRunner

runner = MPIRunner()

if runner.role in ["scheduler", "worker"]:
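    # These processes run their Dask component to completion and then exit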
    runner.run()
    sys.exit(0)

runner.wait_for_scheduler()
client = Client(runner)

This makes things readable and adds clarity, but also introduces surface area for user bugs.

Open questions

  1. Do we like the terms Cluster and Runner? I worry that cluster is already overloaded, and using it to differentiate between these paradigms may be confusing. Are there better words?
  2. The current dask-mpi API is a bit magic; should we go for a different API approach?
  3. One of the things I love about Dask is the consistency between LocalCluster, SSHCluster, KubeCluster, etc. Should we try and keep that consistency with runners? Or should they have a notably different feel but similar consistency between them?
  4. Where should these things live? If I make a SLURM Runner should it live in dask-jobqueue? Should a new MPIRunner live in dask-mpi? Should base classes live in distributed? Should all of this go in a new package called dask-runners?
jacobtomlinson commented 1 year ago

For info I picked up some code I experimented with a couple of years ago in https://github.com/dask/distributed/pull/4710 and https://github.com/dask/dask-mpi/pull/69 and I'm playing with MPIRunner and SLURMRunner here https://github.com/jacobtomlinson/dask-hpc-runner. That repo is not the final resting place for this stuff though.

lgarrison commented 11 months ago

I definitely support the development of the Runner paradigm! The names Cluster and Runner seem fine to me, although if people thought those terms were overloaded, maybe something like Top-Down and Bottom-Up would work?

In a Slurm context, it would be nice to decouple the bootstrapping of the cluster from MPI, which is a heavyweight requirement, as you note. The dask-runners repo is a nice start, and I think jacobtomlinson/dask-runners#3 works as a proof-of-concept for Slurm but needs more polish. I agree the API design is not obvious here, but the context manager approach seems fine to me.

Regarding where things should live, I do like keeping the bottom-up and top-down approaches separate (e.g. keep SlurmRunner separate from dask-jobqueue). But if that means that all the runners live in the same repo, it makes dependency management more complicated—if MPIRunner and SlurmRunner are in the same repo, then mpi4py might be an incidental dependency for SlurmRunner. So this may need some more thought, too.