dask / dask-mpi

Deploy Dask using MPI4Py
BSD 3-Clause "New" or "Revised" License

dask-mpi not using timeout values set via config.yaml or dask environment variables #82

Closed lastephey closed 2 years ago

lastephey commented 3 years ago

What happened:

I was attempting to start a dask-mpi cluster on 20 (admittedly slow) 68-core Intel KNL nodes (1320 workers, each with a single thread). I observed the scheduler start and the workers attempt to start and connect, but eventually fail with messages like

distributed.comm.tcp - INFO - Connection from tcp://10.128.9.135:41230 closed before handshake completed

Knowing that KNL has a slow clock speed of 1.4 GHz, I attempted to increase the timeout by setting values in my ~/.config/dask/config.yaml file as recommended in the Dask docs. I also attempted to set environment variables via export DASK_DISTRIBUTED__COMM_TIMEOUTS_CONNECT=240s.

I tried a very extreme case where I set

export DASK_DISTRIBUTED__COMM_TIMEOUTS_CONNECT=1000s
export DASK_DISTRIBUTED__COMM_TIMEOUTS_TCP=1000s 

but I still saw timeout failures within a minute or two while dask-mpi attempted to start my cluster, so it seems like dask-mpi is not respecting these values.

What you expected to happen:

I would like (and expect) dask-mpi to use the configuration options advertised in the Dask docs, such as ~/.config/dask/config.yaml. It's not clear to me whether it does or should. Whatever the outcome of this issue, it would be helpful to add a note to the docs about whether dask-mpi supports these configuration options.

Minimal Complete Verifiable Example:

I launched my dask-mpi cluster on NERSC's Cori system with the following commands inside a custom conda environment:

salloc --nodes=20 --ntasks=1360 --cpus-per-task=1 --time=240  --constraint=knl --qos=interactive
export OMP_NUM_THREADS=1
export PYTHONUNBUFFERED=1
export DASK_DISTRIBUTED__COMM_TIMEOUTS_CONNECT=1000s
export DASK_DISTRIBUTED__COMM_TIMEOUTS_TCP=1000s 
srun -u dask-mpi --scheduler-file=scheduler.json --dashboard-address=0 --memory-limit="1.2 GiB" --nthreads=1 --no-nanny --local-directory=/tmp

After 1-2 minutes, I saw many timeout messages:

distributed.nanny - INFO - Closing Nanny at 'tcp://10.128.9.121:44235'
distributed.nanny - INFO - Closing Nanny at 'tcp://10.128.9.113:41129'
distributed.nanny - INFO - Closing Nanny at 'tcp://10.128.9.126:37758'
distributed.nanny - INFO - Closing Nanny at 'tcp://10.128.9.123:45557'
distributed.nanny - INFO - Closing Nanny at 'tcp://10.128.9.116:58355'
distributed.nanny - INFO - Closing Nanny at 'tcp://10.128.9.129:35394'
distributed.nanny - INFO - Closing Nanny at 'tcp://10.128.9.126:58587'
distributed.nanny - INFO - Closing Nanny at 'tcp://10.128.9.113:57881'

Anything else we need to know?:

Environment:

conda list | grep "dask"
dask                      2021.10.0          pyhd3eb1b0_0  
dask-core                 2021.10.0          pyhd3eb1b0_0  
dask-mpi                  2.21.0           py38h4ecba47_2    conda-forge
python --version
Python 3.8.8
cat /etc/os-release
NAME="SLES"
VERSION="15"
VERSION_ID="15"
PRETTY_NAME="SUSE Linux Enterprise Server 15"
ID="sles"
ID_LIKE="suse"
ANSI_COLOR="0;32"
CPE_NAME="cpe:/o:suse:sles:15"

Thank you very much, Laurie

kmpaul commented 3 years ago

@lastephey: Thanks for the contribution! I no longer have an account on NERSC systems, nor any hours to spend on the NERSC machines to debug. There may be a way of reproducing this error on a smaller system, but I'm not sure what that would look like.

lastephey commented 3 years ago

Hi @kmpaul,

If you'd like to do some testing, I am happy to get you a NERSC account, so please let me know.

Can you clarify whether dask-mpi should be using the Dask configuration options? Am I specifying them correctly?

Thank you, Laurie

kmpaul commented 3 years ago

Thanks, @lastephey! I might take you up on that, but let's see if we can drill down a bit before taking that measure.

I'm not aware of anything in dask-mpi that prevents the default configuration from working as it normally should. I will need to look further into this to see if there is something else happening.

kmpaul commented 3 years ago

@lastephey: Before going too far down the road, can you test out the latest version of dask-mpi? We updated it after fixing some bugs just a few days ago.

kmpaul commented 3 years ago

Latest version should be 2021.11.0.

lastephey commented 3 years ago

Sure, I'll see if I can reproduce with 2021.11.0 and report back.

lastephey commented 3 years ago

Hi @kmpaul, sorry for the delay. I re-tried this with

stephey@nid02338:~> conda list | grep "dask"
dask                      2021.10.0          pyhd3eb1b0_0  
dask-core                 2021.10.0          pyhd3eb1b0_0  
dask-mpi                  2021.11.0        py38h4ecba47_0    conda-forge

and ran

salloc --nodes=20 --ntasks=1360 --cpus-per-task=1 --time=240  --constraint=knl --qos=interactive
export OMP_NUM_THREADS=1
export PYTHONUNBUFFERED=1
export DASK_DISTRIBUTED__COMM_TIMEOUTS_CONNECT=1000s
export DASK_DISTRIBUTED__COMM_TIMEOUTS_TCP=1000s 
srun -u dask-mpi --scheduler-file=scheduler.json --dashboard-address=0 --memory-limit="1.2 GiB" --nthreads=1 --local-directory=/tmp

I still saw failures during the cluster startup with OSError: Timed out during handshake while connecting to tcp://10.128.9.53:43779 after 60 s.

I tried to verify that the environment variables I had set to increase the timeout were actually being picked up:

stephey@nid02338:~> python
Python 3.8.8 (default, Feb 24 2021, 21:46:12) 
[GCC 7.3.0] :: Anaconda, Inc. on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import dask.distributed
>>> dask.config.config
{'version': 1, 'distributed': {'comm_timeouts_connect': '1000s', 'comm_timeouts_tcp': '1000s',

so at least Dask is ingesting them, although I don't know whether these are actually the correct settings, and I can't tell whether they're being used.

I think 1300 workers may be hard or impossible on Cori, since our network interconnect (Aries) just doesn't handle TCP very well. It would still be nice to be able to increase this timeout, though, to try to accommodate bigger pools of workers.

jacobtomlinson commented 3 years ago

Are you able to connect a client to the running cluster? If so you could run dask.config.config on the workers to make sure the config made its way there.

>>> import dask
>>> from dask.distributed import Client
>>> client = Client(scheduler_file="scheduler.json")
>>> print(client.run(lambda: dask.config.config))

lastephey commented 2 years ago

Hi everyone,

I'm sorry for my very slow response here.

Using @jacobtomlinson's suggestion, I can confirm that the settings are making their way to the dask-worker configuration with 2021.11.0. In this test I used a much smaller, 2-node cluster with 63 workers.

print(client.run(lambda: dask.config.config))
{'tcp://10.128.0.111:33119': {'version': 1, 'distributed': {'comm_timeouts_connect': '1000s', 'comm_timeouts_tcp': '1000s', 'version': 2, 'scheduler': ...

So that's good.

I did try again to start the original cluster with

salloc --nodes=20 --ntasks=1360 --cpus-per-task=1 --time=240  --constraint=knl --qos=interactive

But still saw quite a few

distributed.comm.tcp - INFO - Connection from tcp://10.128.9.71:41142 closed before handshake completed

about 1-2 minutes after I launched the cluster, so it still doesn't seem like it's obeying my 1000s timeout request.

Thank you for your help, Laurie

jacobtomlinson commented 2 years ago

Ah I see the reason the timeout is being ignored. The env vars should be

export DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT=1000s
export DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP=1000s 

Note the double underscores between each word. There is a utility on the Dask configuration docs page that helps generate these environment variables, so you can check that a variable will create the right config nesting. It can be helpful because the Dask config system can be a little finicky to work out.
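
For example, a quick sanity check in a plain Python session (just a sketch using the standard dask.config helpers, nothing dask-mpi specific) would look something like:

import os
import dask

# Double underscores mark nesting levels: DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT
# maps to the distributed.comm.timeouts.connect key.
os.environ["DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT"] = "1000s"
os.environ["DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP"] = "1000s"

dask.config.refresh()  # re-collect the config, including environment variables

print(dask.config.get("distributed.comm.timeouts.connect"))  # should print 1000s
print(dask.config.get("distributed.comm.timeouts.tcp"))      # should print 1000s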

lastephey commented 2 years ago

Ah, good catch @jacobtomlinson! And thanks for the pointer to the utility in the dask docs.

I'll test and report back.

lastephey commented 2 years ago

Well, this sounded promising but unfortunately I still don't think it's working. I tried to start up my 1300 worker test cluster but still saw timeouts after 1-2 minutes, so I went back to a 2 node cluster for testing.

I started my 2 node test cluster up via:

module load python
source activate dask-mpi-test
export OMP_NUM_THREADS=1
export PYTHONUNBUFFERED=1
export DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT=1000s
export DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP=1000s
salloc --nodes=2 --ntasks=64 --cpus-per-task=1 --time=240 --constraint=knl --qos=interactive
srun -u dask-mpi --scheduler-file=scheduler.json --dashboard-address=0 --memory-limit="1.2 GiB" --nthreads=1 --local-directory=/tmp --worker-class distributed.Worker

making sure to use the correct double underscore variables.

I used @jacobtomlinson's trick to see if the config had propagated to the workers:

settings = client.run(lambda: dask.config.config)
settings
{'tcp://10.128.9.53:34089': {'version': 1,
  'distributed': {'comm': {'timeouts': {'tcp': '60s', 'connect': '60s'},
    'retry': {'count': 0, 'delay': {'min': '1s', 'max': '20s'}},
    'compression': 'auto',
    'shard': '64MiB',
    'offload': '10MiB',

I can see that these keys appear at the top of the dict, which suggests to me that Dask is aware I have tried to set these values. They are still at their default values here, though, so I am confused.

Thanks again for your help, Laurie

jacobtomlinson commented 2 years ago

I'm not sure. It seems the config isn't being propagated to the workers correctly.

I don't know enough about SLURM but would you expect all local environment variables to be propagated to the tasks in the cluster?

I know in other Dask projects we manually bundle up the local config and pass it along to the worker processes. But I don't think dask-mpi does this.
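
(For reference, the bundling pattern those projects use looks roughly like the sketch below. The dask.config.serialize / dask.config.deserialize helpers are what I believe they rely on, so treat this as an illustration rather than something dask-mpi does today.)

import dask

# On the launching side: snapshot the local config as a single string
blob = dask.config.serialize(dask.config.config)

# ...ship `blob` to each worker process (environment variable, file, CLI argument)...

# On the worker side: merge the shipped config into the local defaults
dask.config.update(dask.config.config, dask.config.deserialize(blob), priority="new")

print(dask.config.get("distributed.comm.timeouts.connect"))  # worker now sees the launcher's value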

This seems like a good thing to fix, but I also wonder if there is a way we can work around this for you in the meantime. Do you know what the filesystem on the remote nodes looks like? Does it have your home directory? If so, you could drop a YAML file into ~/.config/dask/distributed.yaml instead of trying to configure via environment variables.

Alternatively you could use the Python API instead of the CLI to launch your cluster and set your config within the script.
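
A minimal sketch of that approach could look like the following (I'm assuming the config needs to be set before initialize() so that every MPI rank picks it up; that ordering is untested on my end):

import dask
from dask_mpi import initialize
from dask.distributed import Client

# Every MPI rank runs this script from the top, so setting the config here,
# before initialize() takes over the process, should apply it to the
# scheduler and worker ranks as well as the client.
dask.config.set({"distributed.comm.timeouts.connect": "1000s",
                 "distributed.comm.timeouts.tcp": "1000s"})

initialize(local_directory='/tmp')

client = Client()  # connects to the scheduler started by initialize()
print(dask.config.get("distributed.comm.timeouts.connect"))

You would launch the script with srun in place of the dask-mpi CLI command, e.g. srun -n 4 python your_script.py.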

lastephey commented 2 years ago

Thanks @jacobtomlinson. As far as I know slurm copies all environment variable settings to our compute nodes. Here is a tiny demo:

stephey@cori09:~> export HELLO=HELLO                 
stephey@cori09:~> echo $HELLO
HELLO
stephey@cori09:~> salloc -N 2 -t 10 -C knl -q interactive
salloc: Pending job allocation 52039969
salloc: job 52039969 queued and waiting for resources
salloc: job 52039969 has been allocated resources
salloc: Granted job allocation 52039969
salloc: Waiting for resource configuration
salloc: Nodes nid0[2349-2350] are ready for job
stephey@nid02349:~> echo $HELLO
HELLO

so I think those settings should be available to the workers. I did try to test your other suggestions, too.

We do have a shared filesystem that is mounted on all compute nodes, so I put the following into my ~/.config/dask/distributed.yaml using the helpful utility you pointed me to:

stephey@cori09:~> cat ~/.config/dask/distributed.yaml
distributed:
  comm:
    timeouts:
      connect: 1000s
      tcp: 1000s
stephey@cori09:~> 

I used this setup to start a 2 node cluster, but when I checked, I didn't see the settings present in the worker config. They were still the default values:

{'tcp://127.0.0.1:34701': {'distributed': {'comm': {'timeouts': {'connect': '60s',
     'tcp': '60s'},

Finally I tried the API, although I haven't used it before and I'm not sure I'm doing it right. I started this script via srun -n 4 python test-dask-api.py

from dask_mpi import initialize
import dask
from dask.distributed import Client
import json

initialize(local_directory='/tmp',worker_class='distributed.Worker')

client = Client()

settings = client.run(lambda: dask.config.config)

with open('settings.json', 'w') as f:
    json.dump(settings, f)

but I don't see any output in settings.json. I'm not even sure the cluster is starting correctly, given the distributed.scheduler - INFO - Scheduler closing all comms message.

(/global/common/software/das/stephey/dask-mpi-test) stephey@nid02349:/global/cscratch1/sd/stephey> srun -n 4 python test-dask-api.py 
distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:   tcp://10.128.9.64:40789
distributed.scheduler - INFO -   dashboard at:                     :8787
distributed.scheduler - INFO - Receive client connection: Client-b559edc8-5e3d-11ec-94b7-000101000c2d
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Run out-of-band function 'stop'
distributed.scheduler - INFO - Scheduler closing...
distributed.scheduler - INFO - Scheduler closing all comms
distributed.worker - INFO -       Start worker at:    tcp://10.128.9.65:38017
distributed.worker - INFO -       Start worker at:    tcp://10.128.9.65:39415
distributed.worker - INFO -          Listening to:    tcp://10.128.9.65:38017
distributed.worker - INFO -          Listening to:    tcp://10.128.9.65:39415
distributed.worker - INFO -          dashboard at:          10.128.9.65:33295
distributed.worker - INFO -          dashboard at:          10.128.9.65:45041
distributed.worker - INFO - Waiting to connect to:    tcp://10.128.9.64:40789
distributed.worker - INFO - Waiting to connect to:    tcp://10.128.9.64:40789
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -                Memory:                 327.53 MiB
distributed.worker - INFO -       Local Directory: /tmp/dask-worker-space/worker-z0wdirb5
distributed.worker - INFO -                Memory:                 327.53 MiB
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -       Local Directory: /tmp/dask-worker-space/worker-a3fftqrq
distributed.worker - INFO - -------------------------------------------------
jacobtomlinson commented 2 years ago

Could you try using dask.config.get("distributed.comm.timeouts.connect") instead of accessing dask.config.config directly and see what that returns?

Your API example looks good, you should be able to set Dask config in there with dask.config.set(...) too.

lastephey commented 2 years ago

Ah yes, with your suggestions @jacobtomlinson the API does seem to work and the settings appear to be used, so thank you very much.

from dask_mpi import initialize
import dask
from dask.distributed import Client

initialize(local_directory='/tmp',worker_class='distributed.Worker')

dask.config.set({"distributed.comm.timeouts.connect": "1000s"})
dask.config.set({"distributed.comm.timeouts.tcp": "1000s"})

client = Client()

timeouts_connect = dask.config.get("distributed.comm.timeouts.connect")
print("timeouts_connect:", timeouts_connect)

timeouts_tcp = dask.config.get("distributed.comm.timeouts.tcp")
print("timeouts_tcp:", timeouts_tcp)

And in the cluster output I see

timeouts_connect: 1000s
timeouts_tcp: 1000s

so that's good!

Then I went to start up my 1300-worker cluster using the API, but unfortunately I still started to see

distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://10.128.9.101:33889', name: 864, status: undefined, memory: 0, processing: 0>
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.128.9.101:33889
distributed.core - INFO - Starting established connection
distributed.worker - INFO -         Registered to:    tcp://10.128.9.85:41291
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0x2aaabda61cd0>>, <Task finished name='Task-15' coro=<Worker.heartbeat() done, defined at /global/common/software/das/stephey/dask-mpi-test/lib/python3.8/site-packages/distributed/worker.py:1182> exception=OSError('Timed out during handshake while connecting to tcp://10.128.9.85:41291 after 60 s')>)
Traceback (most recent call last):
  File "/global/common/software/das/stephey/dask-mpi-test/lib/python3.8/site-packages/distributed/comm/core.py", line 319, in connect
    handshake = await asyncio.wait_for(comm.read(), time_left())
  File "/global/common/software/das/stephey/dask-mpi-test/lib/python3.8/asyncio/tasks.py", line 501, in wait_for
    raise exceptions.TimeoutError()
asyncio.exceptions.TimeoutError

I am baffled as to why it still seems to be using a 60s timeout limit.

jacobtomlinson commented 2 years ago

There is an ongoing discussion in https://github.com/dask/distributed/issues/3691 about scaling to large numbers of workers on HPC clusters. Folks there may have run into some of these same issues. Have you seen that issue?

lastephey commented 2 years ago

Thanks @jacobtomlinson. I had seen it a while ago, but I went back and read it again.

I tried the Client.wait_for_workers suggestion there and I think that's helpful.

When I tried to launch our big 1300-worker test with

salloc --nodes=20 --ntasks=1360 --cpus-per-task=1 --time=60 --constraint=knl --qos=interactive
srun -u dask-mpi --scheduler-file=scheduler.json --dashboard-address=0 --memory-limit="1.2 GiB" --nthreads=1 --local-directory=/tmp --worker-class distributed.Worker

and

client.wait_for_workers(n_workers=1300, timeout=360)

I took ORNL's advice and started up my client while the workers were still coming alive (although I'm not sure if the timing actually matters). I did still see a lot of the same timeout messages while the workers were starting but in the end all the workers arrived and I was able to complete a toy calculation with all 1300 workers.

Can you give any insight into the messages like distributed.comm.tcp - INFO - Connection from tcp://10.128.9.103:51178 closed before handshake completed? Does dask just try to connect again, even without nannies? Can these messages be safely ignored if all the workers do manage to start?

Thank you very much.

jacobtomlinson commented 2 years ago

There should be some retries built in there. Given that the log is only at INFO level, I would expect that things have not necessarily failed.

lastephey commented 2 years ago

I see, that is very helpful to know. When I saw those messages I assumed the workers were failing, but it sounds like after a retry or two they were fine.

Since I have a workaround for now, do you think I should close this issue? Or do you want to look into adjusting the config settings further? Either way I am very grateful for all your help with this @kmpaul and @jacobtomlinson.

kmpaul commented 2 years ago

I think thanks go mostly to @jacobtomlinson. 😄 Excellent finds with the issue on dask/distributed!

kmpaul commented 2 years ago

(Oh, and I'm happy to close this issue for now, if you want. We can always reopen it.)