I noticed you are working on this experimental feature in branch feature-dask, see https://github.com/elcorto/psweep/issues/9. These lines look a lot like what I am hoping for, without the need for pickling anything. Essentially worker_wrapper_partial_pool (and thus eventually our own provided worker function) gets the dictionary directly, which fully avoids the need for the template run.py. Is this correct?
Thanks for your interest in the project.
Your question is spot-on and accurately describes the template workflow's limitations.
Yes, using dask (#9) will avoid all that and you just need to replace
ps.run(func, params)
by
from dask.distributed import Client
# Or SGECluster or ...
from dask_jobqueue import SLURMCluster
cluster = SLURMCluster(
... HPC settings here ...
)
client = Client(cluster)
ps.run(func, params, dask_client=client)
I have now pushed a branch develop, which ATM is the same as feature-dask. This won't see any force-push :) and you can experiment with the new dask support. Note that in main and newer branches, so develop and feature-dask, run_local() was renamed to run(), but we have backwards compatibility and you'll see a deprecation warning.
You need to install psweep manually as described here, e.g.
python -m venv psweep
. ./psweep/bin/activate
git clone ...
cd /path/to/psweep
git checkout develop
pip install -e ".[dask]"
I have extended the docs in that regard, adding a detailed comparison of dask vs. templates and when to use which method. GitHub renders the markdown quite well but misses things like included example files. There are new examples here. The improved docs also explain a pickle-based workflow using templates as you describe (I had used that in the past but hadn't documented it).
If you want to render the docs locally (which needs a lot of stuff installed), run the following, probably in the same venv as above:
cd /path/to/psweep
pip install -e ".[doc]"
cd doc
sh generate-doc.sh
That's what we do in the gh pages CI as well.
If you use the dask support, feel free to report back any problems you face. Thanks a lot.
I have been reading about dask and testing the version with dask support. It took me a bit to get things roughly right, so I thought I'd leave some comments on the issues I came across. Note they aren't real psweep issues, just two lessons learned that might help others wanting to use the dask support.
1st: setting up the environment
Above, you gave excellent instructions, which unfortunately I partially ignored at first. I did not install psweep via pip install -e as indicated above, since I had a local clone of the repo and thought I'd just add the path myself. For someone inexperienced with dask and distributed computing, this was a confusing mistake.
Everything seemed to run without error, but my computations were not executed. I believe the issue was that the Callable called worker in psweep was defined in my main script, which imported psweep. The distributed tasks attempted to import psweep as well, but since it was not properly installed in the Python environment shared by all machines, they exited with an error. This error was not propagated, but checking the log files eventually revealed the import error.
The solution was to simply follow your guidance and install the local psweep version from the dev branch.
2nd: infinite loop re-computing the same parameter sets
I was interested in the case of having more jobs than can be finished by all dask workers within their defined walltime. I came across this section in the documentation and the issue discussing this situation:
https://github.com/dask/dask-jobqueue/issues/122
Summarized in my own words: the cluster should be set to scale adaptively using cluster.adapt(minimum=1, ...) instead of cluster.scale(), which allows bringing up dask workers after they have hit the walltime. Furthermore, the cluster must be instantiated with extra arguments that indicate the lifetime of the jobs (a time somewhat shorter than the walltime), so that the adapt() mechanism can transfer the state of one worker to another before it hits the walltime and is killed. I was testing this simple script, which ended up being pretty much the same as those discussed in https://github.com/dask/dask-jobqueue/issues/122.
from dask.distributed import Client
from distributed.worker import logger
from dask_jobqueue import SGECluster
import time

import psweep as ps


def local_analyzer(pset):
    logger.info(f'starting {pset}')  # to get a report in the dask worker logs
    time.sleep(5)
    logger.info(f'finished {pset}')
    return {'b': pset['a']}


if __name__ == "__main__":
    # create N points to 'sweep'
    N = 60
    a = ps.plist("a", range(N))
    params = ps.pgrid([a])

    with SGECluster(
        queue='queue.q',
        cores=1,
        memory='1GB',
        walltime='00:01:00',
        worker_extra_args=["--lifetime", "40s", "--lifetime-stagger", "10s"],
    ) as cluster:
        # scale the cluster to more workers (adaptively)
        cluster.adapt(minimum_jobs=2, maximum_jobs=2)
        with Client(cluster) as client:
            # distribute jobs
            futures = client.map(local_analyzer, params)
            # wait for results
            results = client.gather(futures)
            for result in results:
                print(result)

            # using psweep
            # df = ps.run(local_analyzer, params, dask_client=client, calc_dir=CALC_DIR, tmpsave=True)
            # ps.df_print(df, cols=["_pset_id", "_exec_host"])
        print('out of client')
        # need to wait a bit for the cluster to resize due to its adaptive
        # nature or we get an (unimportant) error
        time.sleep(10)
    print('out of cluster')
Using the suggested solution, I still got the KilledWorker error discussed in https://github.com/dask/dask-jobqueue/issues/122 and the docs. Reading the logs of different dask workers made clear that the workers were being shut down as intended, but the logger.info() calls in local_analyzer() revealed that the same psets were being calculated over and over again. It seemed that the dask scheduler was not quite collecting all results and redistributing the jobs correctly. The log below is an example output from one dask worker.
Warning: no access to tty (Bad file descriptor).
Thus no job control in this shell.
stty: standard input: Inappropriate ioctl for device
2023-10-11 11:22:22,641 - distributed.nanny - INFO - Start Nanny at: 'tcp://XX.X.X.XXX:XXXXX'
2023-10-11 11:22:23,491 - distributed.worker - INFO - Start worker at: tcp://XX.X.X.XXX:XXXXX
2023-10-11 11:22:23,491 - distributed.worker - INFO - Listening to: tcp://XX.X.X.XXX:XXXXX
2023-10-11 11:22:23,491 - distributed.worker - INFO - Worker name: SGECluster-0
2023-10-11 11:22:23,492 - distributed.worker - INFO - dashboard at: XX.X.X.XXX:XXXXX
2023-10-11 11:22:23,492 - distributed.worker - INFO - Waiting to connect to: tcp://XX.X.X.XXX:XXXXX
2023-10-11 11:22:23,492 - distributed.worker - INFO - -------------------------------------------------
2023-10-11 11:22:23,492 - distributed.worker - INFO - Threads: 1
2023-10-11 11:22:23,492 - distributed.worker - INFO - Memory: 0.93 GiB
2023-10-11 11:22:23,492 - distributed.worker - INFO - Local Directory: /tmp/3010181.1.queue.q/dask-scratch-space/worker-1dqww_9c
2023-10-11 11:22:23,492 - distributed.worker - INFO - -------------------------------------------------
2023-10-11 11:22:23,850 - distributed.worker - INFO - Starting Worker plugin shuffle
2023-10-11 11:22:23,850 - distributed.worker - INFO - Registered to: tcp://XX.X.X.XXX:XXXXX
2023-10-11 11:22:23,851 - distributed.worker - INFO - -------------------------------------------------
2023-10-11 11:22:23,851 - distributed.core - INFO - Starting established connection to tcp://XX.X.X.XXX:XXXXX
2023-10-11 11:22:23,853 - distributed.worker - INFO - starting {'a': 6}
2023-10-11 11:22:28,853 - distributed.worker - INFO - finished {'a': 6}
2023-10-11 11:22:28,857 - distributed.worker - INFO - starting {'a': 5}
2023-10-11 11:22:33,857 - distributed.worker - INFO - finished {'a': 5}
2023-10-11 11:22:33,858 - distributed.worker - INFO - starting {'a': 8}
2023-10-11 11:22:38,859 - distributed.worker - INFO - finished {'a': 8}
2023-10-11 11:22:38,861 - distributed.worker - INFO - starting {'a': 10}
2023-10-11 11:22:43,861 - distributed.worker - INFO - finished {'a': 10}
2023-10-11 11:22:43,864 - distributed.worker - INFO - starting {'a': 12}
2023-10-11 11:22:48,864 - distributed.worker - INFO - finished {'a': 12}
2023-10-11 11:22:48,866 - distributed.worker - INFO - starting {'a': 9}
2023-10-11 11:22:53,866 - distributed.worker - INFO - finished {'a': 9}
2023-10-11 11:22:53,868 - distributed.worker - INFO - starting {'a': 11}
2023-10-11 11:22:58,868 - distributed.worker - INFO - finished {'a': 11}
2023-10-11 11:22:58,870 - distributed.worker - INFO - starting {'a': 13}
2023-10-11 11:23:03,870 - distributed.worker - INFO - finished {'a': 13}
2023-10-11 11:23:03,872 - distributed.worker - INFO - starting {'a': 14}
2023-10-11 11:23:03,878 - distributed.worker - INFO - Closing worker gracefully: tcp://XX.X.X.XXX:XXXXX. Reason: worker-lifetime-reached
2023-10-11 11:23:03,881 - distributed.worker - INFO - Stopping worker at tcp://XX.X.X.XXX:XXXXX. Reason: worker-lifetime-reached
2023-10-11 11:23:03,883 - distributed.nanny - INFO - Closing Nanny gracefully at 'tcp://XX.X.X.XXX:XXXXX'. Reason: worker-lifetime-reached
2023-10-11 11:23:03,884 - distributed.worker.state_machine - WARNING - Async instruction for <Task cancelled name='execute(local_analyzer-cd7475212b8d587afc224856da19387e)' coro=<Worker.execute() done, defined at /path/to/python/python3.11/site-packages/distributed/worker_state_machine.py:3609>> ended with CancelledError
2023-10-11 11:23:03,885 - distributed.core - INFO - Connection to tcp://XX.X.X.XXX:XXXXX has been closed.
2023-10-11 11:23:08,872 - distributed.worker - INFO - finished {'a': 14}
2023-10-11 11:23:08,874 - distributed.nanny - INFO - Worker closed
2023-10-11 11:23:11,138 - distributed.nanny - INFO - Closing Nanny at 'tcp://XX.X.X.XXX:XXXXX'. Reason: nanny-close-gracefully
2023-10-11 11:23:11,139 - distributed.dask_worker - INFO - End worker
What solved the issue? I took the wild and naive guess that perhaps my timescales were a bit off and started increasing the number of jobs, walltimes, lifetimes and lifetime-stagger. Eventually, with N=180, walltime='00:05:00' and worker_extra_args=["--lifetime", "3m", "--lifetime-stagger", "1m"] it works for my example. I noticed that a lifetime-stagger of 0 never works; my guess is that for this highly deterministic workload, all workers are always killed at the same time and none can take care of the other workers' states. Either way, it seems like dask-jobqueue is not quite robust here, and I don't know what happens if our queues are full and we cannot bring up alternative workers quickly enough. My guess is that all intermediate results are lost and will be recomputed (even if psweep's tmpsave is set to True), which is not ideal. To mitigate the number of unnecessary recomputes, I believe a possible workaround for psweep would be to adapt the lines like this:
chunk_size = some_multiple_of_cluster_size  # I don't know what makes most sense here
results = []
for chunk in (params[pos:pos + chunk_size] for pos in range(0, len(params), chunk_size)):
    futures = dask_client.map(worker_wrapper_partial_pool, chunk)
    results.extend(dask_client.gather(futures))
I understand that you would not want to chase after strange dask-jobqueue effects, so the same thing can also be done outside psweep, i.e., by chunking the parameter list given to psweep.run(). This is most likely what I will be doing to be more robust against full queues and non-overlapping dask workers, without the need for full recomputations.
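For example, reusing local_analyzer, params and client from the script above (just a sketch; I assume here that repeated ps.run() calls append to the same database, one run per chunk):
# Drop-in replacement for the client.map()/client.gather() part of the script
# above: submit the psets chunk by chunk, so that losing all dask workers only
# forces recomputation of the current chunk.
chunk_size = 20  # e.g. some multiple of the number of dask workers
for pos in range(0, len(params), chunk_size):
    chunk = params[pos:pos + chunk_size]
    # assumption: each call appends its results to calc/database.pk
    df = ps.run(local_analyzer, chunk, dask_client=client)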
TL;DR: scale your cluster with adapt() and provide the additional parameters lifetime and lifetime-stagger. They should not be too small, and on the order of minutes, for this solution to (most likely) work.
Thank you for this very detailed report, that's highly appreciated!
Some comments on your findings.
Environment
We don't have any HPC-related install docs yet. Would something like this have helped?
"Jobs on HPC systems usually inherit the environment from which they were started. For instance, you can have a venv in your $HOME with psweep and dask installed. This will end up in $HOME/.virtualenvs/psweep, which is venv's default. Then submit from this environment, which will propagate all environment variables to batch jobs. $HOME on most if not all HPC machines is on a shared file system mounted at each node, which makes things work."
# Assuming that you (need to) use the HPC machine's module system to load python
hpc $ module load python
hpc $ python -m venv psweep
hpc $ . ./psweep/bin/activate
(psweep) hpc $ cd /path/to/psweep
(psweep) hpc $ pip install -e ".[dask]"
(psweep) hpc $ cd /path/to/work/dir
(psweep) hpc $ sbatch dask_control.job
Worker lifetime and cluster.adapt()
I wasn't aware of that part of dask_jobqueue's docs, thanks for sharing!
Robustness of dask_jobqueue and psweep's relation to workflow engines
HPC machines are a complex piece of tech, so I think that dask_jobqueue is a great help. It probably covers about 80% of the common situations, but it certainly can't handle every setup (give or take a few bugs maybe).
Outside of the dask ecosystem and psweep's scope, there are many projects which deal with failed jobs on HPC and other infrastructure. There is a non-comprehensive list in the psweep docs which lists tools for parameter sweeps, experiment tracking and workflow orchestration, all of which relate in some way. Some tools fall into multiple categories, so the boundaries are blurry here. Workflow engines usually offer resilience measures against failing workers. I can't give recommendations since I haven't used them yet, but nevertheless here are docs related to failed jobs for aiida and fireworks. Both tools are built by the materials modeling community, which makes heavy use of HPC resources.
Those tools are super powerful, but also more complex to set up and use (so no free lunch here). Many require you to set up a database server (PostgreSQL, MongoDB) where they can store results and metadata. They can also manage complex workflows such as jobs that depend on each other as a DAG, and much more.
In psweep we assume the simplest case: workloads are independent (embarrassingly parallel). Thus we just expose the dask_client arg to leverage dask's concurrent.futures API. psweep provides simple helpers to set up the params (plist, pgrid, stargrid) and gives you a DataFrame with useful metadata such as pset hashes and UUIDs for psets and runs.
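A minimal sketch of these helpers in action (column names other than _pset_id are whatever func returns):
import psweep as ps

# two parameter lists and their Cartesian product -> 6 psets
a = ps.plist("a", [1, 2, 3])
b = ps.plist("b", ["x", "y"])
params = ps.pgrid([a, b])

def func(pset):
    return {"result": pset["a"] * 10}

df = ps.run(func, params)
# one row per pset, including metadata columns such as _pset_id
ps.df_print(df, cols=["_pset_id", "a", "b", "result"])
print(df.columns)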
One way to solve task dependencies with psweep is to run things in order manually, say 10prepare.py, 20production.py, 30eval.py, where the first two can use psweep to compute stuff, update the database and store intermediate data on disk, which the next script would pick up. The "workflow" is to run all scripts in order. This is super low tech and simple, but also a bit brittle. For more challenging dependencies and more reproducibility, I'd look into using one of the tools above.
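For illustration, the last step in such a chain could be as simple as this sketch (assuming the default calc_dir "calc", where database.pk is a pickled pandas DataFrame):
# Hypothetical 30eval.py: read the database written by the previous steps and
# do the post-processing there. Path and column selection are assumptions.
import pandas as pd

df = pd.read_pickle("calc/database.pk")
print(df.columns)      # metadata columns such as _pset_id plus your own
print(df.describe())   # the actual "evaluation" would go here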
In the context of psweep, these tools may become interesting if one can use their job scheduling features (so what dask.distributed + dask_jobqueue do) without the need to buy into their DSLs or database setups.
Code changes to support specific use cases (chunking)
You are right, I probably wouldn't want to add features that solve a narrow use case I haven't faced and tested in production myself. However, if you have a well tested feature that doesn't break current functionality and comes with tests, please feel free to send a PR.
psweep notes
ps.run(..., tmpsave=True) will save results from each pset on disk to <calc_dir>/tmpsave/<run_id>/<pset_id>.pk. This won't be used by dask workers. The feature assumes that you collect those results into a DataFrame later with a post-processing script. The idea for this was that if some workers fail, we have at least some results saved. Also, for long running workloads, one can start to analyze already finished results. But as mentioned, that requires a bit of scripting, and I haven't used it all that much in production. See also this comment.
Since your example uses just one plist, you can skip the pgrid call.
Dear @wilkeber, if you have more findings to add here, please don't hesitate to do so. The basic dask support is now in release 0.11.1 in the form discussed above, ps.run(..., dask_client=client). If you run into any issues with this, please open a new issue. I'm taking the liberty to close this one since I think the original question is answered. Again, thank you for your question and detailed report!
Sorry for not replying to your previous comment. It contained interesting information, but for now I have continued working with the setup as is, with an occasional restart. Before such a restart I gathered the tmpsave results, created/expanded the database.pk file and re-ran the simulation (roughly along the lines of the sketch below). It is far from elegant but does the job for now.
I actually like the clarity and simplicity of psweep, so leaving it as is, is great! The problems I am facing are rather dask problems, but since psweep provides enough transparency and does not obfuscate those problems, I can work with them.
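The gather step is roughly this sketch (assuming each <calc_dir>/tmpsave/<run_id>/<pset_id>.pk unpickles to one row dict; check what your psweep version actually stores there):
# Collect tmpsave results into a DataFrame and write/extend database.pk.
import glob
import pickle

import pandas as pd

rows = []
for fn in glob.glob("calc/tmpsave/*/*.pk"):
    with open(fn, "rb") as fd:
        rows.append(pickle.load(fd))

df = pd.DataFrame(rows)
# if database.pk already exists, extend it instead of overwriting
# df = pd.concat([pd.read_pickle("calc/database.pk"), df], ignore_index=True)
df.to_pickle("calc/database.pk")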
Even if the issue is closed, I'll add one more comment, because I believe it might be useful for people using psweep with dask when they are actually developing their own applications. As you pointed out, jobs on HPC clusters usually inherit the environment. However, when starting a sweep from some local repository in which you are currently developing and quickly changing things, I found it useful to know about the UploadDirectory plugin. It allows you to copy your files into the workers' environment, see https://distributed.dask.org/en/stable/plugins.html#built-in-nanny-plugins
import psweep as ps
from dask.distributed import Client
from dask_jobqueue import SOMECluster
from distributed.diagnostics.plugin import UploadDirectory

...

with Client(cluster) as client:
    # upload the local repo to the workers
    client.register_plugin(UploadDirectory("/path/to/my/project", update_path=True))
    ps.run(...)
...
Even if the issue is closed, I'll add one more comment, because I believe it might be useful for people using psweep with dask, when they are actually developing their own applications.
No worries, these notes are very much welcome. I may integrate some of your tips in the docs at some point. For now, I have linked from the docs to this issue at least.
As you pointed out, jobs on HPC clusters usually inherit the environment. However, when starting a sweep from some local repository in which you are currently developing and quickly changing things, I found it useful to know about the UploadDirectory plugin. It allows you to copy your files into the workers' environment, see https://distributed.dask.org/en/stable/plugins.html#built-in-nanny-plugins
Interesting, thanks for sharing. I was not aware of this. If by repository you mean a git repo, then my way of dealing with this, if the code in the repo is a Python project, is:
- create a venv: python -m venv --system-site-packages ... (actually I use virtualenvwrapper)
- cd /path/to/dev-repo; pip install -e . the dev-repo into that venv, which just says to import code from /path/to/dev-repo and not some install location
- submit dask_control.slurm from within that venv
This makes sure (at least on our machine, using SLURM) that each worker has access to the current state in dev-repo. For non-Python stuff, a shared file system should grant access to modified files, so I wonder which use case you cover by uploading, where I guess files end up in each worker's tmp dir (e.g. something like dask_tmp/dask-scratch-space/worker-f5qjbu83/ if SLURMCluster(..., local_directory="dask_tmp"))?
Hi! I enjoy using psweep for local sweeps and started looking into the experimental remote cluster processing bits, since I would like to expand my sweeps onto our HPC. I have some questions on how to use the current implementation:
In your example you have a template run.py, which is pretty much what is executed on each machine. You eventually use the substitute method from the string module to replace placeholders for each parameter. It makes perfect sense, but it is a little less convenient than what was possible with the run_local() function, where the worker function is given directly and the parameter dictionary is passed directly to that function. I was hoping for a similar mechanism, i.e., where I just get a dictionary and can have my 'template' run.py simply deal with that dictionary. This allows having an actually executable template run.py that I do not have to modify for each sweep (e.g., when adding a sweep parameter).
The question I have is how to pass such a dictionary to a new script. Do you have a good idea? A straightforward answer could be to pickle it and simply load the dictionary. Since we want to add the parameters to a picklable database, all entries must be picklable anyway. The difficulty I have now is that prepare_batch() does the replacement and I cannot access the pset to create the pickled dict myself. Looking at your suggested workflow here, it seems that there used to be a hook for a func(), which I could have used, but there is none in prepare_batch(). Perhaps I misunderstand something, but could it be that the documentation on the workflow is not quite up to date?
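To make it concrete, the kind of directly executable 'template' run.py I have in mind would look roughly like this (the pset.pk and result.pk file names are made up; something like prepare_batch() would have to write the pickled pset into each calc dir):
# Hypothetical run.py template: no textual placeholder substitution, just load
# a pickled pset dict and work with it.
import pickle

with open("pset.pk", "rb") as fd:
    pset = pickle.load(fd)

# do the actual work with the parameters ...
result = {"b": pset["a"]}

with open("result.pk", "wb") as fd:
    pickle.dump(result, fd)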