dask / dask-jobqueue

Deploy Dask on job schedulers like PBS, SLURM, and SGE
https://jobqueue.dask.org
BSD 3-Clause "New" or "Revised" License

Work-arounds for CPU time limits where the Dask scheduler lives? #471

Closed lesteve closed 2 years ago

lesteve commented 3 years ago

One of the clusters I have access to has set CPU time limits on the login nodes.

This makes it a lot less convenient to use dask-jobqueue:

@dask/dask-jobqueue if anyone has suggestions/work-arounds I would be very interested!

A bit more context:

Possible variation of this issue:

stuarteberg commented 3 years ago

the cluster only has GPU nodes (a bit of an over-simplification, but accurate enough), so there is no easy way to have a CPU-only job where the Dask scheduler lives.

One option is to host the scheduler on one of the GPU nodes. But of course, it would be a waste of a GPU if that node can't participate as a worker, too.

If your non-login nodes are allowed to launch jobs themselves, then I propose the following:

  1. Run the scheduler on a GPU node
  2. From there, launch a cluster of N-1 workers
  3. Create one more worker locally, and add it to the cluster, so the local node is not wasted.

Here's some example code that works on my LSF cluster. I'm not really an expert on the dask cluster API, so let me know if there's a more straightforward way to create a heterogeneous cluster. In this code, I'm manually manipulating the worker_spec dictionary, which is used by the scale() function to create new workers.

In [1]: import time
   ...: from distributed import LocalCluster, Client
   ...: from dask_jobqueue import LSFCluster
   ...:
   ...: cluster = LSFCluster(ip='0.0.0.0', cores=1, memory='15GB', ncpus=1, mem=int(15e9), log_directory='dask-logs')
   ...: cluster.scale(2)
   ...:
   ...: client = Client(cluster)
   ...: while client.status == "running" and len(cluster.scheduler.workers) < 2:
   ...:     print("Waiting for remote workers to start...")
   ...:     time.sleep(1.0)
   ...:
   ...: # Temporarily start up a local cluster just to copy the worker spec.
   ...: with LocalCluster(1) as lc:
   ...:     local_worker_spec = lc.worker_spec[0]
   ...:
   ...: cluster.worker_spec[2] = local_worker_spec
   ...:
   ...: # Force cluster._i to increment.
   ...: # Notice the side effect in SpecCluster.new_worker_spec() implementation.
   ...: cluster.new_worker_spec()
   ...:
   ...: # Kick the cluster event loop to make it use the new worker spec.
   ...: cluster.scale(3)
Waiting for remote workers to start...
Waiting for remote workers to start...

To test that it worked, I can call client.run to see which nodes my workers are running on. As expected, two of my workers are running on a remote node, and one of them is running on the local node (h10u03), where the client and scheduler are also running.

In [2]: import socket
   ...: socket.gethostname()
Out[2]: 'h10u03.int.janelia.org'

In [3]: client.run(socket.gethostname)
Out[3]:
{'tcp://10.36.110.13:31650': 'h10u03.int.janelia.org',
 'tcp://10.36.111.17:25341': 'h11u07.int.janelia.org',
 'tcp://10.36.111.17:31518': 'h11u07.int.janelia.org'}

I know that both LSF and SGE are capable of launching jobs from ordinary (non-login) nodes, but some cluster administrators choose not to permit it for reasons I do not understand. If that's the case for your cluster, it may be worth asking the administrators why they have chosen not to support such a useful feature.

kathoef commented 3 years ago

One of the clusters I have access to has set CPU time limits on the login nodes.

Just out of curiosity, do you know how they do that from a technical point of view? Do you know why they do that? Seems very restrictive to me and very much like they don't want interactive work on their system at all.

those jobs use deep learning and typically can last 1-5 days. The fact that you need the Dask scheduler to live for that long (a Dask worker dies after death-timeout if it cannot contact the Dask scheduler), and maybe even longer if your Dask scheduler coordinates many jobs that don't necessarily start at the same time, is going to be an issue

I was actually thinking that a classical batch workflow might be more promising here? That is, interactively developing the code on a subset of the problem (probably on one of the compute nodes, because of the 30-minute CPU limit on the login nodes), then putting together a script that solves the problem non-interactively, up-scaled / parallelized to a fixed number of nodes (the more nodes, the shorter the walltime of the calculation?). That's at least how I solved a machine learning problem before I had developed a way to work interactively with Jupyter on our university's cluster nodes.

An additional thought: in ocean modeling, where numerical experiments easily occupy part of the cluster for up to 30 days (with job walltime limits that are unfortunately much smaller), we often apply job-chaining techniques: the whole calculation is split across a series of jobs, and each job writes restart information to disk before it terminates so that the next job can start from it. But I honestly don't know whether such an approach would be feasible with the tools used in your particular case.

riedel commented 3 years ago
3\. Create one more worker locally, and add it to the cluster, so the local node is not wasted.

This works great in my experience with Spark; however, I have been struggling a lot with walltime limits here, particularly if the workers are long-running and, even worse, if the scheduler is killed. In HTCondor, I think, it would help if you have a Parallel universe to ensure that the cluster lives on (our cluster does not support this :( ), but if your jobs are long-running with different run-times this leads to a lot of worker idling. Actually, if your scheduler lives on a node that finishes early, you have the same problem. This is particularly the case if there are not enough resources to start all requested workers at the beginning and you are using a standard batch submission.

I therefore also think it is essential to do checkpointing and to pull results into a (shared/distributed) filesystem, so that the whole thing can be restarted. My feeling is that workers could easily run to completion without the scheduler, write to disk, and then fail. The restarted job could just take the result from disk and terminate correctly. Frameworks like snakemake can nicely resume work that has already been done. Does someone have a good example/strategy for this in Dask?
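
For illustration, a minimal sketch of such a strategy in Dask (the cached helper and the results/ directory are hypothetical): each task checks the shared filesystem for an already-written result before computing, so a resubmitted run skips the work that has already finished.

import os
import pickle

from dask import delayed

def cached(path, func, *args):
    # Hypothetical helper: reuse a result already checkpointed to the
    # shared filesystem, otherwise compute it and write it out.
    if os.path.exists(path):
        with open(path, "rb") as f:
            return pickle.load(f)
    result = func(*args)
    with open(path, "wb") as f:
        pickle.dump(result, f)
    return result

os.makedirs("results", exist_ok=True)

# If the scheduler is killed and the whole script is resubmitted, finished
# tasks are read back from disk instead of being recomputed.
tasks = [delayed(cached)(f"results/part_{i}.pkl", pow, i, 2) for i in range(10)]
total = delayed(sum)(tasks).compute()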

willirath commented 3 years ago

If the time limit hitting the scheduler cannot be fixed, and easy resuming of Dask computations hence remains necessary, there could be solutions based on serialized Dask graphs. You can cloudpickle Dask graphs (even as part of high-level objects like xarray.Dataset) and there is the (orphaned?) https://github.com/radix-ai/graphchain which might be helpful here.
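
As a minimal sketch of that idea (using a toy dask.array computation; assuming cloudpickle is installed), one job can write the unevaluated graph to disk and a later job can reload and compute it:

import cloudpickle
import dask.array as da

# Build a computation without running it.
x = da.random.random((1000, 1000), chunks=(250, 250))
result = (x + x.T).mean()

# Serialize the unevaluated graph so a later job can pick it up.
with open("graph.pkl", "wb") as f:
    cloudpickle.dump(result, f)

# In a later job (possibly after the scheduler was restarted),
# reload the graph and compute it.
with open("graph.pkl", "rb") as f:
    restored = cloudpickle.load(f)

print(restored.compute())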

lesteve commented 3 years ago

Thanks a lot for the answers:

mrocklin commented 3 years ago

Some wacky ideas:

guillaumeeb commented 3 years ago

On this point, I see two potential solutions in dask-jobqueue:

Just out of curiosity, are these Jean-Zay cluster rules?

lesteve commented 3 years ago

Just out of curiosity, are these Jean-Zay cluster rules?

You guessed it right :wink:. I tried to ask whether CPU time limits on the login nodes could be a bit more generous (2-3 hours) but they were extremely reluctant to do so saying that login nodes are not designed for this kind of workflow :cry:.

Just submit your workload on a node, so that the client and dask scheduler get started there and live a long life. This has already been more or less proposed. Push up #390 and #186...

This would definitely be progress, but sometimes this is not enough. For example, if all your jobs need the max walltime of your queue, then once your Dask scheduler starts on a node, your "slave jobs" (i.e. the ones launched by cluster.scale) may not start immediately. That means that your Dask scheduler will be killed before the work is finished and you will lose some work.

Maybe a specificity of deep-learning jobs (although I am sure this exists in other, more traditional HPC disciplines) is that the tasks (i.e. Python function executions) can potentially span multiple jobs (i.e. the jobs relaunch themselves once they reach the walltime). That means that, in principle, if the Dask scheduler coordinates such tasks it should be able to last longer than the max queue walltime, so there seems to be no convenient place to run the Dask scheduler in an HPC cluster.

Most use cases around me are embarrassingly parallel (e.g. hyperparameter grid-search), so people don't really need all the features of Dask. In some cases people have started using https://github.com/facebookincubator/submitit which has fewer features but is better adapted to these kinds of HPC constraints.
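
To illustrate why submitit fits these constraints, here is a rough sketch (the train_and_score function, timeout, and partition name are made up; adapt to your cluster): each submitted call becomes its own batch job, so no long-lived scheduler process has to outlive the queue walltime.

import submitit

def train_and_score(lr):
    # Stand-in for a long deep-learning training run.
    return lr * 2

executor = submitit.AutoExecutor(folder="submitit_logs")
executor.update_parameters(timeout_min=60, slurm_partition="gpu")  # hypothetical settings

# Each submit() turns into an independent batch job; results are written to
# disk by the job and read back here, so nothing has to stay alive in between.
jobs = [executor.submit(train_and_score, lr) for lr in (1e-4, 1e-3, 1e-2)]
scores = [job.result() for job in jobs]
print(scores)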

Maybe I need to try my idea of sshuttle to run the Dask scheduler on a machine outside the HPC cluster (https://github.com/dask/dask-jobqueue/issues/471#issuecomment-718503750), we'll see if I can find the time ...

guillaumeeb commented 3 years ago

Maybe a specificity with deep-learning jobs (although I am sure this exists in other more traditional HPC disciplines)

Yeah, this is a common concern, which is why it's also great to be able to do things like https://jobqueue.dask.org/en/latest/advanced-tips-and-tricks.html#how-to-handle-job-queueing-system-walltime-killing-workers with Dask.
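
For reference, the gist of that trick is to give each worker a lifetime slightly shorter than the job walltime so it retires cleanly and adaptive scaling replaces it. Roughly something like the sketch below (keyword names differ between dask-jobqueue versions, e.g. extra vs. worker_extra_args, so treat this as an approximation):

from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(
    cores=4,
    memory="16GB",
    walltime="01:00:00",
    # Ask each worker to shut down cleanly a bit before the walltime hits,
    # staggered so they don't all retire at the same moment.
    worker_extra_args=["--lifetime", "55m", "--lifetime-stagger", "4m"],
)
# Adaptive scaling submits replacement worker jobs as old ones retire.
cluster.adapt(minimum=0, maximum=10)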

In some cases people have started using https://github.com/facebookincubator/submitit which has less features but is more adapted to this kind of HPC constraints

So this is where submitit can be useful. But you'll need a Slurm scheduler for that... (see #462).

In your case, we'd need Dask Workers that can live without a running Scheduler, or a sporadic/stateless Scheduler that can be stopped and restarted:

But this is probably really complicated to do, and too specific.

riedel commented 2 years ago

Related issue (just stumbled across it): https://github.com/dask/distributed/issues/1072

guillaumeeb commented 2 years ago

I'm going to close this issue as I think we discussed all the options and there is no existing workaround currently. Overall, I see only two options for Dask to answer this need: