Closed berkgercek closed 1 year ago
There's a long issue discussion on this at https://github.com/dask/dask-jobqueue/issues/122 (which hopefully includes a solution for you!)
Ah, that is now in the docs at https://jobqueue.dask.org/en/latest/advanced-tips-and-tricks.html#how-to-handle-job-queueing-system-walltime-killing-workers
@berkgercek, hopefully the links provided by @ocaisa should give you at least some workaround.
Other than that, I agree that in a simple case, with adaptive mode, new Workers should be started if some are lost. We should look at how this is handled in distributed repository.
Just a note: you should be able to get the scheduler logs with cluster.get_logs().
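A minimal sketch of pulling only the scheduler log out of the cluster object. It assumes `cluster` is an already-created dask-jobqueue cluster and that `get_logs()` accepts the `cluster`, `scheduler` and `workers` flags and returns a mapping from a name to log text, as in recent `distributed` releases. `format_logs` is a hypothetical helper, not part of either library.

```python
def format_logs(cluster, workers=False):
    """Return the scheduler (and optionally worker) logs as one printable string."""
    # Skip the cluster-manager log; ask only for what is needed.
    logs = cluster.get_logs(cluster=False, scheduler=True, workers=workers)
    sections = []
    for name, text in logs.items():
        sections.append(f"===== {name} =====\n{text}")
    return "\n\n".join(sections)


# Usage with a real cluster object (not executed here):
# print(format_logs(cluster))
```

Worker logs can only be fetched this way while the workers are still connected, so for workers already killed by the walltime the job's own output files remain the place to look.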
It seems that it should be possible to make the respawning workaround work even with just cluster.scale(n) by calling:
current = len(cluster.plan)
cluster.scale(jobs=len(cluster.scheduler.workers))
cluster.scale(current)
However, in the code responsible for scaling (https://github.com/dask/distributed/blob/a83d8727567dd3cdc7c6abdc7eda26d1029cd9de/distributed/deploy/spec.py#L512-L524) there is a mismatch between worker names when using processes > 1:
set(self.worker_spec) has keys without a suffix, e.g., {'SLURMCluster-611', 'SLURMCluster-631'}
v["name"] for v in self.scheduler_info["workers"].values() has names with a suffix, e.g., {'SLURMCluster-592-1', 'SLURMCluster-592-0'}
This mismatch also seems to be responsible for making adapt remove alive workers instead of the dead ones (not_yet_connected above).
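The effect of the mismatch can be reproduced with plain sets. This is an illustrative sketch only: the names are taken from the examples above, the spec key `SLURMCluster-592` is added as an assumption so that one job has connected workers, and `has_connected_worker` is a hypothetical helper, not code from `distributed`.

```python
# Job-level names as stored in worker_spec (no per-process suffix).
worker_spec = {"SLURMCluster-611": {}, "SLURMCluster-631": {}, "SLURMCluster-592": {}}
# Names reported by the scheduler when processes > 1 (one entry per process).
connected = {"SLURMCluster-592-0", "SLURMCluster-592-1"}

# Direct comparison: no connected name equals a spec key,
# so every job looks as if it had never connected.
naive = set(worker_spec) - connected


def has_connected_worker(job_name, connected_names):
    """True if any connected worker is the job itself or one of its processes."""
    return any(
        name == job_name or name.startswith(job_name + "-")
        for name in connected_names
    )


# Suffix-aware comparison: only jobs without any connected process remain.
suffix_aware = {job for job in worker_spec if not has_connected_worker(job, connected)}

print(sorted(naive))
print(sorted(suffix_aware))
```

With the direct comparison all three jobs are candidates for removal, including the one whose workers are alive; the suffix-aware version leaves only the two jobs with no connected process.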
I'm running into exactly the same issue. The code that I'm using to call SLURMCluster can be found here. It already includes the workaround mentioned above.
Here is the log output from a recent run with walltime='00:20:00' and '--lifetime', '15m' (otherwise the same settings as in the code that I linked above). I removed the first lines, which just report the startup of the workers.
2023-11-09 12:34:54,303 - distributed.core - INFO - Starting established connection to tcp://172.18.10.2:41919
2023-11-09 12:46:30,084 - distributed.worker - INFO - Closing worker gracefully: tcp://172.18.1.15:42229. Reason: worker-lifetime-reached
2023-11-09 12:46:30,091 - distributed.worker - INFO - Stopping worker at tcp://172.18.1.15:42229. Reason: worker-lifetime-reached
2023-11-09 12:46:30,097 - distributed.nanny - INFO - Closing Nanny gracefully at 'tcp://172.18.1.15:39303'. Reason: worker-lifetime-reached
2023-11-09 12:46:30,102 - distributed.core - INFO - Connection to tcp://172.18.10.2:41919 has been closed.
2023-11-09 12:46:30,105 - distributed.nanny - INFO - Worker closed
2023-11-09 12:46:32,110 - distributed.nanny - ERROR - Worker process died unexpectedly
2023-11-09 12:46:32,314 - distributed.nanny - INFO - Closing Nanny at 'tcp://172.18.1.15:39303'. Reason: nanny-close-gracefully
2023-11-09 12:46:47,153 - distributed.worker - INFO - Closing worker gracefully: tcp://172.18.1.15:40559. Reason: worker-lifetime-reached
2023-11-09 12:46:47,163 - distributed.worker - INFO - Stopping worker at tcp://172.18.1.15:40559. Reason: worker-lifetime-reached
2023-11-09 12:46:47,166 - distributed.nanny - INFO - Closing Nanny gracefully at 'tcp://172.18.1.15:39757'. Reason: worker-lifetime-reached
2023-11-09 12:46:47,170 - distributed.core - INFO - Connection to tcp://172.18.10.2:41919 has been closed.
2023-11-09 12:46:47,174 - distributed.nanny - INFO - Worker closed
2023-11-09 12:46:49,178 - distributed.nanny - ERROR - Worker process died unexpectedly
2023-11-09 12:46:49,374 - distributed.nanny - INFO - Closing Nanny at 'tcp://172.18.1.15:39757'. Reason: nanny-close-gracefully
2023-11-09 12:49:50,151 - distributed.worker - INFO - Closing worker gracefully: tcp://172.18.1.15:35215. Reason: worker-lifetime-reached
2023-11-09 12:49:50,157 - distributed.worker - INFO - Stopping worker at tcp://172.18.1.15:35215. Reason: worker-lifetime-reached
2023-11-09 12:49:50,161 - distributed.nanny - INFO - Closing Nanny gracefully at 'tcp://172.18.1.15:37005'. Reason: worker-lifetime-reached
2023-11-09 12:49:50,166 - distributed.core - INFO - Connection to tcp://172.18.10.2:41919 has been closed.
2023-11-09 12:49:50,169 - distributed.nanny - INFO - Worker closed
2023-11-09 12:49:52,173 - distributed.nanny - ERROR - Worker process died unexpectedly
2023-11-09 12:49:52,355 - distributed.nanny - INFO - Closing Nanny at 'tcp://172.18.1.15:37005'. Reason: nanny-close-gracefully
2023-11-09 12:50:15,984 - distributed.worker - INFO - Closing worker gracefully: tcp://172.18.1.15:41529. Reason: worker-lifetime-reached
2023-11-09 12:50:15,990 - distributed.worker - INFO - Stopping worker at tcp://172.18.1.15:41529. Reason: worker-lifetime-reached
2023-11-09 12:50:15,994 - distributed.nanny - INFO - Closing Nanny gracefully at 'tcp://172.18.1.15:37845'. Reason: worker-lifetime-reached
2023-11-09 12:50:15,997 - distributed.core - INFO - Connection to tcp://172.18.10.2:41919 has been closed.
2023-11-09 12:50:16,000 - distributed.nanny - INFO - Worker closed
2023-11-09 12:50:18,004 - distributed.nanny - ERROR - Worker process died unexpectedly
2023-11-09 12:50:18,169 - distributed.nanny - INFO - Closing Nanny at 'tcp://172.18.1.15:37845'. Reason: nanny-close-gracefully
2023-11-09 12:50:58,702 - distributed.worker - INFO - Closing worker gracefully: tcp://172.18.1.15:40349. Reason: worker-lifetime-reached
2023-11-09 12:50:58,708 - distributed.worker - INFO - Stopping worker at tcp://172.18.1.15:40349. Reason: worker-lifetime-reached
2023-11-09 12:50:58,712 - distributed.nanny - INFO - Closing Nanny gracefully at 'tcp://172.18.1.15:34527'. Reason: worker-lifetime-reached
2023-11-09 12:50:58,716 - distributed.core - INFO - Connection to tcp://172.18.10.2:41919 has been closed.
2023-11-09 12:50:58,720 - distributed.nanny - INFO - Worker closed
2023-11-09 12:51:00,724 - distributed.nanny - ERROR - Worker process died unexpectedly
2023-11-09 12:51:00,948 - distributed.nanny - INFO - Closing Nanny at 'tcp://172.18.1.15:34527'. Reason: nanny-close-gracefully
2023-11-09 12:51:00,950 - distributed.dask_worker - INFO - End worker
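For reference, the walltime and lifetime settings quoted before the log can be written as cluster keyword arguments, together with a small check that workers retire before the job is killed. This is a sketch under assumptions: `--lifetime-stagger` and its value come from the Dask worker command line rather than from the post, the keyword is called `worker_extra_args` in recent dask-jobqueue releases (older ones used `extra`), and the `cores`, `processes` and `memory` values are placeholders.

```python
from datetime import timedelta


def parse_walltime(text):
    """Parse 'HH:MM:SS' into a timedelta."""
    hours, minutes, seconds = (int(part) for part in text.split(":"))
    return timedelta(hours=hours, minutes=minutes, seconds=seconds)


def parse_minutes(text):
    """Parse a value such as '15m' into a timedelta (minutes only, for brevity)."""
    if not text.endswith("m"):
        raise ValueError(f"expected a value in minutes such as '15m', got {text!r}")
    return timedelta(minutes=int(text[:-1]))


walltime = "00:20:00"  # from the post
lifetime = "15m"       # from the post
stagger = "2m"         # assumption: not part of the reported setup

cluster_kwargs = dict(
    cores=4, processes=4, memory="16GB",  # placeholder resources
    walltime=walltime,
    worker_extra_args=["--lifetime", lifetime, "--lifetime-stagger", stagger],
)

# Worst case, a worker lives for lifetime + stagger; that must end before the walltime.
margin = parse_walltime(walltime) - (parse_minutes(lifetime) + parse_minutes(stagger))
assert margin > timedelta(0), "workers would still be running when the job is killed"

# cluster = SLURMCluster(**cluster_kwargs)  # needs dask_jobqueue and a SLURM system
```

The stagger spreads worker shutdowns over time so that all workers of a job do not retire at the same moment.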
If I then continue my work and end up calling .compute() somewhere, a new SLURM job and its Dask workers are started. So at least I (or my students) don't end up accidentally processing on the cluster's login node...
Spawning of new workers fails with:
cluster.adapt(minimum_jobs=1,
              maximum_jobs=2,
              worker_key=lambda state: state.address.split(':')[0],
              interval='10s')
It works, however, when using the following:
cluster.adapt(minimum=1,
              maximum=8,
              worker_key=lambda state: state.address.split(':')[0],
              interval='10s')
In my case each job spawns 4 workers, so maximum=8 is equal to maximum_jobs=2.
Removing worker_key and interval results in the endless loop of spawning and killing workers described in https://github.com/dask/dask-jobqueue/issues/498
@matrach you are right about the mismatch in the distributed code in the specific case where we want to scale down not yet launched workers. However, I'm not sure how this relates to this problem, where we want to respawn dead workers?
@maawoo the link to your code is dead for me. Regarding the second part, cluster.adapt(minimum_jobs=1, maximum_jobs=2) is translated into cluster.adapt(minimum=4, maximum=8), which probably causes the issue.
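The translation mentioned here is simple arithmetic. A sketch, assuming the job-based limits are multiplied by the number of worker processes per job; `jobs_to_workers` is a hypothetical helper, not the dask-jobqueue implementation.

```python
def jobs_to_workers(minimum_jobs, maximum_jobs, processes_per_job):
    """Convert job-based adapt limits into worker-based limits."""
    return minimum_jobs * processes_per_job, maximum_jobs * processes_per_job


# The reported setup: 4 worker processes per job.
minimum, maximum = jobs_to_workers(minimum_jobs=1, maximum_jobs=2, processes_per_job=4)
print(minimum, maximum)
```

Under that assumption the failing call and the working one share maximum=8 and differ only in the minimum: 4 workers for the job-based call versus minimum=1 for the worker-based call.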
It's important to stress that adaptive mode is known to have issues with dask-jobqueue when starting several Worker processes per job.
Getting back to the original problem, I just tested the following code using dask 2023.6.0:
import time
import numpy as np
from dask_jobqueue import SLURMCluster as Cluster
from dask import delayed
from dask.distributed import Client, as_completed
cluster = Cluster(walltime='00:01:00', cores=1, memory='4gb', account="campus")
cluster.adapt(minimum=2, maximum=4) # FIX
client = Client(cluster)
And I see new workers being created as soon as older ones die, without performing any computations.
I'm going to close this issue as the more specific problems are covered by other ones.
@matrach you are right about the mismatch in the distributed code in the specific case where we want to scale down not yet launched workers. However, I'm not sure how this relates to this problem, where we want to respawn dead workers?
I've never mentioned such a case. The issue was (is?) that the variable not_yet_connected doesn't contain what its name says: with this naming mismatch it always contained all of the workers. Even without the mismatch, it would contain both "not yet connected" and "already dead" workers.
not_yet_launched = set(self.worker_spec) - {
    v["name"] for v in self.scheduler_info["workers"].values()
}
while len(self.worker_spec) > n and not_yet_launched:
    del self.worker_spec[not_yet_launched.pop()]
But set.pop() is allowed to return an arbitrary element. Thus an implementation that starts from the newest entries might always pick "not yet connected" workers instead of "already dead" ones.
I've never mentioned such a case
What I meant was that it is another issue, or is it not?
This is related when using adapt, because the code above, instead of removing dead workers, may kill the newly spawned ones. This may lead to thrashing.
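One way to avoid depending on the order of set.pop() is to rank the candidates explicitly, so that dead workers are dropped before ones that are still waiting to start. The sketch below is purely illustrative: `pick_specs_to_remove` and the `ever_connected` bookkeeping are hypothetical and not something `distributed` is known to provide, and names are job-level to keep the suffix mismatch out of the picture.

```python
def pick_specs_to_remove(worker_spec, connected, ever_connected, target):
    """Return spec names to delete while scaling down towards `target` entries."""
    # Candidates are specs with no currently connected worker.
    missing = [name for name in worker_spec if name not in connected]
    # False sorts before True: workers that connected once and vanished (dead)
    # come first, never-connected (still pending) ones last. The sort is stable,
    # so creation order is kept within each group.
    missing.sort(key=lambda name: name not in ever_connected)
    excess = max(len(worker_spec) - target, 0)
    return missing[:excess]


worker_spec = {"job-10": {}, "job-11": {}, "job-12": {}, "job-13": {}}
connected = {"job-11"}                 # currently alive
ever_connected = {"job-11", "job-12"}  # job-12 was alive and has since died

to_remove = pick_specs_to_remove(worker_spec, connected, ever_connected, target=3)
print(to_remove)
```

Here the dead job is selected even though a pending job appears earlier in the spec, which is the behaviour needed to avoid killing freshly submitted jobs.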
When spawning a SLURM cluster with dask-jobqueue, the cluster spawns workers as expected when cluster.adapt(minimum_jobs=6, maximum_jobs=100) is called. These workers continue to function as expected until they time out. However, when the workers die (due to walltime limits on the associated job), the Dask cluster does not spawn new workers to replace them.
For me this is a case of unknown unknowns. I don't know where to look for the Dask scheduler logs, which would perhaps explain why the issue is happening. The worker logs are fine and simply show that the workers were killed due to the cluster timeout.
Environment: