dask / dask-cloudprovider

Cloud provider cluster managers for Dask. Supports AWS, Google Cloud, Azure, and more.
https://cloudprovider.dask.org
BSD 3-Clause "New" or "Revised" License

Timeout error when connecting to scheduler with FargateCluster on 2022.10.0 #408

Open jgdwyer opened 1 year ago

jgdwyer commented 1 year ago

Describe the issue: After upgrading from dask-cloudprovider 2022.8.0 to 2022.10.0, I've been running into a sporadic issue when attempting to start a FargateCluster. The scheduler task will start but then I see the following error:

# redacted ip
RuntimeError: Cluster failed to start: Timed out during handshake while connecting to tcp://xx.yy.zz.xyza:8786 after 30 s

I've noticed that the scheduler Fargate task starts, but quickly dies. Here are its logs:

-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|   timestamp   |                                                                                             message                                                                                             |
|---------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1686169718457 | + '[' '' ']'                                                                                                                                                                                    |
| 1686169718457 | + '[' '' == true ']'                                                                                                                                                                            |
| 1686169718457 | + CONDA_BIN=/opt/conda/bin/conda                                                                                                                                                                |
| 1686169718457 | + '[' -e /opt/app/environment.yml ']'                                                                                                                                                           |
| 1686169718457 | + echo 'no environment.yml'                                                                                                                                                                     |
| 1686169718457 | + '[' '' ']'                                                                                                                                                                                    |
| 1686169718457 | + '[' '' ']'                                                                                                                                                                                    |
| 1686169718457 | + exec dask-scheduler --idle-timeout '10 minutes'                                                                                                                                               |
| 1686169718459 | no environment.yml                                                                                                                                                                              |
| 1686169719866 | /opt/conda/lib/python3.8/site-packages/distributed/cli/dask_scheduler.py:140: FutureWarning: dask-scheduler is deprecated and will be removed in a future release; use `dask scheduler` instead |
| 1686169719866 |   warnings.warn(                                                                                                                                                                                |
| 1686169719866 | 2023-06-07 20:28:39,866 - distributed.scheduler - INFO - -----------------------------------------------                                                                                        |
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
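
The logs stop right after the scheduler banner starts printing, so the container appears to exit very early. To see why ECS stopped the task, something like this boto3 sketch should work (the cluster and task ARNs are placeholders for my redacted values):

import boto3

# Placeholder ARNs; substitute the real cluster and scheduler task ARNs.
CLUSTER_ARN = "arn:aws:ecs:us-east-1:<ACCNT>:cluster/datascientist-dev"
TASK_ARN = "arn:aws:ecs:us-east-1:<ACCNT>:task/<TASK_ID>"

ecs = boto3.client("ecs", region_name="us-east-1")
resp = ecs.describe_tasks(cluster=CLUSTER_ARN, tasks=[TASK_ARN])
for task in resp["tasks"]:
    # stoppedReason says why ECS stopped the task (e.g. essential container
    # exited); each container also carries its own exit code and reason.
    print(task.get("lastStatus"), task.get("stoppedReason"))
    for container in task.get("containers", []):
        print(container.get("name"), container.get("exitCode"), container.get("reason"))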

I never had this error on dask-cloudprovider 2022.8.0. On 2022.10.0 I can sometimes get the client to connect to the scheduler, but it only works maybe 10% of the time.

I thought increasing the timeout might help, so I tried:

import dask

# Raise the client-side comm timeouts before creating the cluster
dask.config.set({"distributed.comm.timeouts.connect": "3600s"})
dask.config.set({"distributed.comm.timeouts.tcp": "3600s"})

But when I launch the Fargate cluster I get the same error, now quoting the new timeout value. The new timeout is not actually respected, though; the error still comes up after a few minutes, not an hour: OSError: Timed out during handshake while connecting to tcp://xx.yy.zz.xyza:8786 after 3600 s.
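
One thing I'm unsure about is whether the client-side config ever reaches the scheduler task. If the timeout also needs to be raised on the scheduler side, the environment keyword (visible in the ECSCluster signature in the traceback below) should let me push it into the tasks. A minimal sketch, assuming the standard DASK_ mapping of config keys to environment variables:

from dask_cloudprovider.aws import FargateCluster

# Dask maps config keys to env vars as DASK_<SECTION>__<KEY>, with "__"
# marking nesting, so this should raise the connect timeout inside the
# scheduler and worker tasks as well. (Untested on my side.)
cluster = FargateCluster(
    environment={"DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT": "3600s"},
    # ... plus the other kwargs from the example below
)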

Minimal Complete Verifiable Example:

import os

from dask_cloudprovider.aws import FargateCluster
# some values partially redacted
cluster_kwargs = dict(
    cluster_arn="arn:aws:ecs:us-east-1:<ACCNT>:cluster/datascientist-dev",
    execution_role_arn="arn:aws:iam::<ACCNT>:role/datascientist/DataScienceEcsTaskExecutionRole",
    task_role_arn="arn:aws:iam::<ACCNT>:role/datascientist/DataScienceDaskRole",
    cloudwatch_logs_group="/datascientist/datascientist-dev",
    vpc="vpc-<VPC>",
    subnets=["subnet-<SUBNET>"],
    security_groups=["sg-<SG>"],
)
cluster = FargateCluster(
    fargate_use_private_ip=True,
    n_workers=2,
    # Cluster image name/tag are baked into the client image
    image=f"{os.getenv('CLUSTER_IMAGE')}",
    scheduler_timeout="10 minutes",
    skip_cleanup=True,
    **cluster_kwargs,
)
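
As a networking sanity check, a raw TCP probe from the client container against the scheduler address can separate "port unreachable" from "Dask handshake fails" (plain-socket sketch; the address is the redacted one from the error message):

import socket

# Redacted scheduler address from the error message above.
addr = ("xx.yy.zz.xyza", 8786)

try:
    # If this connects, security groups and routing are fine and the problem
    # is in the Dask handshake itself; if it times out, it's networking.
    with socket.create_connection(addr, timeout=10) as sock:
        print("TCP connect OK:", sock.getpeername())
except OSError as e:
    print("TCP connect failed:", e)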

Anything else we need to know?:

I'm using the same code versions on the client and cluster side. I can also confirm that, keeping all of the other software libraries the same, downgrading only dask-cloudprovider to 2022.8.0 makes everything work smoothly.
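
For reference, this is the kind of check I run in both images to confirm the versions line up:

import dask
import distributed
import dask_cloudprovider

# Run in both the client image and the cluster image; the output should match.
print("dask:", dask.__version__)
print("distributed:", distributed.__version__)
print("dask-cloudprovider:", dask_cloudprovider.__version__)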

Here's the full error traceback:

---------------------------------------------------------------------------
StreamClosedError                         Traceback (most recent call last)
File /opt/conda/lib/python3.8/site-packages/distributed/comm/tcp.py:225, in TCP.read(self, deserializers)
    224 try:
--> 225     frames_nbytes = await stream.read_bytes(fmt_size)
    226     (frames_nbytes,) = struct.unpack(fmt, frames_nbytes)

StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

CommClosedError                           Traceback (most recent call last)
File /opt/conda/lib/python3.8/site-packages/distributed/comm/core.py:373, in connect(addr, timeout, deserialize, handshake_overrides, **connection_args)
    370 try:
    371     # This would be better, but connections leak if worker is closed quickly
    372     # write, handshake = await asyncio.gather(comm.write(local_info), comm.read())
--> 373     handshake = await wait_for(comm.read(), time_left())
    374     await wait_for(comm.write(local_info), time_left())

File /opt/conda/lib/python3.8/site-packages/distributed/utils.py:1878, in wait_for(fut, timeout)
   1877 async def wait_for(fut: Awaitable[T], timeout: float) -> T:
-> 1878     return await asyncio.wait_for(fut, timeout)

File /opt/conda/lib/python3.8/asyncio/tasks.py:494, in wait_for(fut, timeout, loop)
    493 if fut.done():
--> 494     return fut.result()
    495 else:

File /opt/conda/lib/python3.8/site-packages/distributed/comm/tcp.py:241, in TCP.read(self, deserializers)
    240     if not sys.is_finalizing():
--> 241         convert_stream_closed_error(self, e)
    242 except BaseException:
    243     # Some OSError, CancelledError or a another "low-level" exception.
    244     # We do not really know what was already read from the underlying
    245     # socket, so it is not even safe to retry here using the same stream.
    246     # The only safe thing to do is to abort.
    247     # (See also GitHub #4133, #6548).

File /opt/conda/lib/python3.8/site-packages/distributed/comm/tcp.py:144, in convert_stream_closed_error(obj, exc)
    143 else:
--> 144     raise CommClosedError(f"in {obj}: {exc}") from exc

CommClosedError: in <TCP (closed)  local=tcp://172.18.0.2:59150 remote=tcp://xx.yy.zz.xyza:8786>: Stream is closed

The above exception was the direct cause of the following exception:

OSError                                   Traceback (most recent call last)
File /opt/conda/lib/python3.8/site-packages/distributed/deploy/spec.py:331, in SpecCluster._start(self)
    326     self.scheduler_comm = rpc(
    327         getattr(self.scheduler, "external_address", None)
    328         or self.scheduler.address,
    329         connection_args=self.security.get_connection_args("client"),
    330     )
--> 331     await super()._start()
    332 except Exception as e:  # pragma: no cover

File /opt/conda/lib/python3.8/site-packages/distributed/deploy/cluster.py:127, in Cluster._start(self)
    126 async def _start(self):
--> 127     comm = await self.scheduler_comm.live_comm()
    128     comm.name = "Cluster worker status"

File /opt/conda/lib/python3.8/site-packages/distributed/core.py:1235, in rpc.live_comm(self)
   1234 if not open or comm.closed():
-> 1235     comm = await connect(
   1236         self.address,
   1237         self.timeout,
   1238         deserialize=self.deserialize,
   1239         **self.connection_args,
   1240     )
   1241     comm.name = "rpc"

File /opt/conda/lib/python3.8/site-packages/distributed/comm/core.py:378, in connect(addr, timeout, deserialize, handshake_overrides, **connection_args)
    377         await comm.close()
--> 378     raise OSError(
    379         f"Timed out during handshake while connecting to {addr} after {timeout} s"
    380     ) from exc
    382 comm.remote_info = handshake

OSError: Timed out during handshake while connecting to tcp://xx.yy.zz.xyza:8786 after 3600 s

The above exception was the direct cause of the following exception:

RuntimeError                              Traceback (most recent call last)
Cell In[4], line 42
     34 tags = {
     35     "Environment": "DEVELOPMENT",
     36     "Purpose": "Experimenting",  # or "Ad-hoc"
     37     "Function": "Play",  # or "Test"
     38     "Initiative": "Other",
     39 }
     41 # This may take several minutes
---> 42 cluster = FargateCluster(
     43     fargate_use_private_ip=True,
     44     n_workers=2,
     45     # Cluster image name/tag are baked into the client image
     46     image=f"{os.getenv('CLUSTER_IMAGE')}",
     47     scheduler_timeout="10 minutes",
     48     # find_address_timeout=120,
     49     tags=tags,
     50     skip_cleanup=True,
     51     **cluster_kwargs,
     52 )

File /opt/conda/lib/python3.8/site-packages/dask_cloudprovider/aws/ecs.py:1489, in FargateCluster.__init__(self, **kwargs)
   1488 def __init__(self, **kwargs):
-> 1489     super().__init__(fargate_scheduler=True, fargate_workers=True, **kwargs)

File /opt/conda/lib/python3.8/site-packages/dask_cloudprovider/aws/ecs.py:800, in ECSCluster.__init__(self, fargate_scheduler, fargate_workers, fargate_spot, image, scheduler_cpu, scheduler_mem, scheduler_port, scheduler_timeout, scheduler_extra_args, scheduler_task_definition_arn, scheduler_task_kwargs, scheduler_address, worker_cpu, worker_nthreads, worker_mem, worker_gpu, worker_extra_args, worker_task_definition_arn, worker_task_kwargs, n_workers, workers_name_start, workers_name_step, cluster_arn, cluster_name_template, execution_role_arn, task_role_arn, task_role_policies, cloudwatch_logs_group, cloudwatch_logs_stream_prefix, cloudwatch_logs_default_retention, vpc, subnets, security_groups, environment, tags, skip_cleanup, aws_access_key_id, aws_secret_access_key, region_name, platform_version, fargate_use_private_ip, mount_points, volumes, mount_volumes_on_scheduler, **kwargs)
    798 self._lock = asyncio.Lock()
    799 self.session = get_session()
--> 800 super().__init__(**kwargs)

File /opt/conda/lib/python3.8/site-packages/distributed/deploy/spec.py:291, in SpecCluster.__init__(self, workers, scheduler, worker, asynchronous, loop, security, silence_logs, name, shutdown_on_close, scheduler_sync_interval)
    289 if not called_from_running_loop:
    290     self._loop_runner.start()
--> 291     self.sync(self._start)
    292     try:
    293         self.sync(self._correct_state)

File /opt/conda/lib/python3.8/site-packages/distributed/utils.py:351, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    349     return future
    350 else:
--> 351     return sync(
    352         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    353     )

File /opt/conda/lib/python3.8/site-packages/distributed/utils.py:418, in sync(loop, func, callback_timeout, *args, **kwargs)
    416 if error:
    417     typ, exc, tb = error
--> 418     raise exc.with_traceback(tb)
    419 else:
    420     return result

File /opt/conda/lib/python3.8/site-packages/distributed/utils.py:391, in sync.<locals>.f()
    389         future = wait_for(future, callback_timeout)
    390     future = asyncio.ensure_future(future)
--> 391     result = yield future
    392 except Exception:
    393     error = sys.exc_info()

File /opt/conda/lib/python3.8/site-packages/tornado/gen.py:767, in Runner.run(self)
    765 try:
    766     try:
--> 767         value = future.result()
    768     except Exception as e:
    769         # Save the exception for later. It's important that
    770         # gen.throw() not be called inside this try/except block
    771         # because that makes sys.exc_info behave unexpectedly.
    772         exc: Optional[Exception] = e

File /opt/conda/lib/python3.8/site-packages/dask_cloudprovider/aws/ecs.py:999, in ECSCluster._start(self)
    991     self.scheduler = SchedulerAddress()
    993 with warn_on_duration(
    994     "10s",
    995     "Creating your cluster is taking a surprisingly long time. "
    996     "This is likely due to pending resources on AWS. "
    997     "Hang tight! ",
    998 ):
--> 999     await super()._start()

File /opt/conda/lib/python3.8/site-packages/distributed/deploy/spec.py:335, in SpecCluster._start(self)
    333 self.status = Status.failed
    334 await self._close()
--> 335 raise RuntimeError(f"Cluster failed to start: {e}") from e

RuntimeError: Cluster failed to start: Timed out during handshake while connecting to tcp://xx.yy.zz.xyza:8786 after 3600 s

Environment:

jacobtomlinson commented 1 year ago

@tamas4sunairio @pwerth do either of you have some time to look into this?

jgdwyer commented 1 year ago

@jacobtomlinson @tamas4sunairio @pwerth If you have any suggestions for adding logging/debugging, I can re-run and provide more details.
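
For example, I could pull the scheduler task's CloudWatch logs programmatically, along the lines of this boto3 sketch (the log group comes from my cluster_kwargs above; the stream name prefix is a guess, since cloudwatch_logs_stream_prefix controls the real one):

import boto3

logs = boto3.client("logs", region_name="us-east-1")

# Log group from cluster_kwargs in the original report; the stream prefix
# is an assumption -- cloudwatch_logs_stream_prefix sets the actual value.
resp = logs.filter_log_events(
    logGroupName="/datascientist/datascientist-dev",
    logStreamNamePrefix="dask",
    limit=50,
)
for event in resp["events"]:
    print(event["timestamp"], event["message"])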