cabouman / mbircone


Dask issue: job cancelled #69

Open dyang37 opened 2 years ago

dyang37 commented 2 years ago

Recently I've been running into an issue where nodes get canceled when performing multi-node computation with Dask. Here's the error log from a canceled node:

2022-05-11 13:46:54,342 - distributed.nanny - INFO -         Start Nanny at: 'tcp://172.18.32.195:46094'
2022-05-11 13:46:54,973 - distributed.diskutils - INFO - Found stale lock file and directory '/scratch/brown/yang1467/dask-worker-space/worker-ji_0ekoe', purging
2022-05-11 13:46:54,978 - distributed.diskutils - INFO - Found stale lock file and directory '/scratch/brown/yang1467/dask-worker-space/worker-ioqwzjv6', purging
2022-05-11 13:46:54,982 - distributed.diskutils - INFO - Found stale lock file and directory '/scratch/brown/yang1467/dask-worker-space/worker-tfitaup9', purging
2022-05-11 13:46:54,987 - distributed.diskutils - INFO - Found stale lock file and directory '/scratch/brown/yang1467/dask-worker-space/worker-_1gtjqme', purging
2022-05-11 13:46:55,033 - distributed.worker - INFO -       Start worker at:  tcp://172.18.32.195:44174
2022-05-11 13:46:55,033 - distributed.worker - INFO -          Listening to:  tcp://172.18.32.195:44174
2022-05-11 13:46:55,033 - distributed.worker - INFO -          dashboard at:        172.18.32.195:43611
2022-05-11 13:46:55,033 - distributed.worker - INFO - Waiting to connect to:   tcp://172.18.32.86:35988
2022-05-11 13:46:55,033 - distributed.worker - INFO - -------------------------------------------------
2022-05-11 13:46:55,034 - distributed.worker - INFO -               Threads:                          1
2022-05-11 13:46:55,034 - distributed.worker - INFO -                Memory:                  59.60 GiB
2022-05-11 13:46:55,034 - distributed.worker - INFO -       Local Directory: /scratch/brown/yang1467/dask-worker-space/worker-l1vcmet_
2022-05-11 13:46:55,034 - distributed.worker - INFO - -------------------------------------------------
2022-05-11 13:46:55,054 - distributed.worker - INFO -         Registered to:   tcp://172.18.32.86:35988
2022-05-11 13:46:55,054 - distributed.worker - INFO - -------------------------------------------------
2022-05-11 13:46:55,054 - distributed.core - INFO - Starting established connection
slurmstepd: error: *** JOB 15619420 ON brown-a145 CANCELLED AT 2022-05-11T13:55:09 ***

I can confirm that this is not due to a short death_timeout duration: I set death_timeout to 1200 sec, while the node cancellation happens rather early (~5 min after I got the nodes).
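
For reference, here is a minimal, hedged sketch (not the actual submission script; the cores and memory values are placeholders) of where that death_timeout value is set when the cluster is created with dask_jobqueue:

```python
# Hedged sketch: where the death_timeout mentioned above is configured.
# cores/memory are placeholders, not the actual job configuration.
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(
    cores=1,              # placeholder
    memory="60GB",        # placeholder
    death_timeout=1200,   # seconds a worker waits for the scheduler before exiting;
                          # far longer than the ~5 min at which the job was cancelled
)
```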

Furthermore, I observed that a large fraction of the multi-node jobs gets canceled:

<Future: cancelled, key: parallel_func-497c8a35-73fe-455c-80f5-5fbe7a1d1f05-11>
<Future: cancelled, key: parallel_func-497c8a35-73fe-455c-80f5-5fbe7a1d1f05-7>
<Future: cancelled, key: parallel_func-497c8a35-73fe-455c-80f5-5fbe7a1d1f05-3>
<Future: cancelled, key: parallel_func-497c8a35-73fe-455c-80f5-5fbe7a1d1f05-8>
<Future: cancelled, key: parallel_func-497c8a35-73fe-455c-80f5-5fbe7a1d1f05-4>
<Future: cancelled, key: parallel_func-497c8a35-73fe-455c-80f5-5fbe7a1d1f05-0>
<Future: finished, type: dict, key: parallel_func-497c8a35-73fe-455c-80f5-5fbe7a1d1f05-2>
{
index: 2
host: brown-a390.rcac.purdue.edu
pid: 33870
time: 14:26:02
}
<Future: finished, type: dict, key: parallel_func-497c8a35-73fe-455c-80f5-5fbe7a1d1f05-1>
{
index: 1
host: brown-a375.rcac.purdue.edu
pid: 57443
time: 14:26:41
}
<Future: cancelled, key: parallel_func-497c8a35-73fe-455c-80f5-5fbe7a1d1f05-9>
<Future: cancelled, key: parallel_func-497c8a35-73fe-455c-80f5-5fbe7a1d1f05-5>
<Future: finished, type: dict, key: parallel_func-497c8a35-73fe-455c-80f5-5fbe7a1d1f05-6>
{
index: 6
host: brown-a390.rcac.purdue.edu
pid: 33870
time: 14:37:04
}
<Future: cancelled, key: parallel_func-497c8a35-73fe-455c-80f5-5fbe7a1d1f05-10>
[0, 3, 4, 5, 7, 8, 9, 10, 11]
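
For context, here is a hedged, self-contained sketch (not the actual script; parallel_func below is a stand-in) of how a status listing like the one above can be produced. The indices printed at the end are those of futures whose status is cancelled, matching the list just above.

```python
# Hedged illustration: map a function over inputs with a distributed Client,
# then summarize which futures finished and which were cancelled.
from distributed import Client, LocalCluster, wait

def parallel_func(index):
    # stand-in for the real per-node work
    return {"index": index}

if __name__ == "__main__":
    client = Client(LocalCluster(n_workers=2, threads_per_worker=1))
    futures = client.map(parallel_func, range(12))
    wait(futures)  # block until every task has settled
    for f in futures:
        print(f)   # prints <Future: finished/cancelled, ...> lines like those above
    cancelled_indices = [i for i, f in enumerate(futures) if f.status == "cancelled"]
    print(cancelled_indices)  # on the SLURM run above this was [0, 3, 4, 5, 7, 8, 9, 10, 11]
    client.close()
```
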
dyang37 commented 2 years ago

Some further debugging shows that the worker died because the scheduler failed to hear from it for a certain period of time (300 sec). According to this source (https://github.com/dask/distributed/issues/6324), this occurs when the distributed tasks hold the GIL for long periods of time. Pure Python code is unlikely to hold the GIL for very long, but our code involves Cython, which is what created the problem.

I started seeing jobs repeatedly getting canceled after we upgraded the dask version. This also agrees with the observations in the link above, which states that this behavior is observed in dask.distributed==2022.5.0.

You can see the PR to the dask source code that caused our issue here: https://github.com/dask/distributed/pull/6200
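
As a quick check (a hedged sketch, not part of mbircone), the heartbeat timeout that the installed dask/distributed version applies can be read from the Dask config:

```python
# Hedged sketch: inspect the scheduler's worker heartbeat timeout for the
# installed dask/distributed version. Importing distributed loads its config defaults.
import dask
import distributed  # noqa: F401  (registers the distributed config defaults)

# Prints "5 minutes" (the 300 sec above) on versions with the new default,
# and None on older releases, where workers were never closed for missed heartbeats.
print(dask.config.get("distributed.scheduler.worker-ttl"))
```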

A quick fix for now is simply to downgrade the dask version.

A more elegant fix could be to either 1. figure out a way for the Cython code to release the GIL, or 2. increase the default timeout interval (currently 300 sec).

Unfortunately, there does not seem to be an easy fix; both solutions require some workaround.

Here are the error logs:

2022-07-27 00:26:50,524 - distributed.scheduler - WARNING - Worker failed to heartbeat within 300 seconds. Closing: <WorkerState 'tcp://172.18.33.46:32965', name: SLURMCluster-4, status: running, memory: 12, processing: 3>
2022-07-27 00:26:50,525 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://172.18.33.46:32965', name: SLURMCluster-4, status: running, memory: 12, processing: 3>
2022-07-27 00:26:50,525 - distributed.core - INFO - Removing comms to tcp://172.18.33.46:32965
dyang37 commented 2 years ago

Regarding fix 2 (increasing the default timeout interval), here's how to do it: in the jobqueue config file (usually ~/.config/dask/jobqueue.yaml), add worker-ttl: null under the scheduler section (i.e., set distributed.scheduler.worker-ttl to null). Now the scheduler no longer kills the workers!

This is basically reverting Dask's update here: https://github.com/dask/distributed/pull/6200/commits/d94ab9a88ff9076915ff1e744cc735ee380d31ce
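
For completeness, a hedged sketch of the same override (any YAML file under ~/.config/dask/ is merged into the Dask configuration, and the setting can also be applied programmatically before the cluster is created):

```python
# Hedged sketch of the override described above. The YAML form, placed in
# ~/.config/dask/jobqueue.yaml (or any YAML file under ~/.config/dask/), is:
#
#   distributed:
#     scheduler:
#       worker-ttl: null    # never close a worker for missed heartbeats
#
# The equivalent programmatic form, set before creating the cluster/client:
import dask

dask.config.set({"distributed.scheduler.worker-ttl": None})
```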

cabouman commented 2 years ago

Diyu, I don't totally understand this, but can you make some change, perhaps to the timeout interval, that fixes the problem and then do a PR?

dyang37 commented 2 years ago

> Diyu, I don't totally understand this, but can you make some change, perhaps to the timeout interval, that fixes the problem and then do a PR?

Sure. Basically, what happens is that the scheduler does not hear back from a computation node for a long time (300 sec), so the scheduler thinks the node is dead and cancels it, while in fact the node is still actively performing computation. The fix is to set this timeout period to infinity instead of 300 sec (dask's default).

This fix requires the user to manually change the timeout interval value inside the Dask config file (which is usually stored in the home directory as ~/.config/dask/jobqueue.yaml). Perhaps we can add the modified config file to the mbircone code base (for example, create a new directory mbircone/configs/dask/ and put the config file in that directory)? Meanwhile, I'll see whether there's a way to directly pass the timeout as an argument to our function multinode.get_cluster_ticket(). If so, we can simply set this argument for the user, and the user would not need to manually change the config file. A rough sketch of that idea is below.
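
The following is a hypothetical sketch only (the real signature of multinode.get_cluster_ticket() may differ; the wrapper name and parameters here are made up for illustration):

```python
# Hypothetical sketch of exposing the timeout as an argument; not mbircone's actual API.
import dask
from dask_jobqueue import SLURMCluster

def get_cluster_ticket_with_ttl(worker_ttl=None, **slurm_kwargs):
    # worker_ttl=None disables the scheduler-side heartbeat timeout, matching the
    # manual worker-ttl: null edit; a string such as "1 hour" would raise it instead.
    dask.config.set({"distributed.scheduler.worker-ttl": worker_ttl})
    # The SLURMCluster scheduler runs in this process, so it picks up the config set above.
    return SLURMCluster(**slurm_kwargs)
```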

Will do a PR on this.