dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

FargateCluster timeout on exit #5447

Open rpanai opened 3 years ago

rpanai commented 3 years ago

When I close a Fargate cluster (similar to #220) using

client.close()
cluster.close()

I receive the following error:

Traceback (most recent call last):
  File "/home/ec2-user/anaconda3/envs/features_r/lib/python3.8/site-packages/distributed/deploy/adaptive_core.py", line 190, in adapt
    target = await self.safe_target()
  File "/home/ec2-user/anaconda3/envs/features_r/lib/python3.8/site-packages/distributed/deploy/adaptive_core.py", line 128, in safe_target
    n = await self.target()
  File "/home/ec2-user/anaconda3/envs/features_r/lib/python3.8/site-packages/distributed/deploy/adaptive.py", line 146, in target
    return await self.scheduler.adaptive_target(
  File "/home/ec2-user/anaconda3/envs/features_r/lib/python3.8/site-packages/distributed/core.py", line 789, in send_recv_from_rpc
    comm = await self.live_comm()
  File "/home/ec2-user/anaconda3/envs/features_r/lib/python3.8/site-packages/distributed/core.py", line 747, in live_comm
    comm = await connect(
  File "/home/ec2-user/anaconda3/envs/features_r/lib/python3.8/site-packages/distributed/comm/core.py", line 307, in connect
    raise IOError(
OSError: Timed out trying to connect to tcp://3.87.54.191:8786 after 10 s

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ec2-user/anaconda3/envs/features_r/lib/python3.8/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/home/ec2-user/anaconda3/envs/features_r/lib/python3.8/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "/home/ec2-user/anaconda3/envs/features_r/lib/python3.8/site-packages/distributed/deploy/adaptive_core.py", line 204, in adapt
    if status != "down":
UnboundLocalError: local variable 'status' referenced before assignment
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <zmq.eventloop.ioloop.ZMQIOLoop object at 0x7fbf8cc7f640>>, <Task finished name='Task-16550' coro=<AdaptiveCore.adapt() done, defined at /home/ec2-user/anaconda3/envs/features_r/lib/python3.8/site-packages/distributed/deploy/adaptive_core.py:178> exception=UnboundLocalError("local variable 'status' referenced before assignment")>)
Traceback (most recent call last):
  File "/home/ec2-user/anaconda3/envs/features_r/lib/python3.8/site-packages/distributed/comm/core.py", line 285, in connect
    comm = await asyncio.wait_for(
  File "/home/ec2-user/anaconda3/envs/features_r/lib/python3.8/asyncio/tasks.py", line 490, in wait_for
    raise exceptions.TimeoutError()
asyncio.exceptions.TimeoutError
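The second exception in this report is a follow-on failure. The scheduler RPC inside adapt() times out with OSError, and the error handler then reads a local variable, status, that would only have been assigned after the RPC succeeded. The sketch below is a simplified, hypothetical model of that pattern, not the actual distributed source. safe_target() here simply raises to stand in for the timed-out connection.

```python
import asyncio


async def safe_target():
    # Hypothetical stand-in for the scheduler RPC: once the scheduler is gone,
    # the connection attempt times out and raises OSError.
    raise OSError("Timed out trying to connect")


async def adapt_unguarded():
    try:
        await safe_target()
        status = "up"  # only bound if the RPC above succeeds
    except OSError:
        # 'status' was never assigned, so reading it raises UnboundLocalError.
        if status != "down":
            return "stopping"
    return status


async def adapt_guarded():
    status = None  # bind the name before anything can fail
    try:
        await safe_target()
        status = "up"
    except OSError:
        if status != "down":
            return "stopping"
    return status
```

asyncio.run(adapt_guarded()) returns "stopping". asyncio.run(adapt_unguarded()) raises UnboundLocalError with the OSError as its context, which has the same "During handling of the above exception" shape as the report.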
colt-jay commented 2 years ago

I am experiencing a very similar issue while using a Fargate ECS cluster in a context manager.

import sys

from dask.distributed import Client
from dask_cloudprovider.aws import ECSCluster

if __name__ == '__main__':
    with ECSCluster(fargate_scheduler=True, fargate_workers=True, n_workers=20, image=sys.argv[1],
                    task_role_policies=['arn:aws:iam::aws:policy/AmazonS3FullAccess']) as cluster:
        print(cluster.dashboard_link)
        with Client(cluster) as client:
            main(client)  # main() is the user's own workload function (not shown)

Traceback (most recent call last):
  File "/Users/colt/.pyenv/versions/3.9.6/lib/python3.9/weakref.py", line 656, in _exitfunc
    f()
  File "/Users/colt/.pyenv/versions/3.9.6/lib/python3.9/weakref.py", line 580, in __call__
    return info.func(*info.args, **(info.kwargs or {}))
  File "/Users/colt/Library/Caches/pypoetry/virtualenvs/dummy-sOqOco3g-py3.9/lib/python3.9/site-packages/distributed/deploy/cluster.py", line 214, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/Users/colt/Library/Caches/pypoetry/virtualenvs/dummy-sOqOco3g-py3.9/lib/python3.9/site-packages/distributed/utils.py", line 286, in sync
    raise RuntimeError("IOLoop is closed")
RuntimeError: IOLoop is closed
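In this traceback the call comes from weakref's exit hook (_exitfunc). A finalizer registered for the cluster runs during interpreter shutdown and calls sync(), which fails because the IOLoop has already been closed. One plausible reading is that this ordering problem only occurs when a process exits, which would fit the later reports that scripts fail while Jupyter/IPython sessions do not. The sketch below models the ordering with a hypothetical ToyCluster using only stdlib asyncio and weakref. It is not dask's implementation.

```python
import asyncio
import weakref


def _close_on_loop(loop):
    # Mirrors the sync() helper in the traceback: cleanup needs a live loop.
    if loop.is_closed():
        raise RuntimeError("IOLoop is closed")
    loop.run_until_complete(asyncio.sleep(0))  # placeholder for async cleanup
    return "closed"


class ToyCluster:
    """Hypothetical stand-in for a cluster object that owns an event loop."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        # weakref.finalize also runs this at interpreter exit (atexit=True is
        # the default) if it has not run yet, like the _exitfunc frames above.
        # The callback must not reference self, so only the loop is passed.
        self._finalizer = weakref.finalize(self, _close_on_loop, self.loop)

    def close(self):
        # Explicit shutdown while the loop is still open. A finalizer runs at
        # most once, so nothing is left to fire at interpreter exit.
        result = self._finalizer()
        self.loop.close()
        return result
```

Calling the finalizer from close() runs the cleanup while the loop is still usable and marks the finalizer dead. If the loop is closed first, calling the same finalizer raises RuntimeError("IOLoop is closed").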
jacobtomlinson commented 2 years ago

Does this happen after your work completes?

colt-jay commented 2 years ago

Yes, the work completes as expected. I'm also able to use the cluster and client in a notebook without issue until I call client.close() and cluster.close(), at which point I get the same error described in the original issue.

jacobtomlinson commented 2 years ago

OK, thanks. Given that all these tracebacks seem to point to distributed, I'm going to move this issue over there.

lmarsden commented 2 years ago

Also see the related issue: https://github.com/dask/distributed/issues/4950

mkarbo commented 2 years ago

I am also having this issue. I have tested with versions 2021.12.0 and 2022.01.1 on Python 3.7, 3.8, and 3.9.

It's worth noting that I got the same errors through a wrapper (Prefect) using dask-cloudprovider's FargateCluster.

My code runs and the results complete, but when the process exits, the "IOLoop is closed" RuntimeError is raised in the scheduler from utils.py.

mkarbo commented 2 years ago

https://github.com/PrefectHQ/prefect/issues/5330 describes the issue in more detail; it also leads to the IOLoop error when using Prefect with a Fargate cluster.

Sorry to tag you, @jacobtomlinson, but I'm wondering whether this potential issue in distributed is getting any attention.

rpanai commented 1 year ago

Any updates here? This problem only appears when running from scripts; it works fine from Jupyter/IPython.