dask / dask-jobqueue

Deploy Dask on job schedulers like PBS, SLURM, and SGE
https://jobqueue.dask.org
BSD 3-Clause "New" or "Revised" License

Handling workers with expiring allocation requests #122

Closed wgustafson closed 3 years ago

wgustafson commented 6 years ago

I am trying to figure out how to handle the case of dask workers getting bumped from a cluster due to their requested allocation time expiring. From the intro YouTube video at https://www.youtube.com/watch?v=FXsgmwpRExM, it sounds like dask-jobqueue should detect when a worker expires and automatically start a replacement, which is what I want. However, my testing on DOE's edison computer at NERSC is not getting that behavior. If it matters, edison uses SLURM.

I have tried setting up my cluster two ways, and both behave the same: I start a script that uses dask.delayed to run a bunch of embarrassingly parallel tasks, the cluster spawns one worker, that worker does the first task or two, the worker's allocation expires, the scheduler seems to hang, and nothing else happens.

The first approach I used to set up the cluster was with "scale":

    cluster = SLURMCluster(cores=1, processes=1)  # need all the memory for one task
    cluster.scale(1)  # testing with as simple as I can get, cycling 1 worker
    client = Client(cluster, timeout='45s')

@josephhardinee suggested a 2nd approach using "adapt" instead:

    cluster = SLURMCluster(cores=1, processes=1)  # need all the memory for one task
    cluster.adapt(minimum=1, maximum=1)  # trying adapt instead of scale
    client = Client(cluster, timeout='45s')

The dask-worker.err log concludes with:

slurmstepd: error: *** JOB 10234215 ON nid01242 CANCELLED AT 2018-08-10T13:25:30 DUE TO TIME LIMIT ***
distributed.dask_worker - INFO - Exiting on signal 15
distributed.nanny - INFO - Closing Nanny at 'tcp://10.128.4.227:35634'
distributed.dask_worker - INFO - Exiting on signal 15
distributed.dask_worker - INFO - End worker
distributed.process - WARNING - reaping stray process <ForkServerProcess(ForkServerProcess-1, started daemon)>

Am I expecting more from dask-jobqueue than I should? Or is this a bug in my implementation, or in dask.distributed or dask-jobqueue?

Thanks, Bill

guillaumeeb commented 5 years ago

Is this using multiprocessing vs threading as a backend?

Exactly!

Thanks for your support and your patience!

You're helping us to provide a smoother experience, it's an important issue!

wkerzendorf commented 5 years ago

I have a similar issue with the KilledWorker exception. I get the feeling from this thread that the KilledWorker is blamed on the task instead of the worker, which might have been killed by SLURM. I think @jni's solution might also work for me; I will try it out and report back.

wkerzendorf commented 5 years ago

@jni's solution doesn't really work for me. The queue does not refill and the task is never executed.

The other problem I have experienced (not sure where else to paste this) is:

ValueError: Worker already exists tcp://172.16.0.93:38765
distributed.core - ERROR - Worker already exists tcp://172.16.0.93:38765
Traceback (most recent call last):
  File "/beegfs/wek224/miniconda/envs/tardis3/lib/python3.6/site-packages/distributed/core.py", line 346, in handle_comm
    result = yield result
  File "/beegfs/wek224/miniconda/envs/tardis3/lib/python3.6/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/beegfs/wek224/miniconda/envs/tardis3/lib/python3.6/site-packages/tornado/gen.py", line 326, in wrapper
    yielded = next(result)
  File "/beegfs/wek224/miniconda/envs/tardis3/lib/python3.6/site-packages/distributed/scheduler.py", line 1282, in add_worker
    raise ValueError("Worker already exists %s" % address)
ValueError: Worker already exists tcp://172.16.0.93:38765

I guess this is more of a scheduler problem.

guillaumeeb commented 5 years ago

@wkerzendorf, did you use @jni's example or your own? Can you provide an example to reproduce this behavior in a few seconds or minutes?

wkerzendorf commented 5 years ago

@guillaumeeb I have my own.

from dask.distributed import as_completed, KilledWorker

futures = [client.submit(run_tardis_dask, param_id, cur_config)
           for param_id, cur_config in test_configs]
work_queue = as_completed(futures)
for future in work_queue:
    try:
        i = future.result()
    except KilledWorker:
        client.retry([future])
        work_queue.add(future)  # re-add so the retried future is yielded again

What I find strange is that I need to do this at all. This is not the way it is intended to work, right?

wkerzendorf commented 5 years ago

@guillaumeeb and this solution does work for me (in contrast to what I said earlier).

guillaumeeb commented 5 years ago

OK, so I think there are three things to do here:

@wkerzendorf or @jni, would you want to do part of these?

jni commented 5 years ago

@guillaumeeb

would you want to do part of these?

Absolutely! Indeed, I intended to work on this, but it fell to the back of my todo queue because my queue is incorrectly implemented as a stack. =P

Next week I am completely unavailable, but I will try to prioritise this starting Mar 12.

In my opinion you should be able to use my example almost as-is while adjusting the time.sleep call, the number of nodes, and the job duration.

wkerzendorf commented 5 years ago

@guillaumeeb - my suspicion is that when a worker is killed for external reasons (e.g. the job ending on the HPC system), the task is blamed for the KilledWorker and is not scheduled again. I think with this in mind it would be interesting to see what the code does.

@jni I'm happy to get together (virtually) and make an example together (you can contact me at wkerzendorf@gmail.com).

bw4sz commented 5 years ago

Jumping in on this long thread. What is the status and best practice for adaptive workers on SLURM? I've been trying to debug a mystery where my scheduler is up and running but will tend to shed a (small) number of workers. No error messages, no erred tasks, and the remaining workers will complete the tasks just fine. For example, I asked for 30 in my current run and finished with 16. I'd love to bring that back up to 30 adaptively.

mrocklin commented 5 years ago

cluster.adapt(minimum=30, maximum=30) ? If there is more here then it would probably be best to open a separate issue.
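
For completeness, a minimal sketch of that suggestion in context, assuming a SLURMCluster (the resource values are placeholders):

from dask.distributed import Client
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(cores=1, memory='4GB')
# Equal bounds keep the pool topped up at 30 workers: adaptive scaling
# resubmits jobs when workers are lost.
cluster.adapt(minimum=30, maximum=30)
client = Client(cluster)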

bw4sz commented 5 years ago

Nope, that's great. Some of the above comments are very recent, and I was just inquiring whether there are undocumented changes or known issues before checking this out. I'm still investigating my decaying worker list (likely due to memory spikes).

jni commented 5 years ago

Note: once this PR is merged, it could be used by dask-jobqueue to launch workers with the appropriate --lifetime argument.

https://github.com/dask/distributed/pull/2892

which I think would resolve this issue.

lesteve commented 5 years ago

I thought the distributed PR was related to this dask-jobqueue issue but I am not sure I am following completely. Would you mind elaborating a bit more how the distributed PR would help?

jni commented 5 years ago

dask-jobqueue knows the lifetime of workers because it is the one submitting them to the job queue. So, it can now set the lifetime appropriately with the worker (with a buffer of say 2-5min) when launching it. The workers will then shut down gracefully at the given time, and no tasks will be marked as suspicious, which is what is happening now with the KilledWorker error.
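
As a quick user-side check of this idea, a sketch (the resource values are placeholders) that inspects the generated job script to confirm the --lifetime flag ends up on the dask-worker command line:

from dask_jobqueue import SLURMCluster

# A worker launched this way retires itself 2 minutes before the 15-minute
# walltime, so its shutdown is graceful rather than a suspicious death.
cluster = SLURMCluster(cores=1, memory='4GB', walltime='00:15:00',
                       extra=['--lifetime', '13m'])
print(cluster.job_script())  # the dask-worker line should include --lifetime 13m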

jni commented 5 years ago

(To clarify:

lesteve commented 5 years ago

OK thanks! I thought part of the problem was that all the workers were killed at the same time but it seems like part of it was because of the suspicious tasks. I guess also that "all workers being killed at the same time" is the reason for the lifetime stagger parameter in the distributed PR.

@jni since you seem to have a good understanding of the problem and a way to test it, it would be great if you can try it out and let us know what we can do on the dask-jobqueue side to use this functionality once it is in a distributed release.

mrocklin commented 5 years ago

Small correction:

all tasks from that worker are marked as suspicious, even if they completed

This isn't true. Completed tasks are not marked as suspicious, just all executing and pending tasks on that worker (the scheduler doesn't know which tasks the worker has chosen to execute at any given moment, but it does know once a task completes).

eXpensia commented 5 years ago

Is there any news on the solution? I tried the solution of @guillaumeeb; the problem is that when I specify minimum=0 in cluster.adapt I never see a job request when I check the queue. When I put a higher number, I get some, but when they are killed from reaching the walltime they are not renewed.

lesteve commented 5 years ago

The functionality mentioned by @jni in https://github.com/dask/dask-jobqueue/issues/122#issuecomment-516325681 is in a distributed release (distributed >= 2.2.0).

In principle, we could use the extra parameter of your favourite Cluster object to pass additional parameters to dask-worker, i.e. use --lifetime, --lifetime-restart, and --lifetime-stagger to address the original problem.
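
For illustration, a minimal sketch of what that could look like from the user side, assuming a SLURMCluster with a 30-minute walltime (the flag values are placeholders, not recommendations):

from dask.distributed import Client
from dask_jobqueue import SLURMCluster

# Workers retire themselves 5 minutes before the queueing system would kill them,
# and the stagger spreads the shutdowns so they don't all stop at the same time.
cluster = SLURMCluster(cores=4, memory='16GB', walltime='00:30:00',
                       extra=['--lifetime', '25m', '--lifetime-stagger', '2m'])
cluster.adapt(minimum=0, maximum=10)
client = Client(cluster)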

If someone has the time to look at this, it would be great. Something that would be extremely useful, would be to have a stand-alone example to reproduce the problem.

Full disclosure: I am very unlikely to look at this in the near future, so I am adding the "help wanted" label.

jni commented 5 years ago

@lesteve sorry that I didn't reply to your last ping. This remains on my radar but the earliest I can work with it is mid-September. I think you're right, passing those parameters to dask-worker is the right solution.

lesteve commented 5 years ago

Thanks for letting us know and no worries I think everyone is busy with many other things.

If someone has the time to put together a stand-alone snippet reproducing the problem, it would be really appreciated! If I were to look at this I would do something like:

import time
from dask.distributed import Client

cluster = YourCluster(walltime='00:00:30', ...)  # e.g. SLURMCluster, PBSCluster, ...
cluster.scale(1)
client = Client(cluster)

def func(i):
    time.sleep(5)

futures = client.map(func, range(100))

The idea is that the total time for all the tasks exceeds the walltime of a single job.

eXpensia commented 5 years ago

Thanks for the answers, I'll check the extra parameters.

jni commented 4 years ago

@lesteve I'm finally ready to work on this. =) My main issue is that I don't understand how to test everything locally. Does dask-jobqueue provide a way to create a "dummy" cluster/scheduler for testing purposes? I.e. I'd like to test submitting jobs on my machine without having to depend on my institutional SLURM cluster. Any pointers here would be highly appreciated!

lesteve commented 4 years ago

Great to hear that!

I think an excellent first step would be to have a minimal example to reproduce the original problem on a real cluster. This can be done independently of testing things locally.

About the dask-jobqueue testing infrastructure, we do have a docker-compose setup for some schedulers (SGE, PBS, SLURM) that you can run locally (and that is used for CI).

There is a bit of doc about that in https://jobqueue.dask.org/en/latest/develop.html#test-with-job-scheduler.

https://github.com/dask/dask-jobqueue/issues/324 might have some additional useful info as well.

PRs to improve the docs about how to test locally and/or to make them more prominent are more than welcome!

jni commented 4 years ago

This should be sufficient for a reproducible example. I'm making it take 30min but it should be straightforward to reduce that... I just don't have a good sense of the overhead of launching a worker so I was reluctant to reduce it further. To reduce it:

Everything else should be the same.

import time
import numpy as np
from dask_jobqueue import SLURMCluster as Cluster
from dask import delayed
from dask.distributed import Client, as_completed

# config in $HOME/.config/dask/jobqueue.yaml
cluster = Cluster(walltime='00:15:00', cores=1, memory='4gb', job_cpu=1,
                  job_mem='4gb')
cluster.adapt(minimum=0, maximum=4, target_duration='30min')
client = Client(cluster)

# each job takes 1s, and we have 4 cpus * 30 min * 60s/min = 7200 cpu.s
filenames = [f'img{num}.jpg' for num in range(7200)]

def features(num_fn):
    num, image_fn = num_fn
    time.sleep(1)  # takes about 1s to compute features on an image
    features = np.random.random(246)
    return num, features

num_files = len(filenames)
num_features = len(features(filenames[0])[1])

X = np.zeros((num_files, num_features), dtype=np.float32)

for future in as_completed(client.map(features, enumerate(filenames))):
    i, v = future.result()
    X[i, :] = v

assert not np.any(np.sum(X, axis=1) == 0)  # make sure all rows were filled

Here's my yaml config, though it probably doesn't need much of this:

jobqueue:
  slurm:
    name: dask-worker

    # Dask worker options
    cores: 8                # Total number of cores per job
    memory: 36gb                # Total amount of memory per job
    processes: 1                # Number of Python processes per job

    #interface: null             # Network interface to use like eth0 or ib0
    death-timeout: 60           # Number of seconds to wait if a worker can not find a scheduler
    local-directory: $TMPDIR       # Location of fast local storage like /scratch or $TMPDIR

    # SLURM resource manager options
    #queue: null
    project: su62
    walltime: '01:00:00'
    #extra: []
    #env-extra: []
    job-cpu: 8
    job-mem: 36gb
    #job-extra: {}
    log-directory: /home/jnun0003/dask-logs

To work around the issue with current dask-jobqueue, you do:

work_queue = as_completed(client.map(features, enumerate(filenames)))
for future in work_queue:
    try:
        i, v = future.result()
    except KilledWorker:
        client.retry([future])
        work_queue.add(future)
    else:
        X[i, :] = v

in the for-loop.

To fix the issue with the latest dask-distributed, when a dask-worker job gets submitted to the cluster, we should add --lifetime=<value> where value is the walltime of the cluster worker, minus some small margin, e.g. 1min. This will make the dask scheduler not treat the death of that worker as suspicious. I think .adapt will take care of launching a new worker. It's unclear to me whether --lifetime-stagger is something we should worry about at this point.
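
A rough sketch of that walltime-minus-margin arithmetic (the helper below is hypothetical, not part of dask-jobqueue):

from datetime import timedelta

def lifetime_from_walltime(walltime, margin_seconds=60):
    """Turn an 'HH:MM:SS' walltime into a --lifetime value with a safety margin."""
    hours, minutes, seconds = (int(part) for part in walltime.split(':'))
    total = timedelta(hours=hours, minutes=minutes, seconds=seconds)
    return f"{int(total.total_seconds()) - margin_seconds}s"

# e.g. lifetime_from_walltime('00:15:00') == '840s', which could then be passed as
# Cluster(..., extra=['--lifetime', lifetime_from_walltime('00:15:00')])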

Please let me know if the script above is sufficient to reproduce the issue for you!

lesteve commented 4 years ago

Great thanks a lot for your input!

This is the snippet I am trying to run on my SGE cluster. I just launched it and will let you know whether this reproduces the problem or not.

There are a few fixes I had to make (the lines are indicated by # FIX in the snippet below):

import time
import numpy as np
from dask_jobqueue import SGECluster as Cluster
from dask import delayed
from dask.distributed import Client, as_completed

# config in $HOME/.config/dask/jobqueue.yaml
cluster = Cluster(walltime='00:15:00', cores=1, memory='4gb',
    queue='all.q',
    resource_spec='h_vmem=10G,mem_req=4G') #, job_cpu=1, job_mem='4gb')

cluster.adapt(minimum_jobs=0, maximum_jobs=4, target_duration='30minutes') # FIX
client = Client(cluster)

# each job takes 1s, and we have 4 cpus * 30 min * 60s/min = 7200 cpu.s
filenames = [f'img{num}.jpg' for num in range(7200)]

def features(num_fn):
    num, image_fn = num_fn
    time.sleep(1)  # takes about 1s to compute features on an image
    features = np.random.random(246)
    return num, features

num_files = len(filenames)
num_features = len(features((0, filenames[0]))[1]) # FIX

X = np.zeros((num_files, num_features), dtype=np.float32)

for future in as_completed(client.map(features, list(enumerate(filenames)))): # FIX
    i, v = future.result()
    X[i, :] = v

assert not np.any(np.sum(X, axis=1) == 0)  # make sure all rows were filled

Error:

---------------------------------------------------------------------------
KilledWorker                              Traceback (most recent call last)
~/work/test-dask-jobqueue.py in <module>
     30 
     31 for future in as_completed(client.map(features, list(enumerate(filenames)))):
---> 32     i, v = future.result()
     33     X[i, :] = v
     34 

~/miniconda3/envs/dask/lib/python3.7/site-packages/distributed/client.py in result(self, timeout)
    219         if self.status == "error":
    220             typ, exc, tb = result
--> 221             raise exc.with_traceback(tb)
    222         elif self.status == "cancelled":
    223             raise result

KilledWorker: ('features-9926151cd21fae08dd7f1e8344cadf85', <Worker 'tcp://10.141.0.18:35898', memory: 0, processing: 3617>)
lesteve commented 4 years ago

I can reproduce the problem indeed, which is a great start. I edited my earlier message with the error.

guillaumeeb commented 4 years ago

Thanks to @willsALMANJ's issue a few days ago, I tried the --lifetime option, and I confirm that it works perfectly with the latest Dask, Distributed, and Jobqueue versions.

The initial script I used (with just the times reduced):

import time
import numpy as np
from dask_jobqueue import PBSCluster as Cluster
from dask import delayed
from dask.distributed import Client, as_completed

# config in $HOME/.config/dask/jobqueue.yaml
cluster = Cluster(walltime='00:01:00', cores=1, memory='4gb')
cluster.adapt(minimum=0, maximum=4) # FIX

client = Client(cluster)

# each job takes 1s, and we have 4 cpus * 1 min * 60s/min = 240 cpu.s, let's ask for a little more tasks.
filenames = [f'img{num}.jpg' for num in range(480)]

def features(num_fn):
    num, image_fn = num_fn
    time.sleep(1)  # takes about 1s to compute features on an image
    features = np.random.random(246)
    return num, features

num_files = len(filenames)
num_features = len(features((0, filenames[0]))[1]) # FIX

X = np.zeros((num_files, num_features), dtype=np.float32)

for future in as_completed(client.map(features, list(enumerate(filenames)))): # FIX
    i, v = future.result()
    X[i, :] = v

It fails with a KilledWorker exception when the first 4 workers are killed due to walltime.

Just modify the cluster initialization:

cluster = Cluster(walltime='00:01:00', cores=1, memory='4gb', extra=["--lifetime", "50s"])

And it works! I think it solves the problem here.

It's unclear to me whether --lifetime-stagger is something we should worry about at this point

I think this would be valuable when scaling up with 100s of workers, at that point you don't want them all to stop at the same time.

I'll try to produce some documentation to explain all that and close this issue. The outline should look something like:

How to handle Job Queueing system walltime killing workers

Reaching walltime can be troublesome

If you don't set the proper parameters, you'll run into KilledWorker exceptions in those two cases. Use the --lifetime worker option. This enables infinite workloads with adaptive clusters.

Use --lifetime-stagger when dealing with many workers.

Examples

cluster = Cluster(walltime='01:00:00', cores=4, memory='16gb', extra=["--lifetime", "55m", "--lifetime-stagger", "4m"])
cluster.adapt(minimum=0, maximum=200)

...
MatthewLennie commented 4 years ago

@guillaumeeb Is this still current? It seems like there is an additional flag in the Worker CLI to trigger lifetime restarts, --lifetime-restart. Even with the additional flag this doesn't seem to work for me.

guillaumeeb commented 4 years ago

Is this still current?

I just tested with dask and distributed 2.22.0 and jobqueue 0.7.1, and it works.

It seems like there is an additional flag in the Worker CLI to trigger lifetime restarts, --lifetime-restart. Even with the additional flag this doesn't seem to work for me

You don't want to use this flag in this case, or your jobs will never terminate normally; they will be killed by the job scheduler due to walltime. What you want is your jobs ending before the walltime limit, so use only --lifetime.

Did you test this with the code above, or with your own code?

riedel commented 3 years ago

What is the correct way to evict a job from the scheduler's point of view? Is SIGTERM enough for a worker to correctly terminate and transfer its memory, after https://github.com/dask/distributed/pull/2844 gets merged?

Does graceful shutdown mean that memory gets moved?

I agree that workers should ideally terminate one after another; there should be a way to spin up replacements well in time, or to choose staggered walltimes. My problem was that all workers had approximately the same walltime, so there would have been no way to handle the situation gracefully. What is the current policy here, and what would be a sensible policy? It does not make sense to transfer memory and tasks to the worker that dies next, I guess. What would be the idea for handling that?

It seems to me that adapt could have a tendency to kill the youngest jobs first, since they do not yet carry much data. Will specifying lifetime take care of this? Or is this implicitly handled by adapt, because an empty new worker is spun up that makes an ideal target?

guillaumeeb commented 3 years ago

@riedel sorry for the delay...

Just submitted a merge request to clarify things here and close this issue.

What is the correct way to evict a job from the schedulers point of view?

From the Dask-jobqueue perspective, just use the --lifetime option as explained above. This will trigger a graceful shutdown.

What is the current policy here and what would be a sensible policy?

Use --lifetime-stagger option.

It does not make sense to transfer memory and tasks to the worker that dies next I guess. What would be the idea how to handle it?

Hmm, I guess there is no correct answer to this right now. The solution here probably won't work well on workers with a heavy memory load.

Will specifying lifetime take care of this?

I don't think so. The first-started worker will be killed first (with some uncertainty due to --lifetime-stagger). But here we've got no choice: this is a job queueing system walltime problem, and these workers would be killed anyway.

Or is this implicitely handled by adapt, because an empty new worker is spun up, that is an ideal target?

Not handled by adapt, but I guess the scheduler has some logic for reassigning tasks and memory, and it will probably target the youngest and least-loaded workers for this.