dask / dask-jobqueue

Deploy Dask on job schedulers like PBS, SLURM, and SGE
https://jobqueue.dask.org
BSD 3-Clause "New" or "Revised" License

Dask workers are created and HTCondor jobs allocated, but workers only slowly connect to the dask cluster/scheduler #479

Closed simone-codeluppi closed 3 years ago

simone-codeluppi commented 3 years ago

Hi, when I start a cluster using dask-jobqueue:

    from dask_jobqueue.htcondor import HTCondorCluster

    cluster = HTCondorCluster(cores=1, memory='10GB', disk='1GB')
    cluster.scale(jobs=100)

What happened: The 100 jobs on condor start almost instantaneously (only my work is running on the cluster). However, when I look at the dask cluster size, it takes a long time (1 hr or more) before it reaches 100 workers, and sometimes it doesn't reach the maximum at all and the number of condor jobs is scaled down to the number of workers reported by the cluster command.

the condor_q output is:

    -- Schedd: monod.mbb.ki.se : <193.10.16.58:9618?... @ 12/10/20 09:52:22
    OWNER  BATCH_NAME      SUBMITTED   DONE   RUN    IDLE  TOTAL JOB_IDS
    simone CMD: /bin/sh  12/10 09:41      _    100      _    100 1629200.0 ... 1629
    100 jobs; 0 completed, 0 removed, 0 idle, 100 running, 0 held, 0 suspended

Example: after 15 min the printout of cluster is

HTCondorCluster('tcp://193.10.16.58:31878', workers=49, threads=49, memory=490.00 GB)

and the printout of cluster.workers generates a dict of all 100 workers:

    {1: <HTCondorJob: status=Status.running>,
    ------------------------------------------
    102: <HTCondorJob: status=Status.running>}

After 60 min the printout of cluster is

HTCondorCluster('tcp://193.10.16.58:31878', workers=100, threads=100, memory=490.00 GB)

I became aware of the issue because in my script I start the cluster, scale it and map a function. The function gets mapped only to the number of workers displayed in the output of the cluster command (e.g. 49). However, the process crashes because the scheduler cannot communicate with the remaining workers that are still running (e.g. 51).
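For illustration, the workflow described above amounts to something like the sketch below; my_function and the worker/task counts are placeholders, and the Client.wait_for_workers call is one possible way to block until the expected workers have actually connected before mapping:

    from dask.distributed import Client
    from dask_jobqueue.htcondor import HTCondorCluster

    cluster = HTCondorCluster(cores=1, memory='10GB', disk='1GB')
    cluster.scale(jobs=100)

    client = Client(cluster)
    # Block until the requested workers have registered with the scheduler,
    # instead of mapping onto however many happen to be connected so far.
    client.wait_for_workers(n_workers=100)

    def my_function(x):
        # Placeholder for the real per-item work.
        return x * 2

    futures = client.map(my_function, range(1000))
    results = client.gather(futures)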

If I use adaptive scaling, the addition of workers to the cluster is so slow that the extra workers added are removed again.
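For context, the adaptive scaling mentioned above refers to something along these lines (the bounds are illustrative):

    # Let the cluster grow and shrink between these bounds based on load.
    # If worker startup is very slow, newly requested workers may be culled
    # by the adaptive logic before they ever connect.
    cluster.adapt(minimum=10, maximum=100)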

Am I missing something? Thanks for the help.

# Name           Version    Build                 Channel
dask             2.30.0     py_0                  conda-forge
dask-core        2.30.0     py_0                  conda-forge
dask-jobqueue    0.7.2      pypi_0                pypi
python           3.8.6      hffdb5ce_1_cpython    conda-forge
OS               Linux
$CondorVersion: 8.6.13 Oct 30 2018 BuildID: 453497 $
$CondorPlatform: x86_64_RedHat7 $
mivade commented 3 years ago

I just tried this out with my cluster using dask-jobqueue 0.7.1 and couldn't reproduce the issue. It does take a little while for the jobs to spin up and the workers to connect, but in my case it takes less than a minute. I also installed 0.7.2 to confirm that it works as expected. The only other key difference is that I have a newer version of Condor:

$CondorVersion: 8.8.11 Oct 21 2020 BuildID: 521488 PackageID: 8.8.11-1 $
$CondorPlatform: x86_64_CentOS7 $

Anecdotally, I have sometimes seen issues where workers start up but can't communicate. I haven't found a common thread but usually trying again makes everything work (it doesn't sound like that approach will work for you here). Sorry I can't be more helpful :(

simone-codeluppi commented 3 years ago

Thanks for the effort! Really appreciated. I will check if an upgrade of condor is an option for our cluster.

guillaumeeb commented 3 years ago

I guess we'll need some output logs from the workers that are taking a long time to spin up. See https://jobqueue.dask.org/en/latest/debug.html#interact-with-your-job-queuing-system.
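If it helps, one way to keep those logs around is the log_directory option of the dask-jobqueue cluster classes (the path below is just an example):

    from dask_jobqueue.htcondor import HTCondorCluster

    # Write each job's stdout/stderr (and hence the worker logs) to a shared
    # directory so slow or failing workers can be inspected afterwards.
    cluster = HTCondorCluster(cores=1, memory='10GB', disk='1GB',
                              log_directory='/path/to/dask-worker-logs')
    cluster.scale(jobs=100)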

From my experience with HPC clusters, if a worker is slow to start, it's usually because the underlying file system that hosts the Python install is overwhelmed. But I've never seen startup take longer than a minute or so.

I've also encountered workers failing to start because of a network timeout when connecting to the scheduler, when using 100s of workers. But in that case, those workers never recover.

I became aware of the issue because in my script I start the cluster, scale it and map a function. The function gets mapped only to the number of workers displayed in the output of the cluster command (e.g. 49). However, the process crashes because the scheduler cannot communicate with the remaining workers that are still running (e.g. 51).

I think this is the most important part of your problem, and the one we should investigate first: what makes the process crash? Failing workers?

simone-codeluppi commented 3 years ago

I looked at many sessions of logs, but nothing really stands out and no errors are reported. I still think that the process crashes because of communication issues between the workers and the scheduler, but I still have no idea whether it is an issue specific to our setup. I have been trying to figure this out, but nothing so far.

guillaumeeb commented 3 years ago

One thing you could try is to increase the TCP communication timeouts in distributed.yaml:

distributed:
  comm:
    timeouts:
      connect: 120s          # time before connecting fails
      tcp: 120s              # time before calling an unresponsive connection dead
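As an aside (not part of the original suggestion), the same values can also be set programmatically before creating the cluster, for example with dask.config.set:

    import dask

    # Equivalent to editing distributed.yaml: raise the comm timeouts
    # for everything created in this process afterwards.
    dask.config.set({
        "distributed.comm.timeouts.connect": "120s",
        "distributed.comm.timeouts.tcp": "120s",
    })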