MetOffice / dagrunner

⛔[EXPERIMENTAL] Directed acyclic graph (DAG) runner and tools
BSD 3-Clause "New" or "Revised" License

Multi-node execution #51

Open cpelley opened 1 month ago

cpelley commented 1 month ago

Options available

Manual running

module load scitools

# start scheduler
dask-scheduler --no-dashboard --protocol tcp &

# start workers
dask worker <scheduler-ip>:<port> --nworkers 4 --nthreads 1 &

# start client
python dask_computation.py --address <scheduler-ip>:<port>
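
For reference, a minimal sketch of what a client script along the lines of dask_computation.py might contain (the --address flag comes from the command above; the workload is a placeholder and the actual script is in the gist linked below):

# dask_computation.py (illustrative sketch only)
import argparse

import dask
from dask.distributed import Client


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--address",
        help="Address of a running dask scheduler, e.g. tcp://<scheduler-ip>:<port>",
    )
    args = parser.parse_args()

    # Connect to the externally started scheduler; workers must already be attached to it.
    client = Client(args.address)
    print(client)

    # Placeholder workload - replace with the real graph execution.
    result = dask.delayed(sum)([1, 2, 3]).compute()
    print(result)


if __name__ == "__main__":
    main()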

Using dask-mpi (mpirun/mpiexec)

Not yet tested.

pip install dask-mpi
mpiexec -n 4 python dask_computation.py --mpi
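
An untested sketch of how the --mpi path inside the script might look: dask_mpi.initialize() turns MPI rank 0 into the scheduler, rank 1 into this client code and the remaining ranks into workers, after which a bare Client() picks up the scheduler automatically (the --mpi flag handling itself is an assumption):

# Illustrative only - dask-mpi route (not yet tested)
from dask.distributed import Client
from dask_mpi import initialize


def main():
    # Rank 0 becomes the scheduler, rank 1 runs this client code,
    # remaining ranks become workers.
    initialize(nthreads=1)

    client = Client()  # connects to the scheduler started by initialize()
    print(client)


if __name__ == "__main__":
    main()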

Using dask-jobqueue

pip install dask-jobqueue

from dask_jobqueue import PBSCluster

total_memory = 60  # Total memory per worker job/node in GB
num_workers = 8  # Number of workers per worker job/node
num_nodes = 2
walltime = '00:10:00'  # Maximum runtime of the workers
# Set cores and processes to the same value for single-threaded workers
cluster = PBSCluster(
    protocol="tcp",  # unencrypted
    cores=num_workers,  # Number of cores per worker job (== processes by default)
    processes=num_workers,  # Number of processes (workers) per worker job/node
    memory=f"{total_memory} GB",  # Total memory per worker job, i.e. per node
    walltime=walltime,  # Maximum runtime of the worker job
)
cluster.scale(jobs=num_nodes)  # Scale the number of worker jobs to the number of nodes
# Parameters
# ----------
# n : int
#     Target number of workers
# jobs : int
#     Target number of jobs
# memory : str
#     Target amount of memory
# cores : int
#     Target number of cores

or

cluster.scale(<number of workers>)

or

cluster.adapt(minimum=1, maximum=10)  # Automatically scale the number of workers based on the workload
# Parameters
# ----------
# minimum : int
#     Minimum number of workers to keep around for the cluster
# maximum : int
#     Maximum number of workers to keep around for the cluster
# minimum_memory : str
#     Minimum amount of memory for the cluster
# maximum_memory : str
#     Maximum amount of memory for the cluster
# minimum_jobs : int
#     Minimum number of jobs
# maximum_jobs : int
#     Maximum number of jobs
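
With the cluster defined and scaled as above, a distributed Client is attached to it to submit work (illustrative sketch; the workload here is a placeholder, not the actual graph execution):

from dask.distributed import Client

client = Client(cluster)  # connect to the PBSCluster defined above
print(client)

# Placeholder workload - replace with the real graph execution.
futures = client.map(lambda x: x ** 2, range(100))
print(sum(client.gather(futures)))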

Testing these approaches with a script

https://gist.github.com/cpelley/ff537e9dd5fb97ee681fa7207575330b

cpelley commented 1 month ago

Environment for testing dask/ray scheduling on multiple nodes.

mamba create -c conda-forge networkx improver ray dask -n multi_node_exe
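
The environment is then activated before running any of the commands above, e.g.:

mamba activate multi_node_exe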