Thanks for the detailed bug report!
It looks like the Dask worker cannot connect to the Dask scheduler.
Can you ask your cluster IT whether they allow TCP connections from your compute node (192.168.206.35) to the node where your Dask scheduler runs (192.168.192.54), and whether they have some restrictions on open ports?
Edit: @lesteve beat me to it by a minute... :)
Good to see you @willirath, we seemed to say a very similar thing so that's reassuring ;-) !
And by the way @Ovec8hkin, I think this is the first time I see someone with an ice-hockey-related handle on GitHub ;-).
:)
On a German machine, @kathoef found the interface keyword insufficient to solve this kind of problem, even though it was possible to select addresses that could talk to each other. Here's a solution that sets worker hostnames explicitly:
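The linked fix itself is not reproduced in this thread; purely as a hypothetical sketch of the idea (it assumes dask-jobqueue appends the extra arguments verbatim to the dask-worker command in the generated job script, so that the shell expands $(hostname) on the compute node):

from dask_jobqueue import SLURMCluster

# Hypothetical sketch: make each worker bind to / advertise the hostname of the
# node it actually runs on, via dask-worker's --host option. "$(hostname)" is
# expanded by the shell when the generated job script runs on the compute node.
cluster = SLURMCluster(cores=2, memory='2GB',
                       extra=['--host', '$(hostname)'])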
And by the way @Ovec8hkin, I think this is the first time I see someone with an ice-hockey-related handle on GitHub ;-).
That's funny. It's an old screen name I've been using for probably a decade now. It just happens to never be taken anywhere ;-). If you see it anywhere else, it's almost certainly me.
Thanks for the detailed bug report!
It looks like the Dask worker cannot connect to the Dask scheduler.
Can you ask your cluster IT whether they allow TCP connections from your compute node (192.168.206.35) to the node where your Dask scheduler runs (192.168.192.54), and whether they have some restrictions on open ports?
Will do. That's what I expected to be the case, but I've had the same code successfully connect before, so it's kind of strange for it to just stop seemingly out of the blue.
Preemptively, do you see any reason why the following code would not connect to the scheduler:
import numpy as np
from timeit import timeit

from dask.distributed import Client, as_completed, progress
from dask_jobqueue import SLURMCluster

# Assumed placeholders for names used below but not shown in the original snippet:
NUM_JOBS = 10                             # number of tasks to submit / workers to request
Y_DIM = 10000                             # width of the 2D array
vectorized_sqrt = np.vectorize(np.sqrt)   # the "expensive" per-element computation


def parallel_func(array):
    """
    Code to run on each worker
    """
    output = vectorized_sqrt(array)
    total = sum(sum(output))
    return total


def distributed_main():
    """
    Create a large 2D numpy array, do some expensive computation on every element ***IN PARALLEL***,
    return the sum.
    """
    two_d_array = np.random.rand(10000, Y_DIM)

    # Split the large array into smaller arrays along the Y axis
    # Submit each smaller array as a job
    futures = []
    for i in range(NUM_JOBS):
        start = (i * Y_DIM) // NUM_JOBS
        end = ((i + 1) * Y_DIM) // NUM_JOBS
        print([start, end])
        # Sends lots of data over the network to each worker
        future = client.submit(parallel_func, two_d_array[:, start:end])
        futures.append(future)

    print(client.scheduler_info())
    progress(futures)

    total = 0
    for future in as_completed(futures):
        total += future.result()
    print(total)
    return total


if __name__ == "__main__":
    cluster = SLURMCluster(name='worker_bee',
                           queue='normal',
                           project='-----------',
                           cores=2,
                           memory='2GB',
                           interface='ib0')
    cluster.scale(NUM_JOBS)
    print("JOB FILE:", cluster.job_script())

    client = Client(cluster)
    print(client.scheduler_info())

    print("Time to run parallel code:", timeit(stmt=distributed_main, number=1))

    client.close()
Your script looks totally fine to me ...
The IT for my cluster responded saying that neither of those TCP/IP addresses are for nodes on the cluster, though he also mentioned that there should be no connection restrictions on any of the nodes.
The node I submit the jobs from (and presumably the one where the client and dask scheduler are running) is at 206.76.192.53.
Interesting note: I left the code running for a while (~15 min) and the workers eventually connected to the scheduler and ran the computations they were supposed to after ~12 minutes. Is this atypical behavior? Is there an expected amount of time it should take for the workers to connect to the scheduler?
Update: This is reproducible, with varying amounts of time required for the workers to connect. Once the first set of workers connects, any additional workers submitted via client.submit(...) connect effectively instantly. Could this have something to do with the port that the workers are trying to connect over?
The IT for my cluster responded saying that neither of those TCP/IP addresses are for nodes on the cluster.
Not a networking expert, but those IPs are probably internal (to your cluster) IP addresses and not external IP addresses (visible from the outside); maybe this is something that he misunderstood. You can probably use nslookup <ip-address> from your cluster login node to turn an IP into a hostname, which is generally easier for a human to grok.
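If nslookup is not available, roughly the same reverse lookup can be done from Python with the standard library (using one of the IPs above as an example; this relies on the cluster's DNS having reverse records):

import socket

# Reverse-resolve an IP address into a hostname that cluster IT will recognise.
hostname, _aliases, _addresses = socket.gethostbyaddr("192.168.206.35")
print(hostname)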
Interesting note: I left the code running for a while (~15 min) and the workers eventually connected to the scheduler and ran the computations they were supposed to after ~12 minutes.
OK at least that means that everything seems fine on TCP and port filtering. Not sure why your workers were not able to connect before ...
Is this atypical behavior? Is there an expected amount of time it should take for the workers to connect to the scheduler?
It completely depends what you mean by the 15 minutes wait + 12 minutes to run the computation. What is probably happening: your worker jobs were sitting in the SLURM queue for a while before being given resources (cluster.job_script() is the submission script used to launch each job).

any additional workers submitted via client.submit(...) connect effectively instantly.
There seems to be some slight misunderstanding here. client.submit submits a "task", or in other words simply executes a Python function on a Dask worker; it does not "submit a worker". The Dask workers are launched via .scale in your script. This is generic to Dask and not specific to Dask-Jobqueue.
What is specific to Dask-Jobqueue is the way the Dask workers are created: they are submitted to your job scheduler (SLURM in your case). In practice, depending on your cluster and how many resources you are asking for, the jobs can sit in the queue for a while before they start running.
Maybe this slide can help: https://docs.google.com/presentation/d/e/2PACX-1vSTH2kAR0DCR0nw8pFBe5kuYbOk3inZ9cQfZbzOIRjyzQoVaOoMfI2JONGBz-qsvG_P6g050ddHxSXT/pub?start=false&loop=false&delayms=60000#slide=id.g646705d04f_0_371
and also this from the excellent Dask examples.
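To make the .scale vs client.submit distinction concrete, here is a minimal sketch (the resource values are just placeholders):

from dask_jobqueue import SLURMCluster
from dask.distributed import Client

cluster = SLURMCluster(cores=2, memory='2GB', interface='ib0')
cluster.scale(4)            # submits 4 jobs to SLURM; each job starts a Dask worker
client = Client(cluster)
client.wait_for_workers(4)  # blocks until those jobs leave the queue and the workers connect

future = client.submit(sum, [1, 2, 3])  # runs a task on an already-running worker
print(future.result())                  # 6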
OK. I think this was just a weird configuration thing with the HPC cluster I'm on. Closing now.
Thanks for letting us know!
Hi,
I'm currently getting this error on a previously working setup (I followed the Pangeo setup for Jupyter and dask-jobqueue on my local SGE HPC). I could successfully run with JupyterLab both on a login node and in an interactive session on a compute node (preferable).
Now, when I scale to, say, 150 workers and compute a task, the interactive session on the compute node gets killed and the workers die.
The output from one of the workers is:
distributed.nanny - INFO - Start Nanny at: 'tcp://10.12.169.32:39873'
distributed.worker - INFO - Start worker at: tcp://10.12.169.32:44544
distributed.worker - INFO - Listening to: tcp://10.12.169.32:44544
distributed.worker - INFO - dashboard at: 10.12.169.32:37263
distributed.worker - INFO - Waiting to connect to: tcp://10.12.169.32:44681
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 1
distributed.worker - INFO - Memory: 1000.00 MB
distributed.worker - INFO - Local Directory: /nobackup/earlacoa/dask/scripts/dask-worker-space/worker-9m7ponfk
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Waiting to connect to: tcp://10.12.169.32:44681
distributed.worker - INFO - Waiting to connect to: tcp://10.12.169.32:44681
distributed.worker - INFO - Waiting to connect to: tcp://10.12.169.32:44681
distributed.worker - INFO - Waiting to connect to: tcp://10.12.169.32:44681
distributed.worker - INFO - Waiting to connect to: tcp://10.12.169.32:44681
distributed.nanny - INFO - Closing Nanny at 'tcp://10.12.169.32:39873'
distributed.worker - INFO - Stopping worker at tcp://10.12.169.32:44544
distributed.worker - INFO - Closed worker has not yet started: None
distributed.dask_worker - INFO - End worker
Traceback (most recent call last):
File "/nobackup/earlacoa/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/core.py", line 226, in _
await asyncio.wait_for(self.start(), timeout=timeout)
File "/nobackup/earlacoa/miniconda3/envs/pangeo/lib/python3.6/asyncio/tasks.py", line 362, in wait_for
raise futures.TimeoutError()
concurrent.futures._base.TimeoutError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/nobackup/earlacoa/miniconda3/envs/pangeo/lib/python3.6/runpy.py", line 193, in _run_module_as_main
"__main__", mod_spec)
File "/nobackup/earlacoa/miniconda3/envs/pangeo/lib/python3.6/runpy.py", line 85, in _run_code
exec(code, run_globals)
File "/nobackup/earlacoa/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/cli/dask_worker.py", line 452, in <m
odule>
go()
File "/nobackup/earlacoa/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/cli/dask_worker.py", line 448, in go
main()
File "/nobackup/earlacoa/miniconda3/envs/pangeo/lib/python3.6/site-packages/click/core.py", line 829, in __call__
return self.main(*args, **kwargs)
File "/nobackup/earlacoa/miniconda3/envs/pangeo/lib/python3.6/site-packages/click/core.py", line 782, in main
rv = self.invoke(ctx)
File "/nobackup/earlacoa/miniconda3/envs/pangeo/lib/python3.6/site-packages/click/core.py", line 1066, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/nobackup/earlacoa/miniconda3/envs/pangeo/lib/python3.6/site-packages/click/core.py", line 610, in invoke
return callback(*args, **kwargs)
File "/nobackup/earlacoa/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/cli/dask_worker.py", line 434, in ma
in
loop.run_sync(run)
File "/nobackup/earlacoa/miniconda3/envs/pangeo/lib/python3.6/site-packages/tornado/ioloop.py", line 532, in run_sync
return future_cell[0].result()
File "/nobackup/earlacoa/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/cli/dask_worker.py", line 428, in ru
n
await asyncio.gather(*nannies)
File "/nobackup/earlacoa/miniconda3/envs/pangeo/lib/python3.6/asyncio/tasks.py", line 537, in _wrap_awaitable
return (yield from awaitable.__await__())
File "/nobackup/earlacoa/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/core.py", line 232, in _
type(self).__name__, timeout
concurrent.futures._base.TimeoutError: Nanny failed to start in 60 seconds
The interactive session on the compute node dies with the following output:
(pangeo) [earlacoa@d8s0b3.arc4 dask]$ jupyter lab --no-browser --ip=`hostname` --port=8888
[I 14:23:01.735 LabApp] JupyterLab extension loaded from /nobackup/earlacoa/miniconda3/envs/pangeo/lib/python3.6/site-packages/jupyterlab
[I 14:23:01.735 LabApp] JupyterLab application directory is /nobackup/earlacoa/miniconda3/envs/pangeo/share/jupyter/lab
[I 14:23:01.770 LabApp] Serving notebooks from local directory: /nobackup/earlacoa/dask
[I 14:23:01.771 LabApp] The Jupyter Notebook is running at:
[I 14:23:01.771 LabApp] http://d8s0b3.arc4.leeds.ac.uk:8888/
[I 14:23:01.771 LabApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[I 14:23:56.835 LabApp] Build is up to date
TermSocket.open: 1
TermSocket.open: Opened 1
Websocket closed
[I 14:24:05.747 LabApp] Kernel started: 6e8bb670-4631-4a5e-bdb3-bd79c29f1d69
TermSocket.open: 2
TermSocket.open: Opened 2
[I 14:26:02.090 LabApp] Saving file at /scripts/emulator.ipynb
[I 14:28:03.165 LabApp] Saving file at /scripts/emulator.ipynb
[I 14:30:03.521 LabApp] Saving file at /scripts/emulator.ipynb
[I 14:30:05.138 LabApp] Client sent subprotocols: ['bokeh', 'eyJzZXNzaW9uX2lkIjogIjdGVnBMdDdnaGZaQ2JVckowN25CMkc1a3BuWWdqaENzQVBOZXR4OWVVRWVmIiwgInNlc3Npb25fZXhwaXJ5IjogMTU5MDU4OTg4MiwgImhlYWRlcnMiOiB7Ikhvc3QiOiAibG9jYWxob3N0Ojg4ODgiLCAiQ29ubmVjdGlvbiI6ICJrZWVwLWFsaXZlIiwgIkNhY2hlLUNvbnRyb2wiOiAibWF4LWFnZT0wIiwgIlVwZ3JhZGUtSW5zZWN1cmUtUmVxdWVzdHMiOiAiMSIsICJVc2VyLUFnZW50IjogIk1vemlsbGEvNS4wIChXaW5kb3dzIE5UIDEwLjA7IFdpbjY0OyB4NjQpIEFwcGxlV2ViS2l0LzUzNy4zNiAoS0hUTUwsIGxpa2UgR2Vja28pIENocm9tZS84MS4wLjQwNDQuMTM4IFNhZmFyaS81MzcuMzYiLCAiQWNjZXB0IjogInRleHQvaHRtbCxhcHBsaWNhdGlvbi94aHRtbCt4bWwsYXBwbGljYXRpb24veG1sO3E9MC45LGltYWdlL3dlYnAsaW1hZ2UvYXBuZywqLyo7cT0wLjgsYXBwbGljYXRpb24vc2lnbmVkLWV4Y2hhbmdlO3Y9YjM7cT0wLjkiLCAiU2VjLUZldGNoLVNpdGUiOiAiY3Jvc3Mtc2l0ZSIsICJTZWMtRmV0Y2gtTW9kZSI6ICJuYXZpZ2F0ZSIsICJTZWMtRmV0Y2gtVXNlciI6ICI_MSIsICJTZWMtRmV0Y2gtRGVzdCI6ICJkb2N1bWVudCIsICJBY2NlcHQtRW5jb2RpbmciOiAiZ3ppcCIsICJBY2NlcHQtTGFuZ3VhZ2UiOiAiZW4tR0IsZW4tVVM7cT0wLjksZW47cT0wLjgiLCAiQ29va2llIjogIl94c3JmPTJ8OGI5MjJlMmN8YTA3OTZkZWJmYzk2ODBhYzVjN2IyYmE1ZGI5NGQxZGV8MTU5MDQwMTU4MzsgdXNlcm5hbWUtbG9jYWxob3N0LTg4ODg9XCIyfDE6MHwxMDoxNTkwNTc1MjIxfDIzOnVzZXJuYW1lLWxvY2FsaG9zdC04ODg4fDQ0Ok1EWTRaVEkwT0dWak1qSXhORGRsWWprME1tUTNPR0poTm1FMk5XTmtOR1E9fDY4MWNhZWExYWI0ZjNhMTBmMTYzNDI1MDMxOWY2Y2ZiOTMwMGRkYWU0MTcxYTcyNGE0MzI1NWNhYjhlZGE3OWFcIiIsICJYLUZvcndhcmRlZC1Db250ZXh0IjogIi9wcm94eS84Nzg3IiwgIlgtUHJveHljb250ZXh0cGF0aCI6ICIvcHJveHkvODc4NyJ9LCAiY29va2llcyI6IHsiX3hzcmYiOiAiMnw4YjkyMmUyY3xhMDc5NmRlYmZjOTY4MGFjNWM3YjJiYTVkYjk0ZDFkZXwxNTkwNDAxNTgzIiwgInVzZXJuYW1lLWxvY2FsaG9zdC04ODg4IjogIjJ8MTowfDEwOjE1OTA1NzUyMjF8MjM6dXNlcm5hbWUtbG9jYWxob3N0LTg4ODh8NDQ6TURZNFpUSTBPR1ZqTWpJeE5EZGxZamswTW1RM09HSmhObUUyTldOa05HUT18NjgxY2FlYTFhYjRmM2ExMGYxNjM0MjUwMzE5ZjZjZmI5MzAwZGRhZTQxNzFhNzI0YTQzMjU1Y2FiOGVkYTc5YSJ9fQ']
[I 14:30:05.139 LabApp] Trying to establish websocket connection to ws://localhost:8787/status/ws
[I 14:30:05.143 LabApp] Websocket connection established to ws://localhost:8787/status/ws
LabApp - ERROR - Uncaught exception
Traceback (most recent call last):
File "/nobackup/earlacoa/miniconda3/envs/pangeo/lib/python3.6/site-packages/tornado/websocket.py", line 649, in _run_callback
result = callback(*args, **kwargs)
File "/nobackup/earlacoa/miniconda3/envs/pangeo/lib/python3.6/site-packages/tornado/websocket.py", line 1528, in on_message
return self._on_message(message)
File "/nobackup/earlacoa/miniconda3/envs/pangeo/lib/python3.6/site-packages/tornado/websocket.py", line 1534, in _on_message
self._on_message_callback(message)
File "/nobackup/earlacoa/miniconda3/envs/pangeo/lib/python3.6/site-packages/jupyter_server_proxy/handlers.py", line 296, in message_cb
self.write_message(message, binary=isinstance(message, bytes))
File "/nobackup/earlacoa/miniconda3/envs/pangeo/lib/python3.6/site-packages/tornado/websocket.py", line 339, in write_message
raise WebSocketClosedError()
tornado.websocket.WebSocketClosedError
The versions of some potentially relevant python libraries are:
dask 2.17.0 py_0 conda-forge
dask-core 2.17.0 py_0 conda-forge
dask-jobqueue 0.7.1 py_0 conda-forge
dask-labextension 2.0.2 pypi_0 pypi
jupyter 1.0.0 py_2 conda-forge
jupyter-server-proxy 1.5.0 pypi_0 pypi
jupyter_client 6.1.3 py_0 conda-forge
jupyter_console 6.1.0 py_1 conda-forge
jupyter_core 4.6.3 py36h9f0ad1d_1 conda-forge
jupyterlab 2.1.3 py_0 conda-forge
jupyterlab_server 1.1.5 py_0 conda-forge
tornado 6.0.4 py36h8c4c3a4_1 conda-forge
The ~/.config/dask/jobqueue.yaml file is as follows. The error occurs both with and without the use of the ib0 interface.
jobqueue:
  sge:
    name: dask-worker

    # Dask worker options
    cores: 1                 # Total number of cores per job
    memory: '1 GB'           # Total amount of memory per job
    processes: 1             # Number of Python processes per job

    interface: null          # Network interface to use like eth0 or ib0
    death-timeout: 60        # Number of seconds to wait if a worker can not find a scheduler
    local-directory: null    # Location of fast local storage like /scratch or $TMPDIR

    # SGE resource manager options
    shebang: "#!/usr/bin/env bash"
    queue: null
    project: null
    walltime: '01:00:00'
    extra: []
    env-extra: []
    job-extra: []
    log-directory: null
    resource-spec: null

distributed:
  worker:
    memory:
      target: false     # don't spill to disk
      spill: false      # don't spill to disk
      pause: 0.80       # pause execution at 80% memory use
      terminate: 0.95   # restart the worker at 95% memory use
From the discussion above, the issue was considered a local HPC issue, and it's not clear what the solution was.
I'd be very grateful if anyone has any ideas for a potential solution to this issue.
Thanks, Luke
Can you edit your post to add the snippet you are using in your Jupyter notebook?
Also this may be useful to troubleshoot: https://jobqueue.dask.org/en/latest/debug.html.
Some suggestions based on the info you provided:
Thanks for your reply.
The relevant code in JupyterLab is below. I omitted the code for other Python library imports, reading in data, misc. setup, and function creation. I'm basically applying a function to a Dask bag.
from dask_jobqueue import SGECluster
from dask.distributed import Client
import dask.bag as db

cluster = SGECluster(walltime='03:00:00', interface='ib0')
client = Client(cluster)
cluster.scale(jobs=150)

bag = db.from_sequence(gridcells)  # gridcells: list of inputs, defined in the omitted setup code
bag = bag.map(emulator)            # emulator: the function being applied, defined in the omitted code
results = bag.compute()            # breaks here
- "previously working setup" has something changed ?
I previously ran jupyter lab on a login node and the code worked for the 150 workers and ~15,000 gridcells. I gathered running on the login node was bad practise, so I switched to running on an interactive session (using qsh/qlogin). This was the only change.
Update: I have reverted back to running on the login node and can confirm the code works, so the issue is specific to the interactive session on the compute node.
I gathered running on the login node was bad practise
I agree, and at the same time good and bad practice is not very well defined. Running the Dask scheduler should not be resource intensive, but if you do a lot of post-treatment in your notebook, that may be a problem.
About your problem with the interactive node, it is hard to know what is happening; maybe there are restrictions on the interactive node that are a problem for websockets (I am not an expert on this networking stuff). My guess is that your Jupyter kernel gets killed first because of the websocket problem, which means your Dask scheduler dies (it lives in your Jupyter kernel) and the workers cannot connect. Reading the error again, it seems to be related to Bokeh/the diagnostics dashboard (the port is 8787 by default). If you don't need the diagnostics dashboard, you can use extra=['--no-dashboard'] in SGECluster to disable it and see whether the error goes away.
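A minimal sketch of that suggestion, reusing the cluster call from the snippet above:

from dask_jobqueue import SGECluster

# Pass --no-dashboard through to the dask-worker processes to disable the
# diagnostics dashboards, as suggested above.
cluster = SGECluster(walltime='03:00:00', interface='ib0',
                     extra=['--no-dashboard'])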
Wow, you are really running 150 jobs, that's impressive. How many total cores does that correspond to, out of curiosity?
Thanks again for your help here.
Yes, the notebook uses 10-15 GB and 10-15% CPU of the 192 GB / 40-core login node (of which there are two).
Testing on the interactive node using the extra=['--no-dashboard'] argument, the code does work successfully, as you suspected. Although the dashboard is not essential for my work, it is really useful to have.
Yes, though each job only requests 1 GB and 1 core, so 150 GB and 150 cores in total. This way I get through the queue faster than if I requested whole node(s), though I don't know if this approach is relatively worse for other issues (e.g. worker communication?).
I am not sure how to debug this further, I have to admit, and it is not clear whether it is a JupyterLab problem (maybe a Dask JupyterLab extension, if you are using it) or a Dask problem ...
Here is what I would do if I were in your position, depending on your priorities:
- use extra=['--no-dashboard'], which seems the easiest option
- run on the login node and try to move some of your code that uses resources to Dask. The idea is to move the resource-intensive work you do in your notebook to Dask workers. Maybe you can use Dask collections further to do some of your post-treatment; maybe .persist could be useful to keep your memory-intensive data structures in your workers' memory rather than bringing them into the notebook.
- can you access the dashboard before the .scale, do you get the same if you have fewer workers, do you need to do some computation for the problem to happen, e.g. does it happen if you do client.wait_for_workers(150)? (a small sketch of this last check follows below)
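For the last point, a small sketch of what that isolation test could look like (reusing the cluster setup from the notebook snippet above):

from dask_jobqueue import SGECluster
from dask.distributed import Client

cluster = SGECluster(walltime='03:00:00', interface='ib0')
client = Client(cluster)

# Bring up the workers without submitting any computation: if the interactive
# session already dies here, the problem is independent of bag.compute().
cluster.scale(jobs=150)
client.wait_for_workers(150)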
Thanks again for your helpful suggestions.
I've asked cluster IT and I'm waiting for a response (this may take a while).
In the meantime, as you can see from the link above, I've posted this issue on Dask distributed in case anyone has any ideas.
Can you access the dashboard before the .scale
Yes, I can access the dashboard before and after .scale().
do you get the same if you have less workers
Not for a much smaller number (e.g. 5), but yes for numbers of workers above, say, 50.
do you need to do some computation for the problem to happen
Yes, the problem occurs when I call .compute().
if you are using the Dask JupyterLab extension
Yes, I'm using the Dask JupyterLab extension.
Thanks for your feedback. I suggest we move to https://github.com/dask/distributed/issues/3857 to discuss this issue, as there may be more people with useful feedback there.
So for completeness: after further investigation @lukeconibear's issue was that the interactive job was killed because it was using too many resources. In his SGE setup qacct -j <his_job_id>
was useful to troubleshoot. I am guessing that a similar thing exists for non-SGE clusters.
Hello, I ran into this situation just recently. The reason was that the nodes could not connect to each other: they do not reach each other over the default interface (or even the other interfaces), but only through an isolated DNS gateway. So I added an argument like the following:

import socket

scheduler_options = {"host": socket.gethostname()}
That works well, and the nanny process can start normally.
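In context that would look something like the following (shown with SGECluster since that is what this thread mostly uses, and with a placeholder walltime; scheduler_options is forwarded to the Dask scheduler by dask-jobqueue):

import socket

from dask_jobqueue import SGECluster
from dask.distributed import Client

# Bind the scheduler to this node's hostname rather than the default address,
# so that workers resolve it through the cluster's DNS.
cluster = SGECluster(walltime='01:00:00',
                     scheduler_options={"host": socket.gethostname()})
client = Client(cluster)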
I am attempting to run some very simple Dask code that uses SLURMCluster to submit jobs to a SLURM job scheduler. However, every job I submit throws the following error: asyncio.exceptions.TimeoutError: Nanny failed to start in 60 seconds. My cluster setup is as follows:
I confirmed the cluster I am on has ib0 and the nanny is attempting to connect to the ib0 IP address.
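One way to double-check which IPv4 address each interface carries on a node is a short psutil loop (a small sketch; psutil ships as a dependency of distributed):

import socket

import psutil

# Print the IPv4 address of every network interface on this node, to confirm
# which address a worker started with interface='ib0' will try to bind to.
for name, addrs in psutil.net_if_addrs().items():
    for addr in addrs:
        if addr.family == socket.AF_INET:
            print(name, addr.address)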
I have also confirmed that the job headers written by the cluster work, as a simple Python script with the same job headers runs fine when submitted as a job to sbatch. The output file from the job is as follows:
I can find no additional documentation of the potential causes of this error, so any help would be appreciated.
Relevant Packages: