rapidsai / deployment

RAPIDS Deployment Documentation
https://docs.rapids.ai/deployment/stable/

Dask Cluster creation on ECS Cluster fails, `timed out` errors #343

Open skirui-source opened 6 months ago

skirui-source commented 6 months ago

How to reproduce:

Following the instructions, create the ECS cluster, then create a Dask cluster using the command below (a rough sketch of the ECS cluster creation step is included after the snippet):

```
AWS_REGION='us-west-2'
```

```python
from dask_cloudprovider.aws import ECSCluster

cluster = ECSCluster(
    cluster_arn="arn:aws:ecs:us-west-2:561241433344:cluster/skirui-test-2402a",
    n_workers=2,
    worker_gpu=1,
    skip_cleanup=True,
    execution_role_arn=<exec role arn>,
    task_role_arn=<task role arn>,
    scheduler_timeout="20 minutes",
)
```
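
For context, the ECS cluster referenced by `cluster_arn` above was created beforehand by following the deployment documentation. A minimal sketch of that step, assuming boto3 is used rather than the AWS console or CLI, might look like this (it only registers an empty cluster; the GPU container instances are attached separately as described in the docs):

```python
import boto3

# Sketch only: register an (empty) ECS cluster in us-west-2.
# The GPU container instances still have to be joined to this cluster
# separately, as described in the deployment documentation.
ecs = boto3.client("ecs", region_name="us-west-2")
response = ecs.create_cluster(clusterName="skirui-test-2402a")
print(response["cluster"]["clusterArn"])
```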
OUTPUT:

```python
---------------------------------------------------------------------------
CancelledError                            Traceback (most recent call last)
File ~/miniforge3/envs/dask/lib/python3.9/site-packages/distributed/comm/tcp.py:490, in BaseTCPConnector.connect(self, address, deserialize, **connection_args)
    489 else:
--> 490     stream = await self.client.connect(
    491         ip, port, max_buffer_size=MAX_BUFFER_SIZE, **kwargs
    492     )
    494 # Under certain circumstances tornado will have a closed connection with an
    495 # error and not raise a StreamClosedError.
    496 #
    497 # This occurs with tornado 5.x and openssl 1.1+

File ~/miniforge3/envs/dask/lib/python3.9/site-packages/tornado/tcpclient.py:279, in TCPClient.connect(self, host, port, af, ssl_options, max_buffer_size, source_ip, source_port, timeout)
    270 connector = _Connector(
    271     addrinfo,
    272     functools.partial(
    (...)
    277     ),
    278 )
--> 279 af, addr, stream = await connector.start(connect_timeout=timeout)
    280 # TODO: For better performance we could cache the (af, addr)
    281 # information here and re-use it on subsequent connections to
    282 # the same host.  (http://tools.ietf.org/html/rfc6555#section-4.2)

CancelledError:

During handling of the above exception, another exception occurred:

CancelledError                            Traceback (most recent call last)
File ~/miniforge3/envs/dask/lib/python3.9/asyncio/tasks.py:490, in wait_for(fut, timeout, loop)
    489 try:
--> 490     return fut.result()
    491 except exceptions.CancelledError as exc:

CancelledError:

The above exception was the direct cause of the following exception:

TimeoutError                              Traceback (most recent call last)
File ~/miniforge3/envs/dask/lib/python3.9/site-packages/distributed/comm/core.py:342, in connect(addr, timeout, deserialize, handshake_overrides, **connection_args)
    341 try:
--> 342     comm = await wait_for(
    343         connector.connect(loc, deserialize=deserialize, **connection_args),
    344         timeout=min(intermediate_cap, time_left()),
    345     )
    346     break

File ~/miniforge3/envs/dask/lib/python3.9/site-packages/distributed/utils.py:1940, in wait_for(fut, timeout)
   1939 async def wait_for(fut: Awaitable[T], timeout: float) -> T:
-> 1940     return await asyncio.wait_for(fut, timeout)

File ~/miniforge3/envs/dask/lib/python3.9/asyncio/tasks.py:492, in wait_for(fut, timeout, loop)
    491 except exceptions.CancelledError as exc:
--> 492     raise exceptions.TimeoutError() from exc
    493 finally:

TimeoutError:

The above exception was the direct cause of the following exception:

OSError                                   Traceback (most recent call last)
File ~/miniforge3/envs/dask/lib/python3.9/site-packages/distributed/deploy/spec.py:331, in SpecCluster._start(self)
    326 self.scheduler_comm = rpc(
    327     getattr(self.scheduler, "external_address", None)
    328     or self.scheduler.address,
    329     connection_args=self.security.get_connection_args("client"),
    330 )
--> 331 await super()._start()
    332 except Exception as e:  # pragma: no cover

File ~/miniforge3/envs/dask/lib/python3.9/site-packages/distributed/deploy/cluster.py:133, in Cluster._start(self)
    132 async def _start(self):
--> 133     comm = await self.scheduler_comm.live_comm()
    134     comm.name = "Cluster worker status"

File ~/miniforge3/envs/dask/lib/python3.9/site-packages/distributed/core.py:1272, in rpc.live_comm(self)
   1271 if not open or comm.closed():
-> 1272     comm = await connect(
   1273         self.address,
   1274         self.timeout,
   1275         deserialize=self.deserialize,
   1276         **self.connection_args,
   1277     )
   1278     comm.name = "rpc"

File ~/miniforge3/envs/dask/lib/python3.9/site-packages/distributed/comm/core.py:368, in connect(addr, timeout, deserialize, handshake_overrides, **connection_args)
    367 else:
--> 368     raise OSError(
    369         f"Timed out trying to connect to {addr} after {timeout} s"
    370     ) from active_exception
    372 local_info = {
    373     **comm.handshake_info(),
    374     **(handshake_overrides or {}),
    375 }

OSError: Timed out trying to connect to tcp://172.31.24.159:8786 after 30 s

The above exception was the direct cause of the following exception:

RuntimeError                              Traceback (most recent call last)
Cell In[2], line 2
      1 from dask_cloudprovider.aws import ECSCluster
----> 2 cluster = ECSCluster(
      3     cluster_arn= "arn:aws:ecs:us-west-2:561241433344:cluster/skirui-test-2402a",
      4     n_workers=2,
      5     worker_gpu=1,
      6     skip_cleanup=True,
      7     execution_role_arn="arn:aws:iam::561241433344:role/rapids-cluster-execution-role",
      8     task_role_arn="arn:aws:iam::561241433344:role/rapids-cluster-task-role",
      9     scheduler_timeout="20 minutes"
     10 )

File ~/home/skirui/dask-cloudprovider/dask_cloudprovider/aws/ecs.py:810, in ECSCluster.__init__(self, fargate_scheduler, fargate_workers, fargate_spot, image, scheduler_cpu, scheduler_mem, scheduler_port, scheduler_timeout, scheduler_extra_args, scheduler_task_definition_arn, scheduler_task_kwargs, scheduler_address, worker_cpu, worker_nthreads, worker_mem, worker_gpu, worker_extra_args, worker_task_definition_arn, worker_task_kwargs, n_workers, workers_name_start, workers_name_step, cluster_arn, cluster_name_template, execution_role_arn, task_role_arn, task_role_policies, cloudwatch_logs_group, cloudwatch_logs_stream_prefix, cloudwatch_logs_default_retention, vpc, subnets, security_groups, environment, tags, skip_cleanup, aws_access_key_id, aws_secret_access_key, region_name, platform_version, fargate_use_private_ip, mount_points, volumes, mount_volumes_on_scheduler, **kwargs)
    808 self._lock = asyncio.Lock()
    809 self.session = get_session()
--> 810 super().__init__(**kwargs)

File ~/miniforge3/envs/dask/lib/python3.9/site-packages/distributed/deploy/spec.py:284, in SpecCluster.__init__(self, workers, scheduler, worker, asynchronous, loop, security, silence_logs, name, shutdown_on_close, scheduler_sync_interval)
    282 if not self.called_from_running_loop:
    283     self._loop_runner.start()
--> 284 self.sync(self._start)
    285 try:
    286     self.sync(self._correct_state)

File ~/miniforge3/envs/dask/lib/python3.9/site-packages/distributed/utils.py:358, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    356     return future
    357 else:
--> 358     return sync(
    359         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    360     )

File ~/miniforge3/envs/dask/lib/python3.9/site-packages/distributed/utils.py:434, in sync(loop, func, callback_timeout, *args, **kwargs)
    431     wait(10)
    433 if error is not None:
--> 434     raise error
    435 else:
    436     return result

File ~/miniforge3/envs/dask/lib/python3.9/site-packages/distributed/utils.py:408, in sync.<locals>.f()
    406     awaitable = wait_for(awaitable, timeout)
    407     future = asyncio.ensure_future(awaitable)
--> 408     result = yield future
    409 except Exception as exception:
    410     error = exception

File ~/miniforge3/envs/dask/lib/python3.9/site-packages/tornado/gen.py:767, in Runner.run(self)
    765 try:
    766     try:
--> 767         value = future.result()
    768     except Exception as e:
    769         # Save the exception for later. It's important that
    770         # gen.throw() not be called inside this try/except block
    771         # because that makes sys.exc_info behave unexpectedly.
    772         exc: Optional[Exception] = e

File ~/home/skirui/dask-cloudprovider/dask_cloudprovider/aws/ecs.py:1011, in ECSCluster._start(self)
   1003 self.scheduler = SchedulerAddress()
   1005 with warn_on_duration(
   1006     "10s",
   1007     "Creating your cluster is taking a surprisingly long time. "
   1008     "This is likely due to pending resources on AWS. "
   1009     "Hang tight! ",
   1010 ):
-> 1011     await super()._start()

File ~/miniforge3/envs/dask/lib/python3.9/site-packages/distributed/deploy/spec.py:335, in SpecCluster._start(self)
    333     self.status = Status.failed
    334     await self._close()
--> 335     raise RuntimeError(f"Cluster failed to start: {e}") from e

RuntimeError: Cluster failed to start: Timed out trying to connect to tcp://172.31.24.159:8786 after 30 s
```
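
The final `OSError` indicates the client never reached the scheduler task on port 8786 at its private VPC address. A quick way to confirm that symptom from the machine running `ECSCluster` is a plain TCP reachability check against the address taken from the traceback; this is just a generic socket probe, not part of dask-cloudprovider:

```python
import socket

# Address and port copied from the traceback above. 172.31.x.x is a
# private VPC address, so this probe only makes sense from a machine
# inside (or peered/VPN'd into) the same VPC as the scheduler task.
scheduler_ip, scheduler_port = "172.31.24.159", 8786

try:
    with socket.create_connection((scheduler_ip, scheduler_port), timeout=10):
        print(f"Scheduler port {scheduler_port} is reachable")
except OSError as exc:
    print(f"Cannot reach {scheduler_ip}:{scheduler_port}: {exc}")
```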