dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

distributed.comm.core.CommClosedError loop failing to Terminate worker pods #3488

Open scottyhq opened 4 years ago

scottyhq commented 4 years ago

We recently encountered an issue on binderhub where a dask pod failed to terminate, resulting in a node running for hours:

[ec2-user@ip-192-168-60-131 ~]$ kubectl get pods -A
NAMESPACE        NAME                                                            READY   STATUS      RESTARTS   AGE
binder-staging   autohttps-77465ddf6f-8mgcn                                      2/2     Running     0          9d
binder-staging   binder-58b4fbccd5-7r2qz                                         1/1     Running     0          9d
binder-staging   binder-staging-dind-fppcs                                       1/1     Running     0          9d
binder-staging   binder-staging-image-cleaner-lmg2n                              2/2     Running     0          9d
binder-staging   binder-staging-kube-lego-84c9c589cf-zwjzc                       1/1     Running     0          9d
binder-staging   binder-staging-nginx-ingress-controller-84464555db-psrdm        1/1     Running     0          9d
binder-staging   binder-staging-nginx-ingress-controller-84464555db-st8np        1/1     Running     0          9d
binder-staging   binder-staging-nginx-ingress-default-backend-6cd8b44c86-scn29   1/1     Running     0          9d
binder-staging   dask-cgentemann-osm2020tutorial-9fm90iqn-d58bc39d-a2v8wz        0/1     Completed   0          3h24m
binder-staging   dask-cgentemann-osm2020tutorial-9fm90iqn-d58bc39d-a6sfbh        1/1     Running     0          3h24m
binder-staging   dask-cgentemann-osm2020tutorial-9fm90iqn-d58bc39d-a8s7x5        0/1     Completed   0          3h24m
binder-staging   dask-cgentemann-osm2020tutorial-9fm90iqn-d58bc39d-a9csxq        0/1     Completed   0          3h24m
binder-staging   dask-cgentemann-osm2020tutorial-9fm90iqn-d58bc39d-ajsm8h        0/1     Completed   0          3h24m
binder-staging   dask-cgentemann-osm2020tutorial-9fm90iqn-d58bc39d-aldrmw        0/1     Completed   0          3h24m
binder-staging   dask-cgentemann-osm2020tutorial-9fm90iqn-d58bc39d-avv8qw        0/1     Completed   0          3h24m
binder-staging   hub-57b965856b-9nfc8                                            1/1     Running     3          9d
binder-staging   proxy-67f46bb5d-9khxh                                           1/1     Running     0          9d
binder-staging   user-scheduler-6589468f65-5ntcq                                 1/1     Running     0          9d
binder-staging   user-scheduler-6589468f65-ml4dt                                 1/1     Running     0          9d
kube-system      aws-node-mv8p4                                                  1/1     Running     0          9d
kube-system      aws-node-w2hg5                                                  1/1     Running     0          3h23m
kube-system      cluster-autoscaler-78fb96cfd5-hp2pd                             1/1     Running     0          9d
kube-system      coredns-74d48d5d5b-fgnk5                                        1/1     Running     0          9d
kube-system      coredns-74d48d5d5b-gqqlr                                        1/1     Running     0          9d
kube-system      kube-proxy-k4nfj                                                1/1     Running     0          3h23m
kube-system      kube-proxy-sh5dl                                                1/1     Running     0          9d

The pod listed as Running had a log showing an infinite loop of CommClosedError exceptions:

Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/ioloop.py", line 907, in _run
    return self.callback()
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/worker.py", line 867, in <lambda>
    lambda: self.batched_stream.send({"op": "keep-alive"}),
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/batched.py", line 117, in send
    raise CommClosedError
distributed.comm.core.CommClosedError
tornado.application - ERROR - Exception in callback <function Worker._register_with_scheduler.<locals>.<lambda> at 0x7f972cbad7a0>
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/ioloop.py", line 907, in _run
    return self.callback()
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/worker.py", line 867, in <lambda>
    lambda: self.batched_stream.send({"op": "keep-alive"}),
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/batched.py", line 117, in send
    raise CommClosedError
distributed.comm.core.CommClosedError

And the pods listed as Completed had the following traceback in their logs:

Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/worker.py", line 1941, in gather_dep
    self.rpc, deps, worker, who=self.address
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/worker.py", line 3195, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/utils_comm.py", line 391, in retry_operation
    operation=operation,
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/utils_comm.py", line 379, in retry
    return await coro()
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/worker.py", line 3182, in _get_data
    max_connections=max_connections,
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/core.py", line 540, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/comm/tcp.py", line 208, in read
    convert_stream_closed_error(self, e)
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/comm/tcp.py", line 121, in convert_stream_closed_error
    raise CommClosedError("in %s: %s: %s" % (obj, exc.__class__.__name__, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: ConnectionResetError: [Errno 104] Connection reset by peer
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7fe06bd55590>>, <Task finished coro=<Worker.heartbeat() done, defined at /srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/worker.py:881> exception=CommClosedError('in <closed TCP>: ConnectionResetError: [Errno 104] Connection reset by peer')>)
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/comm/tcp.py", line 188, in read
    n_frames = await stream.read_bytes(8)
tornado.iostream.StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
    future.result()
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/worker.py", line 918, in heartbeat
    raise e
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/worker.py", line 891, in heartbeat
    metrics=await self.get_metrics(),
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/utils_comm.py", line 391, in retry_operation
    operation=operation,
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/utils_comm.py", line 379, in retry
    return await coro()
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/core.py", line 757, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/core.py", line 540, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/comm/tcp.py", line 208, in read
    convert_stream_closed_error(self, e)
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/comm/tcp.py", line 121, in convert_stream_closed_error
    raise CommClosedError("in %s: %s: %s" % (obj, exc.__class__.__name__, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: ConnectionResetError: [Errno 104] Connection reset by peer

pinging @jhamman - not sure this has come up on the binderhub running on GCE

Versions:

dask                      2.10.1                     py_0    conda-forge
dask-core                 2.10.1                     py_0    conda-forge
dask-gateway              0.6.1                    py37_0    conda-forge
dask-kubernetes           0.10.1                     py_0    conda-forge
dask-labextension         1.1.0                      py_0    conda-forge
distributed               2.10.0                     py_0    conda-forge

jhamman commented 4 years ago

pinging @jhamman - not sure this has come up on the binderhub running on GCE

(un)Fortunately, I have not seen this on our GCP binderhub.

jacobtomlinson commented 4 years ago

Thanks for raising this.

Looking at the tracebacks there seems to be nothing related to dask-kubernetes in there. It seems like the distributed cluster itself is failing to exit cleanly. Therefore I'm going to move this over to the distributed repo and track it there.

mrocklin commented 4 years ago

I don't know what would cause the CommClosedErrors. If I were to dive in here I would just start looking at the lines mentioned in the traceback, and maybe try to back out what happens if a connection dies at that point.

Were your workers running with any sort of --death-timeout value that wasn't respected?

scottyhq commented 4 years ago

Thanks @jacobtomlinson and @mrocklin for looking into this. Our dask_config.yaml currently looks like this: https://github.com/pangeo-data/pangeo-stacks/blob/master/base-notebook/binder/dask_config.yaml

distributed:
  version: 2

  dashboard:
    link: /user/{JUPYTERHUB_USER}/proxy/{port}/status

  scheduler:
    idle-timeout: 3600s

# uncomment to force new worker pods after 2 hrs
#  worker:
#    lifetime:
#      duration: "2 hours"
#      stagger: "10 s"
#      restart: true

  admin:
    tick:
      limit: 5s

logging:
  distributed: warning
  bokeh: critical
  # http://stackoverflow.com/questions/21234772/python-tornado-disable-logging-to-stderr
  tornado: critical
  tornado.application: error

kubernetes:
  name: dask-{JUPYTERHUB_USER}-{uuid}
  worker-template:
    spec:
      serviceAccount: daskkubernetes
      restartPolicy: Never
      containers:
        - name: dask-worker
          image: ${JUPYTER_IMAGE_SPEC}
          args:
            - dask-worker
            - --nthreads
            - '2'
            - --no-dashboard
            - --memory-limit
            - 7GB
            - --death-timeout
            - '60'
          resources:
            limits:
              cpu: "1.75"
              memory: 7G
            requests:
              cpu: 1
              memory: 7G

labextension:
  factory:
    module: dask_kubernetes
    class: KubeCluster
    args: []
    kwargs: {}

So --death-timeout '60' (I'm guessing this is seconds) is not being respected. I also see that the worker lifetime config under distributed is currently commented out, which was discussed previously here: https://github.com/pangeo-data/pangeo-stacks/pull/93. I suppose we could set the lifetime to a high value (something like 24 hours) just to ensure things don't run unintentionally for days when situations like this arise?

jacobtomlinson commented 4 years ago

The pod showing as Running is the one that concerns me. Any that show as Completed will not be taking up any resources on the cluster.
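To pick the still-running worker pods out of a listing like the one at the top of this issue, something along these lines could work. It is only a sketch: the function name `running_pods` and the `dask-` name prefix are assumptions for illustration, and it parses the plain-text output of `kubectl get pods -A` rather than calling the Kubernetes API.

```python
# Three rows copied from the listing in this issue, plus the header.
SAMPLE = """\
NAMESPACE        NAME                                                       READY   STATUS      RESTARTS   AGE
binder-staging   dask-cgentemann-osm2020tutorial-9fm90iqn-d58bc39d-a2v8wz   0/1     Completed   0          3h24m
binder-staging   dask-cgentemann-osm2020tutorial-9fm90iqn-d58bc39d-a6sfbh   1/1     Running     0          3h24m
binder-staging   hub-57b965856b-9nfc8                                       1/1     Running     3          9d
"""


def running_pods(kubectl_output, name_prefix="dask-"):
    """Return (namespace, name) for pods whose name starts with
    name_prefix and whose STATUS column is Running."""
    found = []
    # Skip the header row, then split each row on whitespace.
    for row in kubectl_output.strip().splitlines()[1:]:
        fields = row.split()
        if len(fields) < 4:
            continue  # not a pod row
        namespace, name, _ready, status = fields[:4]
        if name.startswith(name_prefix) and status == "Running":
            found.append((namespace, name))
    return found


if __name__ == "__main__":
    for namespace, name in running_pods(SAMPLE):
        print(namespace, name)
```

Completed pods and non-dask pods such as `hub-...` are filtered out, so only the worker that failed to exit is reported.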

I think the CommClosedError exceptions we are seeing are unrelated. The worker has lost its connection to the scheduler but is still trying to send keep-alive messages, which are failing. I've raised #3493 to try to resolve this.
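As a rough illustration of that failure mode, and of one way a periodic keep-alive could stop itself once the connection is gone, here is a self-contained asyncio sketch. It does not use distributed at all and is not necessarily what #3493 does: `FakeBatchedStream`, `keep_alive`, and the local `CommClosedError` class are stand-ins invented for this example.

```python
import asyncio


class CommClosedError(Exception):
    """Local stand-in for distributed.comm.core.CommClosedError."""


class FakeBatchedStream:
    """Hypothetical stream that raises once it has been closed."""

    def __init__(self):
        self.closed = False
        self.sent = []

    def send(self, msg):
        if self.closed:
            raise CommClosedError
        self.sent.append(msg)


async def keep_alive(stream, interval):
    """Send a keep-alive every `interval` seconds.

    Stops at the first CommClosedError instead of raising the same
    error on every tick. Returns the number of messages sent.
    """
    sent = 0
    while True:
        try:
            stream.send({"op": "keep-alive"})
        except CommClosedError:
            return sent
        sent += 1
        await asyncio.sleep(interval)


async def demo():
    stream = FakeBatchedStream()
    task = asyncio.ensure_future(keep_alive(stream, 0.01))
    await asyncio.sleep(0.05)
    stream.closed = True  # simulate losing the scheduler connection
    return await asyncio.wait_for(task, 1), stream


if __name__ == "__main__":
    count, _ = asyncio.run(demo())
    print("keep-alives sent before the stream closed:", count)
```

Without the `except` branch, every tick after the close would produce another traceback, which matches the repeating log in the Running pod.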

The issue we are seeing here seems to be related to this loop in _register_with_scheduler.

https://github.com/dask/distributed/blob/cc7ecdf2abb76a7ed21d0cb4c9a9c92559f638c4/distributed/worker.py#L805-L852

The death timeout works using asyncio.wait_for, which can be foiled by blocking sync code.

https://github.com/dask/distributed/blob/cc7ecdf2abb76a7ed21d0cb4c9a9c92559f638c4/distributed/nanny.py#L310-L322

I suspect there is something in the _register_with_scheduler method which is blocking.
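A minimal sketch of how blocking code defeats `asyncio.wait_for`, using only the standard library. The helper names (`cooperative`, `blocking`, `timed`) and the durations are made up for the example; this is not the code from nanny.py.

```python
import asyncio
import time

WORK = 0.3      # seconds of simulated work
TIMEOUT = 0.05  # timeout handed to asyncio.wait_for


async def cooperative():
    # Yields to the event loop, so the timeout can cancel it.
    await asyncio.sleep(WORK)


async def blocking():
    # Never yields: the event loop is stuck until this returns,
    # so the timeout cannot take effect while it runs.
    time.sleep(WORK)


async def timed(coro_fn):
    """Run coro_fn under wait_for; return (timed_out, elapsed seconds)."""
    start = time.monotonic()
    try:
        await asyncio.wait_for(coro_fn(), TIMEOUT)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return timed_out, time.monotonic() - start


if __name__ == "__main__":
    print("cooperative:", asyncio.run(timed(cooperative)))
    print("blocking:   ", asyncio.run(timed(blocking)))
```

The cooperative coroutine is cut off after roughly `TIMEOUT` seconds, while the blocking one always runs for the full `WORK` seconds regardless of the timeout. If something in `_register_with_scheduler` blocks in a similar way, the death timeout would be delayed for as long as the blocking call lasts.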

2d1r commented 4 years ago

Hello, I am quite new to dask. I am trying to launch some workers on a SLURM cluster using dask distributed, and I get pretty much the same errors in my logs, while the workers continue to run even after my script is killed.

Was this issue fixed somehow or is there a workaround to this? Thanks for your suggestions!

Here is the log of one of my workers ... Let me know if you need more information ...

distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: tcp://192.168.104.12:35330
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Connection to scheduler broken. Reconnecting...
distributed.worker - WARNING - Heartbeat to scheduler failed
tornado.application - ERROR - Exception in callback <function Worker._register_with_scheduler.<locals>.<lambda> at 0x2b468051f6a8>
Traceback (most recent call last):
  File "/opt/ebsofts/bokeh/1.3.4-foss-2019a-Python-3.7.2/lib/python3.7/site-packages/tornado/ioloop.py", line 907, in _run
    return self.callback()
  File "/opt/ebsofts/dask/2.3.0-foss-2019a-Python-3.7.2/lib/python3.7/site-packages/distributed/worker.py", line 850, in <lambda>
    lambda: self.batched_stream.send({"op": "keep-alive"}),
  File "/opt/ebsofts/dask/2.3.0-foss-2019a-Python-3.7.2/lib/python3.7/site-packages/distributed/batched.py", line 117, in send
    raise CommClosedError
distributed.comm.core.CommClosedError
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x2b464fa8f9e8>>, <Task finished coro=<Worker.heartbeat() done, defined at /opt/ebsofts/dask/2.3.$
Traceback (most recent call last):
  File "/opt/ebsofts/dask/2.3.0-foss-2019a-Python-3.7.2/lib/python3.7/site-packages/distributed/comm/core.py", line 215, in connect
    quiet_exceptions=EnvironmentError,
tornado.util.TimeoutError: Timeout

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/ebsofts/bokeh/1.3.4-foss-2019a-Python-3.7.2/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
  File "/opt/ebsofts/bokeh/1.3.4-foss-2019a-Python-3.7.2/lib/python3.7/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
    future.result()
  File "/opt/ebsofts/dask/2.3.0-foss-2019a-Python-3.7.2/lib/python3.7/site-packages/distributed/worker.py", line 868, in heartbeat
    metrics=await self.get_metrics(),
  File "/opt/ebsofts/dask/2.3.0-foss-2019a-Python-3.7.2/lib/python3.7/site-packages/distributed/core.py", line 747, in send_recv_from_rpc
    comm = await self.pool.connect(self.addr)
  File "/opt/ebsofts/dask/2.3.0-foss-2019a-Python-3.7.2/lib/python3.7/site-packages/distributed/core.py", line 874, in connect
    connection_args=self.connection_args,
  File "/opt/ebsofts/dask/2.3.0-foss-2019a-Python-3.7.2/lib/python3.7/site-packages/distributed/comm/core.py", line 227, in connect
    _raise(error)
  File "/opt/ebsofts/dask/2.3.0-foss-2019a-Python-3.7.2/lib/python3.7/site-packages/distributed/comm/core.py", line 204, in _raise
    raise IOError(msg)
OSError: Timed out trying to connect to 'tcp://192.168.104.12:35330' after 10 s: in <distributed.comm.tcp.TCPConnector object at 0x2b4680ce8b38>: ConnectionRefusedError: [Errno 111] Connection refused
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x2b464fa8f9e8>>, <Task finished coro=<Worker.heartbeat() done, defined at /opt/ebsofts/dask/2.3.$
Traceback (most recent call last):
  File "/opt/ebsofts/dask/2.3.0-foss-2019a-Python-3.7.2/lib/python3.7/site-packages/distributed/comm/core.py", line 215, in connect
    quiet_exceptions=EnvironmentError,
tornado.util.TimeoutError: Timeout

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/ebsofts/bokeh/1.3.4-foss-2019a-Python-3.7.2/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
  File "/opt/ebsofts/bokeh/1.3.4-foss-2019a-Python-3.7.2/lib/python3.7/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
    future.result()
  File "/opt/ebsofts/dask/2.3.0-foss-2019a-Python-3.7.2/lib/python3.7/site-packages/distributed/worker.py", line 868, in heartbeat
    metrics=await self.get_metrics(),
  File "/opt/ebsofts/dask/2.3.0-foss-2019a-Python-3.7.2/lib/python3.7/site-packages/distributed/core.py", line 747, in send_recv_from_rpc
    comm = await self.pool.connect(self.addr)
  File "/opt/ebsofts/dask/2.3.0-foss-2019a-Python-3.7.2/lib/python3.7/site-packages/distributed/core.py", line 874, in connect
    connection_args=self.connection_args,
  File "/opt/ebsofts/dask/2.3.0-foss-2019a-Python-3.7.2/lib/python3.7/site-packages/distributed/comm/core.py", line 227, in connect
    _raise(error)
  File "/opt/ebsofts/dask/2.3.0-foss-2019a-Python-3.7.2/lib/python3.7/site-packages/distributed/comm/core.py", line 204, in _raise
    raise IOError(msg)
OSError: Timed out trying to connect to 'tcp://192.168.104.12:35330' after 10 s: in <distributed.comm.tcp.TCPConnector object at 0x2b4680ce86a0>: ConnectionRefusedError: [Errno 111] Connection refused
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x2b464fa8f9e8>>, <Task finished coro=<Worker.heartbeat() done, defined at /opt/ebsofts/dask/2.3.$
Traceback (most recent call last):
  File "/opt/ebsofts/dask/2.3.0-foss-2019a-Python-3.7.2/lib/python3.7/site-packages/distributed/comm/core.py", line 215, in connect
    quiet_exceptions=EnvironmentError,
tornado.util.TimeoutError: Timeout

jacobtomlinson commented 4 years ago

Hey @2d1r. While similar, I think this is a different issue. My initial guess would be that your scheduler is being lost or killed somehow.

I recommend you raise a new issue for this problem and share more information on how you are constructing your cluster, preferably with a code example.