azavea / noaa-hydro-data

NOAA Phase 2 Hydrological Data Processing

Batch tasks using Dask in Argo #120

Closed jpolchlo closed 1 year ago

jpolchlo commented 1 year ago

Overview

This PR provides some infrastructure for running Dask jobs independently of Jupyter. This enables long-running jobs for which it would be cumbersome to stay logged into Jupyter, and it opens up a workflow based on standard Python scripts rather than Jupyter notebooks. I've been using a standard form for my scripts so that the cluster can be configured via the Argo job submission interface:

import logging
import os
import dask_gateway

logger = logging.getLogger("DaskWorkflow")
gw = dask_gateway.Gateway(auth="jupyterhub")

try:
    opts = gw.cluster_options()
    opts.worker_memory = int(os.environ['DASK_OPTS__WORKER_MEMORY'])
    opts.worker_cores = int(os.environ['DASK_OPTS__WORKER_CORES'])
    opts.scheduler_memory = int(os.environ['DASK_OPTS__SCHEDULER_MEMORY'])
    opts.scheduler_cores = int(os.environ['DASK_OPTS__SCHEDULER_CORES'])
    cluster = gw.new_cluster(opts)
    cluster.scale(int(os.environ['DASK_OPTS__N_WORKERS']))
    client = cluster.get_client()

    logger.warning(f"Client dashboard: {client.dashboard_link}")

    # Client code goes here
finally:
    gw.stop_cluster(client.cluster.name)
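
One caveat with this template: if anything fails before client is assigned (e.g. while creating the cluster), the finally block will raise a NameError of its own and hide the original error. A slightly more defensive variant (a sketch, not what this PR ships) tracks the cluster handle instead:

import logging
import os
import dask_gateway

logger = logging.getLogger("DaskWorkflow")
gw = dask_gateway.Gateway(auth="jupyterhub")

cluster = None
try:
    opts = gw.cluster_options()
    opts.worker_memory = int(os.environ['DASK_OPTS__WORKER_MEMORY'])
    opts.worker_cores = int(os.environ['DASK_OPTS__WORKER_CORES'])
    opts.scheduler_memory = int(os.environ['DASK_OPTS__SCHEDULER_MEMORY'])
    opts.scheduler_cores = int(os.environ['DASK_OPTS__SCHEDULER_CORES'])
    cluster = gw.new_cluster(opts)
    cluster.scale(int(os.environ['DASK_OPTS__N_WORKERS']))
    client = cluster.get_client()

    logger.warning(f"Client dashboard: {client.dashboard_link}")

    # Client code goes here
finally:
    # Only tear down the cluster if it was actually created
    if cluster is not None:
        gw.stop_cluster(cluster.name)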

Closes #112

Checklist

Notes

This workflow will eventually be added to the cluster configs as a ClusterWorkflowTemplate, but that will be handled by azavea/kubernetes-deployment#34.

Testing Instructions

vlulla commented 1 year ago

This looks great! I have a minor observation to share: in my exploration of distributed Dask (using SSHCluster) I learned that Client forks a process. I have also learned that it is considered a best practice to initialize Client() inside the __main__ block. I am not familiar enough with how try/finally interacts with Python interpreter initialization to know whether your setup completely sidesteps this issue, but I thought the finding was worth sharing, hence this comment.
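
For context, the pattern I had in mind from my local experiments looks roughly like this (plain dask.distributed with a local cluster, not Gateway; purely illustrative):

from dask.distributed import Client

def main():
    # Creating the client inside main() (guarded below) avoids trouble when
    # distributed spawns local worker processes that re-import this module.
    client = Client()
    # ... submit work here ...
    client.close()

if __name__ == "__main__":
    main()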

Anyways, this looks great! I am going to emulate this in my Argo workflows and seek your advice on any issues that I run into.

jpolchlo commented 1 year ago

Is the gist of your comment that I ought to modify the template as follows?

import logging
import os
import dask_gateway

logger = logging.getLogger("DaskWorkflow")

def main():
    gw = dask_gateway.Gateway(auth="jupyterhub")

    try:
        opts = gw.cluster_options()
        opts.worker_memory = int(os.environ['DASK_OPTS__WORKER_MEMORY'])
        opts.worker_cores = int(os.environ['DASK_OPTS__WORKER_CORES'])
        opts.scheduler_memory = int(os.environ['DASK_OPTS__SCHEDULER_MEMORY'])
        opts.scheduler_cores = int(os.environ['DASK_OPTS__SCHEDULER_CORES'])
        cluster = gw.new_cluster(opts)
        cluster.scale(int(os.environ['DASK_OPTS__N_WORKERS']))
        client = cluster.get_client()

        logger.warning(f"Client dashboard: {client.dashboard_link}")

        # Client code goes here
    finally:
        gw.stop_cluster(client.cluster.name)

if __name__ == "__main'":
    main()

It's worth noting that Dask Distributed works differently from Dask Gateway, and it should not be relying on threads/processes in the same way. I've not encountered any difficulty starting a Client from the template as it was presented (which was not in a main block).

vlulla commented 1 year ago

Indeed, that is the gist of my comment. Additionally, I think that modifying it this way makes the script work correctly when we are trying to experiment in a non-Argo environment.

By the way, there's a minor typo: it ought to be "__main__" instead of "__main'".

Thanks for considering my point!

jpolchlo commented 1 year ago

Oops! Typo. Thanks. I adjusted the template in the README and the base flow example.

jpolchlo commented 1 year ago

One additional point: without more complex logic, this example template won't be interchangeable between the cloud environment and a local Dask Distributed environment, since the two require different imports and setup. I think.
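
To make that concrete, bridging the two would take something along these lines (just a sketch; the LOCAL_CLUSTER switch and the local worker count are invented here for illustration, and teardown of the Gateway cluster is omitted for brevity):

import os

def get_client():
    # Hypothetical toggle for local experimentation; not part of this PR
    if os.environ.get("LOCAL_CLUSTER"):
        from dask.distributed import Client, LocalCluster
        return Client(LocalCluster(n_workers=2))
    else:
        import dask_gateway
        gw = dask_gateway.Gateway(auth="jupyterhub")
        opts = gw.cluster_options()
        opts.worker_memory = int(os.environ['DASK_OPTS__WORKER_MEMORY'])
        opts.worker_cores = int(os.environ['DASK_OPTS__WORKER_CORES'])
        opts.scheduler_memory = int(os.environ['DASK_OPTS__SCHEDULER_MEMORY'])
        opts.scheduler_cores = int(os.environ['DASK_OPTS__SCHEDULER_CORES'])
        cluster = gw.new_cluster(opts)
        cluster.scale(int(os.environ['DASK_OPTS__N_WORKERS']))
        return cluster.get_client()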

vlulla commented 1 year ago

Yes, point taken!

rajadain commented 1 year ago

This could be made a little more explicit like this:

import logging
import os
import dask_gateway

logger = logging.getLogger("DaskWorkflow")

def run_on_cluster(fn):
    gw = dask_gateway.Gateway(auth="jupyterhub")

    try:
        opts = gw.cluster_options()
        opts.worker_memory = int(os.environ['DASK_OPTS__WORKER_MEMORY'])
        opts.worker_cores = int(os.environ['DASK_OPTS__WORKER_CORES'])
        opts.scheduler_memory = int(os.environ['DASK_OPTS__SCHEDULER_MEMORY'])
        opts.scheduler_cores = int(os.environ['DASK_OPTS__SCHEDULER_CORES'])
        cluster = gw.new_cluster(opts)
        cluster.scale(int(os.environ['DASK_OPTS__N_WORKERS']))
        client = cluster.get_client()

        logger.warning(f"Client dashboard: {client.dashboard_link}")

        fn()
    finally:
        gw.stop_cluster(client.cluster.name)

def client_code():
    # Client code goes here
    pass

def main():
    run_on_cluster(client_code)

if __name__ == "__main__":
    main()

Going to try to run the example on the cluster now.

jpolchlo commented 1 year ago

@rajadain I took your advice (partly) and modularized the template a bit more. Your version adds roughly two levels of indirection, which I simplified somewhat. Check the modified README. Was there a particular reason you wanted to pass the client code in as a function to a higher-order wrapper?
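
For comparison, one more way to keep the client code free of the wrapper would be a context manager around the cluster lifecycle; a rough sketch, not taken from the README:

import logging
import os
from contextlib import contextmanager

import dask_gateway

logger = logging.getLogger("DaskWorkflow")

@contextmanager
def gateway_client():
    # Illustrative alternative to a higher-order-function wrapper: the caller
    # receives a client, and the cluster is torn down when the block exits.
    gw = dask_gateway.Gateway(auth="jupyterhub")
    opts = gw.cluster_options()
    opts.worker_memory = int(os.environ['DASK_OPTS__WORKER_MEMORY'])
    opts.worker_cores = int(os.environ['DASK_OPTS__WORKER_CORES'])
    opts.scheduler_memory = int(os.environ['DASK_OPTS__SCHEDULER_MEMORY'])
    opts.scheduler_cores = int(os.environ['DASK_OPTS__SCHEDULER_CORES'])
    cluster = gw.new_cluster(opts)
    try:
        cluster.scale(int(os.environ['DASK_OPTS__N_WORKERS']))
        client = cluster.get_client()
        logger.warning(f"Client dashboard: {client.dashboard_link}")
        yield client
    finally:
        gw.stop_cluster(cluster.name)

def main():
    with gateway_client() as client:
        # Client code goes here
        pass

if __name__ == "__main__":
    main()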

rajadain commented 1 year ago

Was there a particular reason you wanted to pass the client code in as a function to a higher-order wrapper?

Just for clarity, so the client code is free of distraction. Your solution works well!