PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Docs: Add a tutorial for how to use Prefect with a job scheduler (e.g. Slurm) on an HPC machine #10136

Open Andrew-S-Rosen opened 1 year ago

Andrew-S-Rosen commented 1 year ago

Describe the issue

There are currently no examples of how to use Prefect with a job scheduling system (e.g. SLURM, PBS, MOAB) on an HPC machine. I think this is a pretty important omission because many academics and users of the top supercomputers may not be aware of how they can use Prefect in that setting.

Describe the proposed change

Add a tutorial.

Additional context

There's mention of spinning up a Dask cluster but no representative example of how this is done in practice with a given job scheduling system.

alexisthual commented 1 year ago

I'm all in for a tutorial on how to use Prefect with a stack using submitit, hydra, and possibly dora!

Andrew-S-Rosen commented 1 year ago

I must admit that I have no clue what any of those 3 are 😅 I was thinking more of the recommended approach of using a prefect-dask.DaskTaskRunner instantiated with a dask-jobqueue.SLURMCluster, which would be passed to the task_runner kwarg in the @flow decorator.

JBlaschke commented 10 months ago

I would also be interested in such a tutorial. In particular, a simple tutorial that takes a Prefect workflow, submits it as a Slurm job, and monitors its execution. I'm able to help all y'all, and can test it at NERSC.

I'm also interested in embarrassingly parallel tasks, and tasks that communicate using MPI across nodes.

Andrew-S-Rosen commented 10 months ago

@JBlaschke: Here is an example I wrote up for Perlmutter. Feel free to use this and do whatever you'd like with it. I think the challenge I ran into for practical use was that you can only apply a single task runner for the entire flow, meaning if some of the individual tasks have different compute requirements, you're mostly out of luck (unless I missed something). There is also the fact that the compute nodes need to be able to communicate with the Prefect server, which not all HPC machines can do if using Prefect Cloud, but that's not a problem at NERSC.
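
One possible workaround for the single-task-runner constraint, sketched here under assumptions and not tested on Perlmutter: Prefect's documentation describes giving each subflow its own task runner, so tasks with similar compute requirements can be grouped into a subflow. In the sketch below, the profile names, the resource values, and the helpers `kwargs_for`, `runner_for`, and `build_flows` are all hypothetical; `DaskTaskRunner(cluster_class=..., cluster_kwargs=...)` is the same call used elsewhere in this thread.

```python
from __future__ import annotations

# Hypothetical resource profiles; the keys mirror dask-jobqueue cluster kwargs.
BASE_KWARGS = {"account": "MyAccountName", "walltime": "00:10:00"}
PROFILES = {
    "small": {"cores": 4, "memory": "8 GB"},
    "large": {"cores": 128, "memory": "64 GB"},
}


def kwargs_for(profile: str) -> dict:
    """Merge the shared Slurm settings with one resource profile."""
    return {**BASE_KWARGS, **PROFILES[profile]}


def runner_for(profile: str):
    """Build a temporary DaskTaskRunner for a profile (imports are lazy)."""
    from dask_jobqueue import SLURMCluster
    from prefect_dask import DaskTaskRunner

    return DaskTaskRunner(
        cluster_class=SLURMCluster, cluster_kwargs=kwargs_for(profile)
    )


def build_flows():
    """Define one subflow per profile and a parent flow that calls both."""
    from prefect import flow, task

    @task
    def work(x: float) -> float:
        return 2 * x

    @flow(task_runner=runner_for("small"))
    def light_steps(x: float) -> float:
        return work.submit(x).result()

    @flow(task_runner=runner_for("large"))
    def heavy_steps(x: float) -> float:
        return work.submit(x).result()

    @flow
    def parent(x: float) -> float:
        # Each subflow call uses its own task runner, hence its own Slurm job.
        return heavy_steps(light_steps(x))

    return parent
```

With temporary clusters like these, every subflow call would start and tear down its own Slurm job, so the queue wait is paid once per subflow.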

Note to future me: the content below was taken from my 0.2.0 release of quacc.

Some Utility Functions

from __future__ import annotations

from typing import Callable, TYPE_CHECKING

from dask_jobqueue import SLURMCluster

if TYPE_CHECKING:
    from dask_jobqueue.core import DaskJobqueueJob
    from prefect_dask.task_runners import DaskTaskRunner

def make_dask_runner(
    cluster_kwargs: dict,
    cluster_class: Callable | None = None,
    adapt_kwargs: dict[str, int | None] | None = None,
    client_kwargs: dict | None = None,
    temporary: bool = False,
) -> DaskTaskRunner:
    """
    Make a DaskTaskRunner for use with Prefect workflows.

    Parameters
    ----------
    cluster_kwargs
        Keyword arguments to pass to `cluster_class`.
    cluster_class
        The Dask cluster class to use. Defaults to `dask_jobqueue.SLURMCluster`.
    adapt_kwargs
        Keyword arguments to pass to `cluster.adapt` of the form `{"minimum": int, "maximum": int}`.
        If `None`, no adaptive scaling will be done.
    client_kwargs
        Keyword arguments to pass to `dask.distributed.Client`.
    temporary
        Whether to use a temporary cluster. If `True`, the cluster will be
        terminated once the `Flow` is finished. If `False`, the cluster will
        run until the walltime is reached and can run multiple `Flow`s.

    Returns
    -------
    DaskTaskRunner
        A DaskTaskRunner object for use with Prefect workflows.
    """
    from dask_jobqueue import SLURMCluster
    from prefect_dask.task_runners import DaskTaskRunner

    if cluster_class is None:
        cluster_class = SLURMCluster

    # Make the one-time-use DaskTaskRunner
    if temporary:
        return DaskTaskRunner(
            cluster_class=cluster_class,
            cluster_kwargs=cluster_kwargs,
            adapt_kwargs=adapt_kwargs,
            client_kwargs=client_kwargs,
        )

    # Make the Dask cluster
    cluster = _make_dask_cluster(cluster_class, cluster_kwargs)

    # Set up adaptive scaling
    if adapt_kwargs and (adapt_kwargs["minimum"] or adapt_kwargs["maximum"]):
        cluster.adapt(minimum=adapt_kwargs["minimum"], maximum=adapt_kwargs["maximum"])

    # Return the DaskTaskRunner with the cluster address
    return DaskTaskRunner(address=cluster.scheduler_address)

def _make_dask_cluster(
    cluster_class: Callable, cluster_kwargs: dict, verbose: bool = True
) -> DaskJobqueueJob:
    """
    Make a Dask cluster for use with Prefect workflows.

    Parameters
    ----------
    cluster_class
        The Dask cluster class to use, e.g. `dask_jobqueue.SLURMCluster`.
    cluster_kwargs
        Keyword arguments to pass to `cluster_class`.
    verbose
        Whether to print the job script to stdout.
    """

    cluster = cluster_class(**cluster_kwargs)
    if verbose:
        print(
            f"Workers are submitted with the following job script:\n{cluster.job_script()}"
        )
        print(f"Scheduler is running at {cluster.scheduler.address}")
        print(f"Dashboard is located at {cluster.dashboard_link}")

    return cluster

Example Usage

from prefect import flow

n_slurm_jobs = 1 # Number of Slurm jobs to launch in parallel.
n_nodes_per_calc = 1 # Number of nodes to reserve for each Slurm job.
n_cores_per_node = 48 # Number of CPU cores per node.
mem_per_node = "64 GB" # Total memory per node.

cluster_kwargs = {
    # Dask worker options
    "n_workers": n_slurm_jobs,
    "cores": n_cores_per_node,
    "memory": mem_per_node,
    # SLURM options
    "shebang": "#!/bin/bash",
    "account": "AccountName",
    "walltime": "00:10:00",
    "job_mem": "0",
    "job_script_prologue": [
        "source ~/.bashrc",
        "conda activate MyEnv",
    ],
    "job_directives_skip": ["-n", "--cpus-per-task"],
    "job_extra_directives": [f"-N {n_nodes_per_calc}", "-q debug", "-C cpu"],
    "python": "python",
}

runner = make_dask_runner(cluster_kwargs, temporary=True)

@flow(task_runner=runner)
def workflow(*args, **kwargs):
    ...

When the workflow is run from the login node, it will be submitted to the job scheduling system (Slurm by default), and the results will be sent back to Prefect Cloud once completed. Refer to the Dask-Jobqueue Documentation for the available cluster_kwargs that can be defined and how they relate to a typical job script.

To asynchronously spawn a Slurm job that continually pulls in work for the duration of its walltime (rather than starting and terminating over the lifetime of the associated Flow), call the make_dask_runner function without the temporary keyword argument, which defaults to False:

runner = make_dask_runner(cluster_kwargs)

Additionally, you can have the generated Dask cluster adaptively scale based on the amount of work available by setting adapt_kwargs as follows:

runner = make_dask_runner(cluster_kwargs, adapt_kwargs={"minimum": 1, "maximum": 5})

This will ensure that at least one Slurm job is always running, but the number of jobs will scale up to 5 if there is enough work available.
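
The job counts above hold when each Slurm job hosts exactly one Dask worker process. As far as the Dask documentation describes it, `minimum` and `maximum` count workers, and dask-jobqueue can start several worker processes per job depending on its `processes` setting. A minimal sketch of the conversion, using a hypothetical helper `adapt_bounds_for_jobs` that is not part of quacc, Prefect, or Dask:

```python
def adapt_bounds_for_jobs(
    min_jobs: int, max_jobs: int, processes_per_job: int = 1
) -> dict:
    """Convert Slurm job counts into worker counts for `adapt_kwargs`."""
    if processes_per_job < 1:
        raise ValueError("processes_per_job must be at least 1")
    if not 0 <= min_jobs <= max_jobs:
        raise ValueError("expected 0 <= min_jobs <= max_jobs")
    return {
        "minimum": min_jobs * processes_per_job,
        "maximum": max_jobs * processes_per_job,
    }


# One worker process per job: same bounds as the adapt_kwargs shown above.
print(adapt_bounds_for_jobs(1, 5))  # {'minimum': 1, 'maximum': 5}
# Four worker processes per job: the worker bounds scale accordingly.
print(adapt_bounds_for_jobs(1, 5, processes_per_job=4))  # {'minimum': 4, 'maximum': 20}
```

The returned dictionary has the `minimum` and `maximum` keys that make_dask_runner expects, so it could be passed directly as adapt_kwargs.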

JBlaschke commented 10 months ago

@Andrew-S-Rosen thank you -- I'll dust this off and condense it into a tutorial for our docs.

Andrew-S-Rosen commented 5 months ago

Here is a worked example for those curious. It is meant to work on the Perlmutter machine at NERSC, but the insights are largely machine-agnostic.

Preliminary Steps

On the login node:

pip install "prefect[dask]" dask-jobqueue
prefect cloud login

Basic Example

Prefect has Dask and Ray backends, but only Dask (via dask-jobqueue) interfaces cleanly with Slurm. We'll use dask-jobqueue as the backend for simplicity.

We start by defining the SLURMCluster() object in dask-jobqueue. Namely, we're outlining what goes in the Slurm submission script. This is the part that you'll need to adjust for your machine.

from dask.distributed import Client
from dask_jobqueue import SLURMCluster

cluster_kwargs = {
    "cores": 128,  
    "memory": "64 GB",
    "shebang": "#!/bin/bash",
    "account": "MyAccountName",
    "walltime": "00:10:00",
    "job_mem": "0",
    "job_script_prologue": ["source ~/.bashrc"],
    "job_directives_skip": ["-n", "--cpus-per-task"],  
    "job_extra_directives": ["-q debug", "-C cpu"],  
}

cluster = SLURMCluster(**cluster_kwargs)
print(cluster.job_script())

Now we define how many Slurm jobs we want with those specs and instantiate the Dask cluster. This will immediately submit a job to the queue even though we don't have any compute tasks to run just yet.

slurm_jobs = 1
cluster.scale(jobs=slurm_jobs)
client = Client(cluster)

Now we'll define our Prefect workflow (@flow) and tell it to run the individual @tasks via our active Dask cluster. This can be done using the DaskTaskRunner in Prefect, which allows you to pass an address for the Dask cluster.

from prefect import flow, task
from prefect_dask import DaskTaskRunner

@flow(task_runner=DaskTaskRunner(address=client.scheduler.address))
def workflow(a: float, b: float) -> float:
    output1 = add.submit(a, b)
    output2 = mult.submit(output1, b)
    return output2

@task
def add(a: float, b: float) -> float:
    return a + b

@task
def mult(a: float, b: float) -> float:
    return a * b

Now we instantiate and execute the workflow. The progress can be traced in Prefect Cloud.

output = workflow(1, 2)
print(output.result())

Since the Dask cluster remains alive until the walltime is reached or it is killed, we can run another workflow if we want.

Temporary Dask Cluster

Some users may prefer to spin up a Dask cluster (i.e. Slurm job) for each individual @flow. This is also possible in Prefect. (Note that the example below will complain about an open port if you have already instantiated a Dask cluster above.)

from prefect import flow, task
from prefect_dask import DaskTaskRunner

@flow(task_runner=DaskTaskRunner(cluster_class=SLURMCluster, cluster_kwargs=cluster_kwargs))
def workflow(a: float, b: float) -> float:
    output1 = add.submit(a, b)
    output2 = mult.submit(output1, b)
    return output2

@task
def add(a: float, b: float) -> float:
    return a + b

@task
def mult(a: float, b: float) -> float:
    return a * b

workflow(1, 2).result()

At this point, since the @flow has finished, the temporary Dask cluster has been shut down and the Slurm job is no longer running.

Limitations

Prefect Cloud is Fine at NERSC But Not Everywhere

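The compute nodes need a network connection to the Prefect server. That works at NERSC with Prefect Cloud, but not every HPC machine allows outbound connections from its compute nodes.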
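
A quick way to test this on a given machine is to attempt a TCP connection to the API host from a compute node. The sketch below is a minimal check under stated assumptions: the helper names `api_host_port` and `can_reach` are hypothetical, and a plain TCP connection says nothing about sites that route outbound traffic through an HTTP proxy.

```python
from __future__ import annotations

import socket
from urllib.parse import urlparse


def api_host_port(api_url: str) -> tuple[str, int]:
    """Extract the host and TCP port from a Prefect API URL."""
    parsed = urlparse(api_url)
    if parsed.hostname is None:
        raise ValueError(f"No host found in {api_url!r}")
    default_port = 443 if parsed.scheme == "https" else 80
    return parsed.hostname, parsed.port or default_port


def can_reach(api_url: str, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to the API host succeeds."""
    host, port = api_host_port(api_url)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections, and timeouts.
        return False
```

Run inside an interactive allocation, `can_reach("https://api.prefect.cloud/api")` would indicate whether Prefect Cloud is reachable; for a self-hosted server, pass the value of your PREFECT_API_URL setting instead.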

Killed Slurm Jobs Aren't Reflected in the UI

If you scancel a Slurm job, it's possible that the @flow may be stuck in a running state in the UI.
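
One way to notice this situation is to ask Slurm directly whether the job still exists. The following is a sketch, not something Prefect or dask-jobqueue provides: the helpers `parse_squeue_state` and `slurm_job_state` are hypothetical, and it assumes `squeue` is on the PATH and that you know the Slurm job ID.

```python
from __future__ import annotations

import subprocess


def parse_squeue_state(stdout: str) -> str | None:
    """Return the first job state in squeue's output, or None if it is empty."""
    lines = [line.strip() for line in stdout.splitlines() if line.strip()]
    return lines[0] if lines else None


def slurm_job_state(job_id: str) -> str | None:
    """Ask squeue for a job's state; None means the job is not in the queue."""
    result = subprocess.run(
        ["squeue", "--noheader", "--jobs", job_id, "--format", "%T"],
        capture_output=True,
        text=True,
        check=False,
    )
    if result.returncode != 0:
        # squeue typically exits non-zero for job IDs it no longer knows about.
        return None
    return parse_squeue_state(result.stdout)
```

If `slurm_job_state` returns None while the flow run still shows as running in the UI, the run probably has to be cancelled by hand.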

Pilot Job Behavior is Limited

The main limitation of Prefect in this kind of setup is that it inherits the limitations of dask-jobqueue. Most notably, you can't distribute @tasks over multiple nodes on a single Slurm allocation. For instance, the following is not possible:

Concurrent @flows Are Not Yet Supported

If you want to run multiple concurrent @flows without blocking in a single Python process, you can't (yet). You have to submit each @flow sequentially or in separate Python processes. This is kind of annoying, but there is an open issue about it.
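
The separate-process route can be scripted with the standard library. This is a minimal sketch assuming each flow lives in its own script that defines and calls one @flow; the helper `run_python_processes` and the script names in the usage note are hypothetical.

```python
from __future__ import annotations

import subprocess
import sys


def run_python_processes(arg_lists: list[list[str]]) -> list[int]:
    """Start one Python process per argument list and wait for all of them.

    Each inner list holds the arguments that follow the interpreter,
    e.g. ["flow_a.py"] for a script that defines and calls one @flow.
    """
    procs = [subprocess.Popen([sys.executable, *args]) for args in arg_lists]
    # All processes are already running; wait() only collects the exit codes.
    return [proc.wait() for proc in procs]
```

For example, `run_python_processes([["flow_a.py"], ["flow_b.py"]])` would start both scripts at once and return their exit codes in order.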

pgierz commented 2 weeks ago

Hello,

I'm trying to use this example to get tasks to execute on the actual compute node and running into difficulties. Here is what I have so far:

#!/usr/bin/env python3
"""
Submits work to SLURM via dask_jobqueue.

In this example, we use Dask in conjunction with Prefect to submit a job to SLURM,
and retrieve information about the SLURM node and job specifications. No compute-heavy
work is done here.
"""

import os
import socket

from dask.distributed import Client
from dask_jobqueue import SLURMCluster
from prefect import flow, task
from prefect.logging import get_run_logger
from prefect_dask import DaskTaskRunner

CLUSTER_KWARGS = {
    "account": "computing.computing",
    "walltime": "00:10:00",
    "memory": "8Gb",
    "cores": 1,
    "n_workers": 1,
}

CLUSTER = SLURMCluster(**CLUSTER_KWARGS)
SLURM_JOBS = 1
CLUSTER.scale(jobs=SLURM_JOBS)
CLIENT = Client(CLUSTER)

@task
def show_job_script():
    logger = get_run_logger()
    logger.info(CLUSTER.job_script())

@task
def show_slurm_info():
    logger = get_run_logger()
    logger.info("Finding slurm environment variables...")
    for var_name, var_value in os.environ.items():
        if var_name.startswith("SLURM"):
            logger.info(f"{var_name}={var_value}")

@task
def show_hostname():
    logger = get_run_logger()
    hostname = socket.getfqdn()
    logger.info(f"Running on {hostname}")

@flow(task_runner=DaskTaskRunner(address=CLIENT.scheduler.address))
def display_slurm_info():
    show_job_script()
    show_slurm_info()
    show_hostname()

if __name__ == "__main__":
    display_slurm_info()

Here is the corresponding output:

pgierz in 🌐 albedo0 in prefect-examples on  main [?] via 🐍 v3.10.6 (python-3.10.6) took 43s
❯ python prefect-slurm-info.py
09:13:57.780 | INFO    | prefect.engine - Created flow run 'muscular-kingfisher' for flow 'display-slurm-info'
09:13:57.783 | INFO    | prefect.engine - View at http://127.0.0.1:4200/runs/flow-run/fe8cd180-eb51-4c4a-93d5-c815df87e28b
09:13:57.834 | INFO    | prefect.task_runner.dask - Connecting to existing Dask cluster SLURMCluster(d4d0253f, 'tcp://10.100.1.1:38383', workers=0, threads=0, memory=0 B)
09:13:57.877 | INFO    | Task run 'show_job_script-355' - Created task run 'show_job_script-355' for task 'show_job_script'
09:13:57.898 | INFO    | Task run 'show_job_script-355' - #!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -A computing.computing
#SBATCH -n 1
#SBATCH --cpus-per-task=1
#SBATCH --mem=8G
#SBATCH -t 00:10:00

/albedo/home/pgierz/Code/gitlab.awi.de/hpc/tutorials/prefect-examples/.direnv/python-3.10.6/bin/python -m distributed.cli.dask_worker tcp://10.100.1.1:38383 --name dummy-name --nthreads 1 --memory-limit 7.45GiB --nanny --death-timeout 60

09:13:57.901 | INFO    | Task run 'show_job_script-355' - Finished in state Completed()
09:13:57.918 | INFO    | Task run 'show_slurm_info-73d' - Created task run 'show_slurm_info-73d' for task 'show_slurm_info'
09:13:57.922 | INFO    | Task run 'show_slurm_info-73d' - Finding slurm environment variables...
09:13:57.923 | INFO    | Task run 'show_slurm_info-73d' - SLURM_ACCOUNT=computing.computing
09:13:57.927 | INFO    | Task run 'show_slurm_info-73d' - Finished in state Completed()
09:13:57.944 | INFO    | Task run 'show_hostname-cb1' - Created task run 'show_hostname-cb1' for task 'show_hostname'
09:13:57.950 | INFO    | Task run 'show_hostname-cb1' - Running on albedo0
09:13:57.952 | INFO    | Task run 'show_hostname-cb1' - Finished in state Completed()
09:13:57.998 | INFO    | Flow run 'muscular-kingfisher' - Finished in state Completed()

From the output, it is evident that I am still getting information from a login node (these are called albedo0 and albedo1), not one of our compute nodes (these are always called prod-XXX). From a second terminal running squeue --me --iterate 1 I can see that a Dask-worker job is started and then stopped, so the connection between Prefect and Dask seems to work fine, but the tasks are still not running in the correct place.

What am I doing wrong?

JBlaschke commented 2 weeks ago

Hi @pgierz

Does squeue show your job as running? Did Slurm throw an error (most likely in a log somewhere)?

Tbh I don't really know how Dask interacts with Slurm behind the scenes -- at NERSC we actually have to make sure Dask is configured correctly (e.g. https://gitlab.com/NERSC/nersc-notebooks/-/tree/main/perlmutter/dask) in order to interact with the network correctly.

Is there a reason you're using the Dask Slurm cluster? At NERSC we're working on a way to interact with Slurm directly, which might side-step issues like these.

pgierz commented 2 weeks ago

Hi @JBlaschke,

I could see the job running, and the logs don't show anything interesting, just our standard SLURM prolog output.

> At NERSC we're working on a way to interact with Slurm directly, which might side-step issues like these.

I would be very interested in something like this, and am also more than happy to contribute, if that is something you would need more manpower for.

pgierz commented 2 weeks ago

If anyone is interested in looking through what we are doing, the examples are here: https://gitlab.awi.de/hpc/tutorials/prefect-examples