equinor / ert

ERT - Ensemble based Reservoir Tool - is designed for running ensembles of dynamical models such as reservoir models, in order to do sensitivity analysis and data assimilation. ERT supports data assimilation using the Ensemble Smoother (ES), Ensemble Smoother with Multiple Data Assimilation (ES-MDA) and Iterative Ensemble Smoother (IES).
https://ert.readthedocs.io/en/latest/
GNU General Public License v3.0

Investigate whether we can use Dask to handle all drivers directly #6328

Closed. xjules closed this issue 1 year ago.

xjules commented 1 year ago

Check the suitability of the dask library to replace the drivers interface. See https://jobqueue.dask.org/en/latest/generated/dask_jobqueue.LSFCluster.html for more info. Also talk to @sondreso for more info.

kwinkunks commented 1 year ago

Dask and dask-jobqueue

Looking at #6328 - Investigate whether we can use Dask to handle all drivers directly.

API

dask can create a 'local' cluster with dask.distributed.LocalCluster (but people often do this implicitly by just going straight to a Client). And to create a local cluster with all workers running in dedicated subprocesses, there's also dask.distributed.SubprocessCluster.
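
For illustration, a minimal local setup might look like this (the worker and thread counts are arbitrary):

from dask.distributed import LocalCluster, Client

cluster = LocalCluster(n_workers=2, threads_per_worker=1)
client = Client(cluster)
# The implicit form: Client() with no arguments spins up a LocalCluster itself.
print(client.submit(sum, [1, 2, 3]).result())  # 6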

For 'real' clusters, the following queue systems are supported out of the box, one cluster class per scheduler:

- HTCondorCluster
- LSFCluster
- MoabCluster
- OARCluster
- PBSCluster
- SGECluster
- SLURMCluster

There are sub-projects for other scenarios, for example:

- dask-kubernetes (Kubernetes)
- dask-cloudprovider (AWS, Azure, GCP and other cloud providers)
- dask-yarn (Hadoop/YARN)
- dask-gateway (managed, multi-tenant deployments)
- dask-mpi (MPI environments)

A defunct, archived project, dask-drmaa, supported DRMAA.

Finally, it is also possible to use the awaitable dask.distributed base classes Scheduler, Worker or Nanny, and Client to create objects explicitly.
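
A rough sketch of that lowest-level API, loosely following the pattern in the distributed documentation (the toy task is arbitrary):

import asyncio
from dask.distributed import Scheduler, Worker, Client

async def main():
    # Start a scheduler, one worker, and a client, all in this process.
    async with Scheduler() as scheduler:
        async with Worker(scheduler.address) as worker:
            async with Client(scheduler.address, asynchronous=True) as client:
                future = client.submit(sum, [1, 2, 3])
                print(await future)  # 6

asyncio.run(main())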

Configuration

Configuration is gathered from YAML files (e.g. in ~/.config/dask/ or /etc/dask/), from DASK_*-prefixed environment variables, and from the built-in defaults of each sub-library.

For example:

jobqueue:
  pbs:
    cores: 12
    memory: 8GiB
    processes: 6
    queue: hb120
    local-directory: $LOCAL_STORAGE
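
If that YAML is saved somewhere dask looks for configuration (for example ~/.config/dask/jobqueue.yaml, an assumption here), the values become the defaults for the corresponding cluster class:

import dask
from dask_jobqueue import PBSCluster

print(dask.config.get("jobqueue.pbs.cores"))  # 12, from the YAML above
cluster = PBSCluster()  # cores, memory, queue, etc. are read from the config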

Set up a session on a cluster

On prem:

ssh st-linrgs034.st.statoil.no
source /prog/res/komodo/stable/enable
komodoenv --no-update ~/kenv
source /private/mtha/kenv/enable
python -m pip install dask-jobqueue
/prog/LSF/script/setup-ssh-keyauth

On Azure:

ssh s034-lcatop01.s034.oc.equinor.com
source /prog/komodo/stable/enable
komodoenv --no-update --root /prog/komodo ~/kenv
source /private/mtha/kenv/enable
python -m pip install dask-jobqueue

Then I'm running things in IPython. One could also run a headless Jupyter Lab session on the cluster and ssh in from a desktop, but I think we might need a port opened for this (e.g. 8888).

Example on prem

Running on st-linrgs034.st.statoil.no.

from dask_jobqueue import LSFCluster as Cluster
cluster = Cluster(cores=6, memory='2GiB', use_stdin=False)
cluster.scale(10)  # n=10 workers

This seems to start workers on the cluster.

Failing to pass use_stdin=False results in an error: Task exception was never retrieved.

We can look at a job script:

print(cluster.job_script())

Results in:

#!/usr/bin/env bash

#BSUB -J dask-worker
#BSUB -n 4
#BSUB -R "span[hosts=1]"
#BSUB -M 2000000
#BSUB -W 00:30

/private/mtha/kenv-oct/root/bin/python -m distributed.cli.dask_worker tcp://143.97.182.7:45463 --nthreads 1 --nworkers 4 --memory-limit 476.84MiB --name dummy-name --nanny --death-timeout 60

However, submitting jobs does not currently work.

Example on Azure

This computes instantly:

from dask_jobqueue import PBSCluster
from dask.distributed import Client

cluster = PBSCluster(cores=6, memory="2GiB", queue='hb120')
cluster.scale(10)  # n=10 workers
client = Client(cluster)
future = client.submit(lambda: 1 + 1)
print(future.result())

Gives 2, as expected. Then

print(cluster.job_script())

Results in:

#!/usr/bin/env bash

#PBS -N dask-worker
#PBS -q hb120
#PBS -l select=1:ncpus=24:mem=1908MB
#PBS -l walltime=00:30:00

/private/mtha/kenv-oct/root/bin/python -m distributed.cli.dask_worker tcp://10.85.202.20:38549 --nthreads 4 --nworkers 6 --memory-limit 317.89MiB --name dummy-name --nanny --death-timeout 60

Fire and forget

If nothing holds a reference to a future (for example, it goes out of scope), Dask considers the result unwanted and may never run, or may cancel, the task. If we aren't interested in a task's output, only its side effects (e.g. writing a file), we can tell Dask to run the task to completion anyway:

For a script foo.sh like:

#!/usr/bin/bash
sleep 1
echo OK > foo.out

We can call dask.distributed.fire_and_forget() like this (it also works on a collection of futures):

fire_and_forget(client.submit(os.system, "./foo.sh"))
xjules commented 1 year ago

Looks very promising! We should find out whether (in the long run at least) we can remove the websocket communication and replace it purely with the dask API. There are some topics / docs in the dask documentation which might be relevant to this.
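
As a very rough sketch of what "purely the dask API" could mean for event reporting, here is one possibility using dask's built-in Pub/Sub channels; the function name, topic name and event payloads are made up for illustration:

from dask.distributed import Client, Pub, Sub

def run_forward_model(realization):
    pub = Pub("ert-events")  # publish from inside the task, on the worker
    pub.put({"realization": realization, "status": "running"})
    # ... the actual forward model would run here ...
    pub.put({"realization": realization, "status": "done"})
    return realization

client = Client()
sub = Sub("ert-events")  # subscribe on the client side
futures = client.map(run_forward_model, range(3))

done = 0
for event in sub:  # blocks until the next event arrives
    print(event)
    done += event["status"] == "done"
    if done == len(futures):
        break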

xjules commented 1 year ago

When playing around with dask, we can use the following to get status from the dask workers:

    import time
    from dask.distributed import Client

    def dummy_job(i):
        # Hypothetical stand-in for a real forward-model task.
        time.sleep(i)
        return i

    client = Client()
    futures = [client.submit(dummy_job, i) for i in range(10)]

    while not all(f.done() for f in futures):
        time.sleep(1)  # Check every second
        workers_info = client.scheduler_info()["workers"]
        for worker, info in workers_info.items():
            mem_usage = info["metrics"]["memory"]
            print(f"Worker {worker} - Current memory usage: {mem_usage}")

    for future in futures:
        print(future.result())
xjules commented 1 year ago

Another feature worth exploring is WorkerPlugin: https://distributed.dask.org/en/latest/plugins.html#distributed.diagnostics.plugin.WorkerPlugin. Its transition hook can then give us access to the worker state: https://distributed.dask.org/en/latest/worker-state.html
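
A minimal sketch of such a plugin, assuming we only want to print task state transitions (the class name and formatting are just for illustration):

from dask.distributed import Client, WorkerPlugin

class TransitionLogger(WorkerPlugin):
    def setup(self, worker):
        self.worker = worker

    def transition(self, key, start, finish, **kwargs):
        # Called on every task state change on this worker,
        # e.g. "waiting" -> "executing" -> "memory".
        print(f"{self.worker.address}: {key} {start} -> {finish}")

client = Client()
client.register_worker_plugin(TransitionLogger())
future = client.submit(sum, range(10))
print(future.result())  # 45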

sondreso commented 1 year ago

The reason why the LSF plugin to dask does not work on our compute cluster is that it relies on using stdin, which does not work for our LSF setup. This is a hack to fix it: https://github.com/equinor/ert/blob/853b0c0dc4f383b43db12105ec921996920b9b6e/src/ert/ensemble_evaluator/builder/_prefect.py#L72-L79 and then overwriting the function in the LSFJob class in dask:

https://github.com/equinor/ert/blob/853b0c0dc4f383b43db12105ec921996920b9b6e/src/ert/ensemble_evaluator/builder/_prefect.py#L95

If you look at the repository at the time of the commit above there is an implementation of Dask that was running both in Azure and on LSF :slightly_smiling_face:

kwinkunks commented 1 year ago

I found that this also seems to work and allows the cluster to start:

cluster = Cluster(cores=6, memory='2GiB', use_stdin=False)

However, I'm still having trouble getting a job to actually run...

sondreso commented 1 year ago

Just be aware that if you set use_stdin=False, the file defined by script_filename still needs to be placed on a network disk and be available to all machines with our LSF setup :slightly_smiling_face:

kwinkunks commented 1 year ago

Dask's XXXCluster class documentation

Bit of a side-note about a weird feature of the docs.

(screenshot of the dask-jobqueue PBSCluster documentation page)

This class has the following structure:

PBSCluster < JobQueueCluster < distributed.SpecCluster < distributed.Cluster < distributed.SyncMethodMixin

The PBSCluster class has no __init__() method, so it implicitly calls the parent constructor, and that is the API shown in the call signature (the blue box in the screenshot).

All of the remaining arguments in the documentation are passed as **job_kwargs and used by the "job", called job_cls inside the PBSCluster class. Jobs are instances of PBSJob, a subclass of Job, which is a subclass of distributed.ProcessInterface.
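
A rough illustration of that split (the resource_spec value here is just an example; the scheduling-related arguments are forwarded to the job class rather than handled by PBSCluster itself):

from dask_jobqueue import PBSCluster

print(PBSCluster.job_cls)  # <class 'dask_jobqueue.pbs.PBSJob'>

cluster = PBSCluster(
    cores=4,
    memory="2GiB",
    resource_spec="select=1:ncpus=4:mem=2GB",  # consumed by PBSJob via **job_kwargs
)
print(cluster.job_script())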

kwinkunks commented 1 year ago

Things to know about Dask's cluster classes

Note: The LocalCluster here is the one in dask_jobqueue (see above)

| Parameter | LocalCluster (see note) | LSFCluster | PBSCluster | SLURMCluster | Docstring |
|---|---|---|---|---|---|
| project | | 🔵 | 🔵 | | LSF: Project associated with each worker job. Passed to #BSUB -P option. |
| account | | | 🔵 | 🔵 | PBS: Accounting string associated with each worker job. Passed to #PBS -A option. |
| ncpus | | 🔵 | | | Number of cpus. Passed to #BSUB -n option. |
| mem | | 🔵 | | | Request memory in bytes. Passed to #BSUB -M option. |
| lsf_units | | 🔵 | | | Unit system for large units in resource usage set by LSF_UNIT_FOR_LIMITS in the lsf.conf file of a cluster. |
| use_stdin | | 🔵 | | | LSF's bsub command allows us to launch a job by passing it as an argument (bsub /tmp/jobscript.sh) or feeding it to stdin (bsub < /tmp/jobscript.sh). |
| job_cpu | | | | 🔵 | Number of cpu to book in SLURM, if None, defaults to worker threads * processes. |
| job_mem | | | | 🔵 | Amount of memory to request in SLURM. If None, defaults to worker processes * memory. |
| resource_spec | | | 🔵 | | PBS: Request resources and specify job placement. Passed to #PBS -l option. |
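
As a quick illustration of how the per-scheduler arguments are used (the values are arbitrary):

from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(
    cores=8,
    memory="16GiB",
    queue="normal",
    job_cpu=16,      # cpus booked in SLURM, independent of worker threads
    job_mem="32GB",  # memory requested from SLURM
)
print(cluster.job_script())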
kwinkunks commented 1 year ago

Closing this as completed.

The squad has reviewed the various components of dask, distributed, and dask_jobqueue and concluded that they can very likely help with the implementation of ERT (as we've concluded before). We value features like:

We will move on to some small experiments as we press ahead with refactoring the existing drivers.