dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed #3129

Open lastmeta opened 5 years ago

lastmeta commented 5 years ago

I have 2 machines: a worker machine and a scheduler machine.

The worker machine is centos 7 with python3.7 and dask-distributed 2.5.2.

The scheduler machine has a docker container running. The docker container has the same version of python and dask, and incidentally, it is also a centos 7 image.

I start the scheduler docker container with this docker-compose yaml:

    version: '3.7'
    services:
      service1:
        image: ...
        container_name: ...
        network_mode: bridge
        env_file:
          - ~/.env
        ports:
          - "8888:8888"
          - "9796:9796"
          - "9797:9797"
        volumes:
          ...
        command: jupyter lab --ip=0.0.0.0 --port=8888 --no-browser --allow-root

(Notice I'm mapping the two ports needed for a scheduler to run.)

When I start up a dask-worker on the worker box and a dask-scheduler in the docker container, everything seems to initiate correctly, except that after a little while I get this error:

[root@510b0c5af190 web]# my_project.py run distributed_workflow
Traceback (most recent call last):
  File "/conda/lib/python3.7/site-packages/distributed/comm/tcp.py", line 237, in write
    stream.write(b)
  File "/conda/lib/python3.7/site-packages/tornado/iostream.py", line 546, in write
    self._check_closed()
  File "/conda/lib/python3.7/site-packages/tornado/iostream.py", line 1009, in _check_closed
    raise StreamClosedError(real_error=self.error)
tornado.iostream.StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/conda/bin/sql_server", line 11, in <module>
    load_entry_point('sql-server', 'console_scripts', 'sql_server')()
  File "/conda/lib/python3.7/site-packages/click/core.py", line 764, in __call__
    return self.main(*args, **kwargs)
  File "/conda/lib/python3.7/site-packages/click/core.py", line 717, in main
    rv = self.invoke(ctx)
  File "/conda/lib/python3.7/site-packages/click/core.py", line 1137, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/conda/lib/python3.7/site-packages/click/core.py", line 956, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/conda/lib/python3.7/site-packages/click/core.py", line 555, in invoke
    return callback(*args, **kwargs)
  File "/app/sql_server/cli/sql_server.py", line 28, in run
    daily(True)
  File "/app/sql_server/cli/run/daily.py", line 166, in daily
    verbose=True,
  File "/wcf/spartans/spartans/spartans.py", line 116, in __enter__
    self.open()
  File "/wcf/spartans/spartans/spartans.py", line 123, in open
    self._start_agents()
  File "/wcf/spartans/spartans/spartans.py", line 179, in _start_agents
    set_as_default=False)
  File "/conda/lib/python3.7/site-packages/distributed/client.py", line 715, in __init__
    self.start(timeout=timeout)
  File "/conda/lib/python3.7/site-packages/distributed/client.py", line 880, in start
    sync(self.loop, self._start, **kwargs)
  File "/conda/lib/python3.7/site-packages/distributed/utils.py", line 333, in sync
    raise exc.with_traceback(tb)
  File "/conda/lib/python3.7/site-packages/distributed/utils.py", line 317, in f
    result[0] = yield future
  File "/conda/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/conda/lib/python3.7/site-packages/distributed/client.py", line 973, in _start
    await self._ensure_connected(timeout=timeout)
  File "/conda/lib/python3.7/site-packages/distributed/client.py", line 1040, in _ensure_connected
    {"op": "register-client", "client": self.id, "reply": False}
  File "/conda/lib/python3.7/site-packages/tornado/gen.py", line 748, in run
    yielded = self.gen.send(value)
  File "/conda/lib/python3.7/site-packages/distributed/comm/tcp.py", line 254, in write
    convert_stream_closed_error(self, e)
  File "/conda/lib/python3.7/site-packages/distributed/comm/tcp.py", line 132, in convert_stream_closed_error
    raise CommClosedError("in %s: %s" % (obj, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed

So I investigate the logs. The worker log looks like this:

(base) [worker@worker-03 tmp]$ cat worker_output.txt
distributed.nanny - INFO -         Start Nanny at: 'tcp://10.1.25.3:43111'
distributed.diskutils - INFO - Found stale lock file and directory '/home/worker/worker-h8jhtnng', purging
distributed.worker - INFO -       Start worker at:      tcp://10.1.25.3:42739
distributed.worker - INFO -          Listening to:      tcp://10.1.25.3:42739
distributed.worker - INFO -          dashboard at:            10.1.25.3:40970
distributed.worker - INFO - Waiting to connect to:   tcp://scheduler.myco.com:9796
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          4
distributed.worker - INFO -                Memory:                   33.53 GB
distributed.worker - INFO -       Local Directory: /home/worker/worker-mf3marrd
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:   tcp://scheduler.myco.com:9796
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection

and my scheduler log (inside my docker container, on the scheduler.myco.com machine) looks like this:

[root@510b0c5af190 web]# cat /tmp/worker_output.txt 
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Local Directory:    /tmp/scheduler-lq7oa5sc
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:     tcp://172.17.0.2:9796
distributed.scheduler - INFO -   dashboard at:                     :9797
distributed.scheduler - INFO - Register tcp://10.1.25.3:42739
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.1.25.3:42739
distributed.core - INFO - Starting established connection

Now, there are no errors in the logs. Indeed, I inspect the running processes and I see this:

Worker machine

worker  8974  1.6  0.1 394088 45044 ?        Sl   16:04   0:14 /data/anaconda3/bin/python /data/anaconda3/bin/dask-worker scheduler.myco.com:9796 --dashboard-prefix my_workflow

Scheduler container:

root        33  1.4  1.4 670884 115056 pts/0   Sl   16:04   0:15 /conda/bin/python /conda/bin/dask-scheduler --port 9796 --dashboard-address 172.17.0.2:9797 --dashboard-prefix my_workflow

Notice the 172.17.0.2 address is the address inside the scheduler container. If I instead try to start the scheduler with the host machine's hostname as its address, I get the error [Errno 99] Cannot assign requested address, presumably because port 9797 is already taken by the docker container.

Anyway, these processes are still running, yet as far as I can tell they're not working on the workflow I tried to pass to the worker. Can you please help me understand what I'm doing wrong to produce the error above?

TomAugspurger commented 5 years ago

What's in distributed_workflow? When does the error occur: while something is happening, when the cluster is idle, or at the end of the script?

lastmeta commented 5 years ago

It seems to error on or during this call:

def distributed_workflow():
    ...
    # workflow is a dict with this structure: {'task_name': (function, *arguments)}
    finished = client.get(workflow, 'workflow_end_task')

It seems to send a partial workflow or something, then errors. I know this because, as it begins running, I look at the Dask dashboard's graph view and see only part of the workflow before it errors.

It's as if the communication fails while the workflow is being distributed to the workers, but as you can see above, the processes seem to be connected. Anyway, it happens during that client.get call.
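
For context, the graph I pass to client.get is shaped roughly like this minimal, self-contained sketch (the task and function names here are made up; the scheduler address is the one from my logs):

from dask.distributed import Client

def load():
    return 1

def process(x):
    return x + 1

# Each graph value is a tuple of (callable, *arguments); arguments may be
# keys of other tasks, which is how the dependencies are expressed.
workflow = {
    "load_task": (load,),
    "workflow_end_task": (process, "load_task"),
}

client = Client("tcp://scheduler.myco.com:9796")
finished = client.get(workflow, "workflow_end_task")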

Is there something I don't understand about working with Docker? I've been able to get this process to work fine on the bare machines. But when I introduce docker this error happens.

TomAugspurger commented 5 years ago

I'm not too familiar with Docker's networking, so I don't know.

If I were you, I would try gradually simplifying your script to make it easier to see where things break.

alobbs commented 4 years ago

I'm hitting the same problem running Dask outside a container. I do not think it's related to Docker or the container configuration.

TomAugspurger commented 4 years ago

@alobbs thanks for the info, that's helpful. Are you able to share a reproducible example?

alobbs commented 4 years ago

@TomAugspurger, I'm afraid I don't recall what I was doing, although I can tell you it was a one-time issue. My gut feeling is that it was because I had clients running that had been waiting for workers to become available, and I was meddling with them (shutting dask-worker down and relaunching it on different servers).

djarecka commented 4 years ago

I'm having a similar issue outside containers:

tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x1096bbd10>>, <Task finished coro=<Worker.heartbeat() done, defined at /Users/dorota/miniconda3/envs/pydra_dask_py37/lib/python3.7/site-packages/distributed/worker.py:881> exception=CommClosedError('in <closed TCP>: Stream is closed')>)
Traceback (most recent call last):
  File "/Users/dorota/miniconda3/envs/pydra_dask_py37/lib/python3.7/site-packages/distributed/comm/tcp.py", line 188, in read
    n_frames = await stream.read_bytes(8)
tornado.iostream.StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/dorota/miniconda3/envs/pydra_dask_py37/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
  File "/Users/dorota/miniconda3/envs/pydra_dask_py37/lib/python3.7/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
    future.result()
  File "/Users/dorota/miniconda3/envs/pydra_dask_py37/lib/python3.7/site-packages/distributed/worker.py", line 918, in heartbeat
    raise e
  File "/Users/dorota/miniconda3/envs/pydra_dask_py37/lib/python3.7/site-packages/distributed/worker.py", line 891, in heartbeat
    metrics=await self.get_metrics(),
  File "/Users/dorota/miniconda3/envs/pydra_dask_py37/lib/python3.7/site-packages/distributed/utils_comm.py", line 391, in retry_operation
    operation=operation,
  File "/Users/dorota/miniconda3/envs/pydra_dask_py37/lib/python3.7/site-packages/distributed/utils_comm.py", line 379, in retry
    return await coro()
  File "/Users/dorota/miniconda3/envs/pydra_dask_py37/lib/python3.7/site-packages/distributed/core.py", line 757, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/Users/dorota/miniconda3/envs/pydra_dask_py37/lib/python3.7/site-packages/distributed/core.py", line 540, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/Users/dorota/miniconda3/envs/pydra_dask_py37/lib/python3.7/site-packages/distributed/comm/tcp.py", line 208, in read
    convert_stream_closed_error(self, e)
  File "/Users/dorota/miniconda3/envs/pydra_dask_py37/lib/python3.7/site-packages/distributed/comm/tcp.py", line 123, in convert_stream_closed_error
    raise CommClosedError("in %s: %s" % (obj, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed

I'm running a test suite with pytest and it happens "randomly". I was not able to reproduce it by running a single test, and my tests are actually reported as passed...

OSX, py3.7, dask 2.10.1, distributed 2.10.0

medley56 commented 4 years ago

Another vote for addressing this issue. As others have said, it's inconsistent and difficult to reproduce, but it's definitely there. We are currently experimenting with the number of workers and the memory allocation for each worker, as that has fixed similar issues in the past.

dask==2.9.2
distributed==2.6.0

I've done a bit of experimentation and it looks like the problem goes away when I set the memory limit per worker and the number of workers to something other than their default values. I have absolutely no idea why this fixes it.
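
For reference, explicitly pinning those values looks roughly like this with a local cluster (the numbers are arbitrary, not a recommendation):

from dask.distributed import Client, LocalCluster

# Set the worker count and the per-worker memory limit explicitly
# instead of relying on the defaults.
cluster = LocalCluster(n_workers=4, threads_per_worker=1, memory_limit="4GB")
client = Client(cluster)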

jrbourbeau commented 4 years ago

I'd encourage those running into this issue to

1) Try upgrading to the latest release of distributed to see if the problem still persists. There were recent updates to how CommClosedErrors are handled when raised while workers are heartbeating. It's not clear that this will totally resolve the issue, but the traceback in https://github.com/dask/distributed/issues/3129#issuecomment-589235528 indicates that a CommClosedError is raised during a worker heartbeat.

2) If possible, provide a minimal reproducible example. This drastically increases the chance a maintainer will be able to debug the issue.
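
A bare-bones skeleton along these lines (entirely hypothetical) is usually enough to start from when building such an example:

from dask.distributed import Client, LocalCluster

def work(x):
    return x + 1

if __name__ == "__main__":
    # The smallest setup that still shows the problem: a local cluster,
    # a handful of trivial tasks, plus the dask/distributed versions used.
    with LocalCluster(n_workers=2, threads_per_worker=1) as cluster:
        with Client(cluster) as client:
            print(client.gather(client.map(work, range(10))))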

MathewHouse commented 4 years ago

Hi,

I ran into the same issue with a graph I created using Dask Futures. It turns out that it doesn't affect the result in my case, and I highly suspect it happens when a task is writing a file (a .csv in my case) and takes longer than a predefined timeout (I checked the Dask params and comm.timeouts.connect is indeed set to 10s, but I'm not convinced it's that one, as I tried setting it to 30s and it continued crashing). Anyway, my guess is that during a long task the worker can't communicate, so the scheduler sees it as dead and breaks the connection. I found a workaround by changing the number of retries in comm.retry.count (used in distributed.utils_comm) to 2. No more connection issues. Again, I'm sure it depends on your code and how long a task runs.

Hope it helps.

python 3.6.9 dask 2.9.1 distributed 2.9.1 tornado 6.0.3

PS: maybe that has been fixed by a more recent version of distributed, but my company still runs w 2.9.1
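
For reference, that retry count can be raised through the Dask config, along these lines:

import dask

# Retry failed comm operations a couple of times instead of failing
# immediately (this value is read by distributed's retry helper).
dask.config.set({"distributed.comm.retry.count": 2})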

mrocklin commented 4 years ago

Tasks can take a long time (several hours is common) without running into the timeout (unless your task holds the GIL, which is uncommon unless you're wrapping custom C code). Regardless, upgrading would be good.

navjotk commented 4 years ago

I too am facing this issue and it won't go away with dask/distributed 2.20

TomAugspurger commented 4 years ago

This error can happen for many reasons. Are you able to provide a minimal example?

twoertwein commented 3 years ago

I created a reproducible example. It isn't minimal, but it is fairly compact. Based on the stack trace, the error seems to take a different code path than what previous people have reported.

The first run is usually successful, but executing it a second time immediately afterwards leads to the error. I have no Dask config files.

I cannot reproduce it with pure=False, and I also cannot reproduce it when setting n_workers=2 with no adapt().

import random
import string
from pathlib import Path

import dask
from dask.distributed import Client, LocalCluster

def dummy(*args, **kwargs):
    import time

    time.sleep(10)
    return True

def test():
    delayed_function = dask.delayed(dummy, pure=True)
    targets = [
        delayed_function(delayed_function),
        delayed_function(delayed_function),
        delayed_function(delayed_function, delayed_function),
    ]

    random_suffix = "".join(random.choices(string.ascii_letters + string.digits, k=10))
    with LocalCluster(
        local_directory=Path(".") / f"dask_{random_suffix}",
        threads_per_worker=1,
        processes=True,
        n_workers=1,
    ) as cluster:
        cluster.adapt(minimum=1, maximum=2)
        with Client(cluster) as client:
            print(dask.compute(*targets, scheduler=client))

if __name__ == "__main__":
    test()

Typical output

$ python test_tcp.py
(True, True, True)
Task was destroyed but it is pending!
task: <Task pending name='Task-152' coro=<AdaptiveCore.adapt() done, defined at /pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/distributed/deploy/adaptive_core.py:179> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f7c6f4a75b0>()]> cb=[IOLoop.add_future.<locals>.<lambda>() at /pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/tornado/ioloop.py:688]>

$ python test_tcp.py
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7f475e2d68e0>>, <Task finished name='Task-65' coro=<Worker.close() done, defined at /pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/distributed/worker.py:1169> exception=CommClosedError('in <closed TCP>: Stream is closed')>)
Traceback (most recent call last):
  File "/pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/distributed/comm/tcp.py", line 187, in read
    n_frames = await stream.read_bytes(8)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "/pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/distributed/worker.py", line 1193, in close
    await r.close_gracefully()
  File "/pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/distributed/core.py", line 858, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/distributed/core.py", line 641, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/distributed/comm/tcp.py", line 202, in read
    convert_stream_closed_error(self, e)
  File "/pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/distributed/comm/tcp.py", line 126, in convert_stream_closed_error
    raise CommClosedError("in %s: %s" % (obj, exc)) from exc
distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
(True, True, True)
Task was destroyed but it is pending!
task: <Task pending name='Task-152' coro=<AdaptiveCore.adapt() done, defined at /pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/distributed/deploy/adaptive_core.py:179> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f0c8548f070>()]> cb=[IOLoop.add_future.<locals>.<lambda>() at /pool01/home/twoertwe/.cache/pypoetry/virtualenvs/panam-zGchuJUy-py3.8/lib/python3.8/site-packages/tornado/ioloop.py:688]>

edit: python 3.8.6 dask 2020.12.0 distributed 2021.01.1 tornado 6.1

twoertwein commented 3 years ago

I assume that my example is also reproducible without adapt(). I'm not using adapt() in my real code anymore and I'm occasionally still getting the above error. If I get the error, I always seem to get it at the beginning. Maybe it is related to starting new workers?

mb-qco commented 3 years ago

I appear to have a similar issue when running workers as containers in an EKS cluster and using the distributed client on a KubeCluster instance.

This is my error when executing a computation on the workers (cluster.scale() and so on works fine):

tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <zmq.eventloop.ioloop.ZMQIOLoop object at 0x7f28c6a4e190>>, <Task finished name='Task-13182' coro=<Cluster._sync_cluster_info() done, defined at /opt/miniconda3/envs/mattia_env/lib/python3.8/site-packages/distributed/deploy/cluster.py:98> exception=CommClosedError("Exception while trying to call remote method 'set_metadata' before comm was established.")>)
Traceback (most recent call last):
  File "/opt/miniconda3/envs/mattia_env/lib/python3.8/site-packages/distributed/comm/tcp.py", line 198, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/miniconda3/envs/mattia_env/lib/python3.8/site-packages/distributed/core.py", line 798, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/opt/miniconda3/envs/mattia_env/lib/python3.8/site-packages/distributed/core.py", line 651, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/opt/miniconda3/envs/mattia_env/lib/python3.8/site-packages/distributed/comm/tcp.py", line 214, in read
    convert_stream_closed_error(self, e)
  File "/opt/miniconda3/envs/mattia_env/lib/python3.8/site-packages/distributed/comm/tcp.py", line 128, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) rpc.set_metadata local=tcp://172.60.3.87:39294 remote=tcp://internal-*.region.elb.amazonaws.com:8786>: Stream is closed

Note that it is trying to do rpc.set_metadata local=tcp://<uri-of-load-balancer>. This doesn't make much sense to me since I am using a remote cluster.

JaccoVE commented 1 year ago

Any news on this topic? I'm still having this problem with Dask 2023.3.2 and Distributed 2023.3.2.

fjetter commented 1 year ago

@JaccoVE can you please share more information about your problem? Ideally, please open a new issue, since there may be many different reasons why such a connection failure occurs, and it is helpful for us to keep things separated until we have diagnosed the issue.

JaccoVE commented 1 year ago

For your information, the specific script I'm trying to run is the following:

https://github.com/ELELAB/RosettaDDGPrediction/blob/master/RosettaDDGPrediction/rosetta_ddg_run.py

I opened a new issue on this Github repository as well.

surak commented 1 year ago

import os
import sys
from dask_mpi import initialize
from dask.distributed import Client

import joblib

if len(sys.argv) != 2:
    print("Usage ", sys.argv[0]," <N>")
    sys.exit()
else:
    N = int(sys.argv[1])

def f(x):
    return x*x

if __name__ == '__main__':

    # Query SLURM environment per worker task
    p = int(os.getenv('SLURM_CPUS_PER_TASK'))
    mem = "1024" # os.getenv('SLURM_MEM_PER_CPU')
    mem = str(int(mem)*p)+'MB'

    # Initialise Dask cluster and client interface
    initialize(interface = 'enp130s0f0', nthreads=p, local_directory='/tmp', memory_limit=mem)
    client = Client()

    # We expect SLURM_NTASKS-2 workers
    N = int(os.getenv('SLURM_NTASKS'))-2
    print("********************* I HAVE N as ", N)

    # Wait for these workers and report
    client.wait_for_workers(n_workers=N)

    num_workers = len(client.scheduler_info()['workers'])
    print("%d workers available and ready"%num_workers)

    # Create a list of inputs to the function f
    inputs = range(N)

    # Associate a list of outputs with delayed calls to f
    # with p processes available to evaluate them.
    with joblib.parallel_backend('dask'):
        output_list = joblib.Parallel(n_jobs=N)(joblib.delayed(f)(i) for i in inputs)

    # Printing the outputs will cause then to be evaluated
    for output in output_list:
        print(output)

    import time
    time.sleep(3)
    client.shutdown()

and

#!/bin/bash
#SBATCH --nodes=2
#SBATCH --ntasks-per-node=2
#SBATCH --cpus-per-task=1
#SBATCH --time=08:00:00
#SBATCH --partition=gpus

module purge
module load GCC OpenMPI
module load SciPy-bundle
module load dask

srun python dask_joblib.py 2

The output:
2023-07-26 17:16:29,931 - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
2023-07-26 17:16:29,971 - distributed.scheduler - INFO - State start
2023-07-26 17:16:29,974 - distributed.scheduler - INFO -   Scheduler at:   tcp://134.94.1.44:33637
2023-07-26 17:16:29,974 - distributed.scheduler - INFO -   dashboard at:          134.94.1.44:8787
2023-07-26 17:16:29,998 - distributed.worker - INFO -       Start worker at:    tcp://134.94.1.45:37897
2023-07-26 17:16:29,998 - distributed.worker - INFO -          Listening to:    tcp://134.94.1.45:37897
2023-07-26 17:16:29,998 - distributed.worker - INFO -       Start worker at:    tcp://134.94.1.45:42577
2023-07-26 17:16:29,998 - distributed.worker - INFO -          Listening to:    tcp://134.94.1.45:42577
2023-07-26 17:16:29,998 - distributed.worker - INFO -           Worker name:                          2
2023-07-26 17:16:29,998 - distributed.worker - INFO -           Worker name:                          3
2023-07-26 17:16:29,998 - distributed.worker - INFO -          dashboard at:          134.94.1.45:44905
2023-07-26 17:16:29,998 - distributed.worker - INFO -          dashboard at:          134.94.1.45:37293
2023-07-26 17:16:29,998 - distributed.worker - INFO - Waiting to connect to:    tcp://134.94.1.44:33637
2023-07-26 17:16:29,998 - distributed.worker - INFO - Waiting to connect to:    tcp://134.94.1.44:33637
2023-07-26 17:16:29,998 - distributed.worker - INFO - -------------------------------------------------
2023-07-26 17:16:29,998 - distributed.worker - INFO - -------------------------------------------------
2023-07-26 17:16:29,998 - distributed.worker - INFO -               Threads:                          1
2023-07-26 17:16:29,998 - distributed.worker - INFO -               Threads:                          1
2023-07-26 17:16:29,998 - distributed.worker - INFO -                Memory:                   0.95 GiB
2023-07-26 17:16:29,998 - distributed.worker - INFO -                Memory:                   0.95 GiB
2023-07-26 17:16:29,998 - distributed.worker - INFO -       Local Directory: /tmp/dask-worker-space/worker-bvjd5gv_
2023-07-26 17:16:29,998 - distributed.worker - INFO -       Local Directory: /tmp/dask-worker-space/worker-gfcapga9
2023-07-26 17:16:29,998 - distributed.worker - INFO - -------------------------------------------------
2023-07-26 17:16:29,998 - distributed.worker - INFO - -------------------------------------------------
2023-07-26 17:16:30,130 - distributed.scheduler - INFO - Receive client connection: Client-293db022-2bd8-11ee-90dc-a0369f2148d4
2023-07-26 17:16:30,133 - distributed.core - INFO - Starting established connection
2023-07-26 17:16:30,199 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://134.94.1.45:37897', name: 2, status: init, memory: 0, processing: 0>
2023-07-26 17:16:30,200 - distributed.scheduler - INFO - Starting worker compute stream, tcp://134.94.1.45:37897
2023-07-26 17:16:30,200 - distributed.core - INFO - Starting established connection
2023-07-26 17:16:30,201 - distributed.worker - INFO -         Registered to:    tcp://134.94.1.44:33637
2023-07-26 17:16:30,201 - distributed.worker - INFO - -------------------------------------------------
2023-07-26 17:16:30,200 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://134.94.1.45:42577', name: 3, status: init, memory: 0, processing: 0>
2023-07-26 17:16:30,202 - distributed.core - INFO - Starting established connection
2023-07-26 17:16:30,201 - distributed.scheduler - INFO - Starting worker compute stream, tcp://134.94.1.45:42577
2023-07-26 17:16:30,201 - distributed.core - INFO - Starting established connection
2023-07-26 17:16:30,202 - distributed.worker - INFO -         Registered to:    tcp://134.94.1.44:33637
2023-07-26 17:16:30,202 - distributed.worker - INFO - -------------------------------------------------
2023-07-26 17:16:30,203 - distributed.core - INFO - Starting established connection
2023-07-26 17:16:30,304 - distributed.scheduler - INFO - Receive client connection: Client-worker-296bf4c3-2bd8-11ee-a619-a0369f214ab8
2023-07-26 17:16:30,304 - distributed.core - INFO - Starting established connection
2023-07-26 17:16:30,305 - distributed.scheduler - INFO - Receive client connection: Client-worker-296bf4c2-2bd8-11ee-a61a-a0369f214ab8
2023-07-26 17:16:30,305 - distributed.core - INFO - Starting established connection
2023-07-26 17:16:33,326 - distributed.scheduler - INFO - Scheduler closing...
2023-07-26 17:16:33,327 - distributed.scheduler - INFO - Scheduler closing all comms
2023-07-26 17:16:33,327 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://134.94.1.45:37897', name: 2, status: running, memory: 0, processing: 0>
2023-07-26 17:16:33,327 - distributed.core - INFO - Removing comms to tcp://134.94.1.45:37897
2023-07-26 17:16:33,328 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://134.94.1.45:42577', name: 3, status: running, memory: 0, processing: 0>
2023-07-26 17:16:33,328 - distributed.worker - INFO - Stopping worker at tcp://134.94.1.45:37897
2023-07-26 17:16:33,328 - distributed.core - INFO - Removing comms to tcp://134.94.1.45:42577
2023-07-26 17:16:33,328 - distributed.worker - INFO - Stopping worker at tcp://134.94.1.45:42577
2023-07-26 17:16:33,328 - distributed.scheduler - INFO - Lost all workers
2023-07-26 17:16:33,329 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting. ID: Worker-be4f2df9-ba89-4bfc-89b0-e0ce96691790 Address tcp://134.94.1.45:37897 Status: Status.closing
2023-07-26 17:16:33,329 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting. ID: Worker-209b4e96-5b34-4034-99c7-b155bef7fd42 Address tcp://134.94.1.45:42577 Status: Status.closing
********************* I HAVE N as  2
2 workers available and ready
0
1
2023-07-26 17:16:33,329 - distributed.batched - INFO - Batched Comm Closed <TCP (closed) Worker->Scheduler local=tcp://134.94.1.45:40520 remote=tcp://134.94.1.44:33637>
Traceback (most recent call last):
  File "/easybuild/2020/software/dask/2022.10.0-foss-2022a/lib/python3.10/site-packages/distributed/batched.py", line 115, in _background_send
    nbytes = yield coro
  File "/easybuild/2020/software/bokeh/2.4.3-foss-2022a/lib/python3.10/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/easybuild/2020/software/dask/2022.10.0-foss-2022a/lib/python3.10/site-packages/distributed/comm/tcp.py", line 269, in write
    raise CommClosedError()
distributed.comm.core.CommClosedError
2023-07-26 17:16:33,329 - distributed.batched - INFO - Batched Comm Closed <TCP (closed) Worker->Scheduler local=tcp://134.94.1.45:40522 remote=tcp://134.94.1.44:33637>
Traceback (most recent call last):
  File "/easybuild/2020/software/dask/2022.10.0-foss-2022a/lib/python3.10/site-packages/distributed/batched.py", line 115, in _background_send
    nbytes = yield coro
  File "/easybuild/2020/software/bokeh/2.4.3-foss-2022a/lib/python3.10/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/easybuild/2020/software/dask/2022.10.0-foss-2022a/lib/python3.10/site-packages/distributed/comm/tcp.py", line 269, in write
    raise CommClosedError()
distributed.comm.core.CommClosedError
2023-07-26 17:16:33,331 - distributed.batched - INFO - Batched Comm Closed <TCP (closed) Client->Scheduler local=tcp://134.94.1.45:40526 remote=tcp://134.94.1.44:33637>
Traceback (most recent call last):
  File "/easybuild/2020/software/dask/2022.10.0-foss-2022a/lib/python3.10/site-packages/distributed/batched.py", line 115, in _background_send
    nbytes = yield coro
  File "/easybuild/2020/software/bokeh/2.4.3-foss-2022a/lib/python3.10/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/easybuild/2020/software/dask/2022.10.0-foss-2022a/lib/python3.10/site-packages/distributed/comm/tcp.py", line 269, in write
    raise CommClosedError()
distributed.comm.core.CommClosedError
2023-07-26 17:16:33,331 - distributed.batched - INFO - Batched Comm Closed <TCP (closed) Client->Scheduler local=tcp://134.94.1.45:40524 remote=tcp://134.94.1.44:33637>
Traceback (most recent call last):
  File "/easybuild/2020/software/dask/2022.10.0-foss-2022a/lib/python3.10/site-packages/distributed/comm/tcp.py", line 317, in write
    raise StreamClosedError()
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/easybuild/2020/software/dask/2022.10.0-foss-2022a/lib/python3.10/site-packages/distributed/batched.py", line 115, in _background_send
    nbytes = yield coro
  File "/easybuild/2020/software/bokeh/2.4.3-foss-2022a/lib/python3.10/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/easybuild/2020/software/dask/2022.10.0-foss-2022a/lib/python3.10/site-packages/distributed/comm/tcp.py", line 328, in write
    convert_stream_closed_error(self, e)
  File "/easybuild/2020/software/dask/2022.10.0-foss-2022a/lib/python3.10/site-packages/distributed/comm/tcp.py", line 144, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) Client->Scheduler local=tcp://134.94.1.45:40524 remote=tcp://134.94.1.44:33637>: Stream is closed

surak commented 1 year ago

This also does not work:

https://support.nesi.org.nz/hc/en-gb/articles/360001392636-Configuring-Dask-MPI-jobs

surak commented 1 year ago

I'm having difficulty getting a working example of Dask running.

46319943 commented 1 year ago

Hi,

I ran into the same issue with a graph I created using Dask Futures. It turns out that it doesn't affect the result in my case, and I highly suspect it happens when a task is writing a file (a .csv in my case) and takes longer than a predefined timeout (I checked the Dask params and comm.timeouts.connect is indeed set to 10s, but I'm not convinced it's that one, as I tried setting it to 30s and it continued crashing). Anyway, my guess is that during a long task the worker can't communicate, so the scheduler sees it as dead and breaks the connection. I found a workaround by changing the number of retries in comm.retry.count (used in distributed.utils_comm) to 2. No more connection issues. Again, I'm sure it depends on your code and how long a task runs.

Hope it helps.

python 3.6.9 dask 2.9.1 distributed 2.9.1 tornado 6.0.3

PS: maybe that has been fixed by a more recent version of distributed, but my company still runs w 2.9.1

That solves my problem. After setting both distributed.comm.retry.count and distributed.comm.timeouts.connect to larger values, the error disappears.

dask.config.set({"distributed.comm.retry.count": 10})
dask.config.set({"distributed.comm.timeouts.connect": 30})

In my case, it seems the (SSD) disk has very high response times when data falls out of cache, which slows down the worker, lengthens its response times, and causes the communication problem.

I'm trying to calculate a huge distance matrix, about 5000*5000, which causes frequent writing to disk to store out-of-memory data. If I switch to a smaller matrix calculation, which requires little or no disk writing, the error disappears. So the above guess seems very reasonable to me.

Update:

The error still shows up with a larger distance matrix of 100000*100000. But this time the error has a different cause: the nanny terminates the worker because its memory use exceeds 95%.

After configuring Dask not to terminate workers, the error disappears.

dask.config.set({"distributed.worker.memory.terminate": False})

Ref: https://stackoverflow.com/questions/57997463/dask-warning-worker-exceeded-95-memory-budget
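
For completeness, the related worker-memory thresholds can also be tuned instead of only disabling termination; a sketch with illustrative fractions:

import dask

dask.config.set({
    "distributed.worker.memory.target": 0.80,     # start spilling to disk
    "distributed.worker.memory.spill": 0.90,      # spill more aggressively
    "distributed.worker.memory.pause": 0.95,      # stop accepting new tasks
    "distributed.worker.memory.terminate": False, # never kill the worker
})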

Material-Scientist commented 10 months ago

I'm also seeing this error happen frequently during I/O.

2024-01-10 09:15:58,926 - distributed.worker - WARNING - Compute Failed
Key:       read_csv-baf3bbcd-36c3-4895-8e91-22aa2a0403ea
Function:  read_csv
args:      (PosixPath('/root/SWFS/codebase/batch-stream-processing/nbs/../../../../remote/tardis/Tardis-BTCUSDT/2023-03/2023-03-20.csv'))
kwargs:    {'chunksize': 200000}
Exception: "OSError(5, 'Input/output error')"

2024-01-10 08:15:20,969 - distributed.worker - ERROR - Worker stream died during communication: tcp://127.0.0.1:44885
Traceback (most recent call last):
  File "/root/miniconda3/lib/python3.11/site-packages/tornado/iostream.py", line 861, in _read_to_buffer
    bytes_read = self.read_from_fd(buf)
                 ^^^^^^^^^^^^^^^^^^^^^^
  File "/root/miniconda3/lib/python3.11/site-packages/tornado/iostream.py", line 1116, in read_from_fd
    return self.socket.recv_into(buf, len(buf))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ConnectionResetError: [Errno 104] Connection reset by peer

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/root/miniconda3/lib/python3.11/site-packages/distributed/worker.py", line 2073, in gather_dep
    response = await get_data_from_worker(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/miniconda3/lib/python3.11/site-packages/distributed/worker.py", line 2866, in get_data_from_worker
    response = await send_recv(
               ^^^^^^^^^^^^^^^^
  File "/root/miniconda3/lib/python3.11/site-packages/distributed/core.py", line 1115, in send_recv
    response = await comm.read(deserializers=deserializers)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/miniconda3/lib/python3.11/site-packages/distributed/comm/tcp.py", line 240, in read
    convert_stream_closed_error(self, e)
  File "/root/miniconda3/lib/python3.11/site-packages/distributed/comm/tcp.py", line 141, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) Ephemeral Worker->Worker for gather local=tcp://127.0.0.1:43168 remote=tcp://127.0.0.1:44885>: ConnectionResetError: [Errno 104] Connection reset by peer

However, because I've set a few retries on each future, the code still runs eventually.
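
For reference, the retries are requested per task, roughly like this (read_one and paths are placeholders for my actual reader and file list):

from dask.distributed import Client

client = Client()  # assumes an already-running cluster

def read_one(path):
    return path  # placeholder for the real CSV-reading work

paths = ["a.csv", "b.csv"]

# Ask the scheduler to re-run a task up to 3 times if it fails,
# which papers over transient I/O or connection errors.
futures = client.map(read_one, paths, retries=3)
results = client.gather(futures)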

I'll check if setting comm retries & connection timeouts helps, as suggested by @46319943.

renjthmails commented 3 months ago

I am also facing the same CommClosedError with my setup in an Azure Kubernetes environment, whereas the same Dask code works in a Databricks setup.

arkanoid87 commented 4 weeks ago

Same issue here. Same packages on both machines (no version warnings when running dask worker), no Docker involved (running in a local pyenv environment). Both machines share the same Python version and a requirements.txt with pinned versions.

dask==2024.10.0

The two machines are connected to the same switch, and iperf -p 10 [-R] shows a solid ~985 Mbps total in both directions. I can only run workers on the same host as the scheduler; otherwise I get:

distributed.comm.core.CommClosedError: in <distributed.comm.tcp.TCPConnector object at 0x70632dda8490>: ConnectionRefusedError: [Errno 111] Connection refused

on the "remote" (~30cm) worker

UPDATE: I tried downgrading dask from 2024.10 to 2023.12.1 but I keep getting the same errors.

UPDATE 2: Solved. My issue was related to the outbound interface IP communicated between workers. I replaced all occurrences of 127.0.0.1 (on the host running the scheduler and the first worker) with the eth interface IP. Now I get no timeouts.
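
Roughly, the fix amounts to binding the scheduler to the LAN interface address instead of 127.0.0.1; a sketch (192.168.1.10 is a placeholder for the host's eth IP):

from dask.distributed import Client, LocalCluster

# Listen on the LAN address so workers on other machines can connect;
# remote workers are then pointed at tcp://192.168.1.10:8786.
cluster = LocalCluster(host="192.168.1.10", scheduler_port=8786, n_workers=1)
client = Client(cluster)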