dask / dask-cloudprovider

Cloud provider cluster managers for Dask. Supports AWS, Google Cloud, Azure and more.
https://cloudprovider.dask.org
BSD 3-Clause "New" or "Revised" License

CancelledError in successfully created AWS ECS Fargate Cluster #427

Open Thodorissio opened 5 months ago

Thodorissio commented 5 months ago

Issue:

I am creating an AWS ECS Fargate cluster using the Dask Cloudprovider library, following . Although the cluster is created successfully (its status is active) and the workers are started, the Dask computation fails with a CancelledError.

A similar error occurs with other Dask array operations as well (e.g. an add operation).

Minimal Complete Verifiable Example:

import dask.array as da
import logging
from dask_cloudprovider.aws import FargateCluster
from distributed import Client

SCHEDULER_CPU = 1024
SCHEDULER_MEM = 4096
WORKER_CPU = 4096
WORKER_MEM = 16384
NUM_WORKERS = 4

logging.basicConfig(level=logging.INFO)

def main():
    cluster_name_template = "dask-fargate-test"

    logging.info("Creating Fargate cluster...")
    cluster = FargateCluster(
        cluster_name_template=cluster_name_template,
        n_workers=NUM_WORKERS,
        worker_cpu=WORKER_CPU,
        worker_mem=WORKER_MEM,
        scheduler_cpu=SCHEDULER_CPU,
        scheduler_mem=SCHEDULER_MEM,
    )
    logging.info("Fargate cluster created.")

    logging.info(f"Scheduler address: {cluster.scheduler_address}")
    # client = Client(address=cluster.scheduler_address, timeout="60s")
    client = Client(cluster)
    logging.info(f"Connected to Dask client: {client}")

    x = da.random.random((10000, 10000), chunks=(1000, 1000))
    y = x + x.T
    z = y[::2, 5000:].mean(axis=1).mean()
    res = z.compute()
    logging.info(f"Result: {res}")

if __name__ == "__main__":
    main()

Runtime logs

INFO:root:Creating Fargate cluster...
INFO:aiobotocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials
INFO:aiobotocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials
C:\Users\thodo\anaconda3\lib\contextlib.py:126: UserWarning: Creating your cluster is taking a surprisingly long time. This is likely due to pending resources on AWS. Hang tight!
  next(self.gen)
INFO:root:Fargate cluster created.
INFO:root:Scheduler address: tcp://3.73.63.81:8786
INFO:numexpr.utils:NumExpr defaulting to 8 threads.
C:\Users\thodo\anaconda3\lib\site-packages\distributed\client.py:1393: VersionMismatchWarning: Mismatched versions found

+-------------+---------------+-----------------+-----------------+
| Package     | Client        | Scheduler       | Workers         |
+-------------+---------------+-----------------+-----------------+
| cloudpickle | 2.0.0         | 3.0.0           | 3.0.0           |
| dask        | 2024.5.0      | 2024.5.2        | 2024.5.2        |
| distributed | 2024.5.0      | 2024.5.2        | 2024.5.2        |
| lz4         | None          | 4.3.3           | 4.3.3           |
| msgpack     | 1.0.2         | 1.0.8           | 1.0.8           |
| pandas      | 1.3.4         | 2.2.2           | 2.2.2           |
| python      | 3.9.7.final.0 | 3.10.12.final.0 | 3.10.12.final.0 |
| toolz       | 0.11.1        | 0.12.0          | 0.12.0          |
| tornado     | 6.1           | 6.4             | 6.4             |
+-------------+---------------+-----------------+-----------------+
  warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
INFO:root:Connected to Dask client: <Client: 'tcp://172.31.4.49:8786' processes=20 threads=80, memory=298.02 GiB>
Traceback (most recent call last):
  File "C:\Users\thodo\Documents\dask-fargate-demo\dask_fargate.py", line 42, in <module>
    main()
  File "C:\Users\thodo\Documents\dask-fargate-demo\dask_fargate.py", line 37, in main
    res = z.compute()
  File "C:\Users\thodo\anaconda3\lib\site-packages\dask\base.py", line 375, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "C:\Users\thodo\anaconda3\lib\site-packages\dask\base.py", line 661, in compute
    results = schedule(dsk, keys, **kwargs)
  File "C:\Users\thodo\anaconda3\lib\site-packages\distributed\client.py", line 2233, in _gather
    raise exc
concurrent.futures._base.CancelledError: ('mean_agg-aggregate-d9d3bd3022a1c263a4fd8de53f2c280c',)
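The runtime log above also contains a VersionMismatchWarning: the client runs Python 3.9.7 with dask/distributed 2024.5.0 and cloudpickle 2.0.0, while the scheduler and workers run Python 3.10.12 with dask/distributed 2024.5.2 and cloudpickle 3.0.0. Ruling out mismatches like this is a cheap first diagnostic step. The following is a minimal sketch, assuming the dictionary returned by distributed's `Client.get_versions()` has `"client"`, `"scheduler"` and `"workers"` entries that each carry a `"packages"` mapping; `find_mismatches` is a hypothetical helper, not part of Dask.

```python
from typing import Any, Dict, List, Tuple


def find_mismatches(versions: Dict[str, Any]) -> Dict[str, List[Tuple[str, Any]]]:
    """Return packages whose versions differ between client, scheduler and workers.

    Assumes ``versions`` has the shape returned by ``Client.get_versions()``:
    {"client": {"packages": {...}}, "scheduler": {"packages": {...}},
     "workers": {address: {"packages": {...}}, ...}}
    """
    # Collect (role or worker address, package mapping) for every process.
    sources: List[Tuple[str, Dict[str, Any]]] = [
        ("client", versions.get("client", {}).get("packages", {})),
        ("scheduler", versions.get("scheduler", {}).get("packages", {})),
    ]
    for addr, info in versions.get("workers", {}).items():
        sources.append((addr, info.get("packages", {})))

    # Union of every package name reported by any process.
    names = set()
    for _, pkgs in sources:
        names.update(pkgs)

    # A package is mismatched if the processes report more than one distinct
    # value for it (a missing package counts as the value None).
    mismatches: Dict[str, List[Tuple[str, Any]]] = {}
    for name in sorted(names):
        seen = [(role, pkgs.get(name)) for role, pkgs in sources]
        if len({version for _, version in seen}) > 1:
            mismatches[name] = seen
    return mismatches
```

For example, `find_mismatches(client.get_versions())` should report the same packages as the warning table above. Alternatively, `client.get_versions(check=True)` raises an error on a mismatch instead of only warning.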

Additional Issue:

A similar CancelledError is raised when I try to reconnect to the scheduler address of the created cluster (after the initial connection made when the cluster was created):

import logging
from distributed import Client

logging.basicConfig(level=logging.INFO)

def main():
    logging.info("Connecting to existing cluster address...")
    client = Client(address="tcp://3.73.63.81:8786", timeout="60s")
    logging.info(f"Connected to Dask client: {client}")

if __name__ == "__main__":
    main()

Output:

INFO:root:Connecting to existing cluster address...
Traceback (most recent call last):
  File "C:\Users\thodo\anaconda3\lib\site-packages\distributed\comm\tcp.py", line 546, in connect
    stream = await self.client.connect(
  File "C:\Users\thodo\anaconda3\lib\site-packages\tornado\tcpclient.py", line 275, in connect
    af, addr, stream = await connector.start(connect_timeout=timeout)
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\thodo\anaconda3\lib\asyncio\tasks.py", line 492, in wait_for
    fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\thodo\anaconda3\lib\site-packages\distributed\comm\core.py", line 342, in connect
    comm = await wait_for(
  File "C:\Users\thodo\anaconda3\lib\site-packages\distributed\utils.py", line 1961, in wait_for
    return await asyncio.wait_for(fut, timeout)
  File "C:\Users\thodo\anaconda3\lib\asyncio\tasks.py", line 494, in wait_for
    raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\thodo\Documents\dask-fargate-demo\conn_demo.py", line 14, in <module>
    main()
  File "C:\Users\thodo\Documents\dask-fargate-demo\conn_demo.py", line 9, in main
    client = Client(address="tcp://3.73.63.81:8786", timeout="60s")
  File "C:\Users\thodo\anaconda3\lib\site-packages\distributed\client.py", line 1017, in __init__
    self.start(timeout=timeout)
  File "C:\Users\thodo\anaconda3\lib\site-packages\distributed\client.py", line 1219, in start
    sync(self.loop, self._start, **kwargs)
  File "C:\Users\thodo\anaconda3\lib\site-packages\distributed\utils.py", line 434, in sync
    raise error
  File "C:\Users\thodo\anaconda3\lib\site-packages\distributed\utils.py", line 408, in f
    result = yield future
  File "C:\Users\thodo\anaconda3\lib\site-packages\tornado\gen.py", line 762, in run
    value = future.result()
  File "C:\Users\thodo\anaconda3\lib\site-packages\distributed\client.py", line 1298, in _start
    await self._ensure_connected(timeout=timeout)
  File "C:\Users\thodo\anaconda3\lib\site-packages\distributed\client.py", line 1360, in _ensure_connected
    comm = await connect(
  File "C:\Users\thodo\anaconda3\lib\site-packages\distributed\comm\core.py", line 368, in connect
    raise OSError(
OSError: Timed out trying to connect to tcp://3.73.63.81:8786 after 60 s
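This reconnection attempt fails at the TCP level (tornado's `connect` is cancelled and distributed then reports `Timed out trying to connect`), before any Dask-level handshake takes place. A plain socket check from the same machine can help tell whether port 8786 on the scheduler's public address is reachable at all, for example whether the scheduler task is still running and the network still allows the connection. The following is a minimal sketch using only the Python standard library; `parse_tcp_address` and `is_reachable` are hypothetical helpers.

```python
import socket
from typing import Tuple
from urllib.parse import urlsplit


def parse_tcp_address(address: str) -> Tuple[str, int]:
    """Split a Dask-style address such as 'tcp://3.73.63.81:8786' into (host, port)."""
    parts = urlsplit(address)
    if parts.scheme != "tcp" or parts.hostname is None or parts.port is None:
        raise ValueError(f"expected tcp://host:port, got {address!r}")
    return parts.hostname, parts.port


def is_reachable(address: str, timeout: float = 5.0) -> bool:
    """Return True if a plain TCP connection to the address succeeds within timeout."""
    host, port = parse_tcp_address(address)
    try:
        # create_connection raises OSError (including socket.timeout and
        # ConnectionRefusedError) when the port cannot be reached.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

For example, if `is_reachable("tcp://3.73.63.81:8786")` returns False, that would point at networking or a stopped scheduler task rather than at the Dask client itself.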

Local Environment:

jacobtomlinson commented 5 months ago

It looks like the scheduler may be getting killed somehow, so you're losing the connection to it. Can you look at the scheduler and worker logs to see if there are any more clues in there?
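While the scheduler is still alive, distributed's `Client.get_scheduler_logs()` and `Client.get_worker_logs()` return recent log records as `(level, message)` pairs (the latter as a mapping from worker address to such pairs). If the scheduler has already been killed, those calls cannot succeed, and the logs would have to be read from the ECS task's container logs (for example in CloudWatch) instead. The following is a minimal sketch, assuming that `(level, message)` format, that narrows the output to warnings and errors; `filter_logs` is a hypothetical helper, not part of Dask.

```python
import logging
from typing import Dict, List, Tuple

LogEntries = List[Tuple[str, str]]

# Map textual level names to the numeric levels of the logging module.
_LEVELS = {
    "DEBUG": logging.DEBUG,
    "INFO": logging.INFO,
    "WARNING": logging.WARNING,
    "ERROR": logging.ERROR,
    "CRITICAL": logging.CRITICAL,
}


def filter_logs(logs: Dict[str, LogEntries], min_level: str = "WARNING") -> Dict[str, LogEntries]:
    """Keep only entries at or above min_level for each process.

    Processes with no remaining entries are dropped. Unknown level names
    are treated as CRITICAL so that nothing unexpected is hidden.
    """
    threshold = _LEVELS[min_level.upper()]
    filtered: Dict[str, LogEntries] = {}
    for name, entries in logs.items():
        kept = [
            (level, msg)
            for level, msg in entries
            if _LEVELS.get(level.upper(), logging.CRITICAL) >= threshold
        ]
        if kept:
            filtered[name] = kept
    return filtered
```

For example, `filter_logs({"scheduler": client.get_scheduler_logs()})` and `filter_logs(client.get_worker_logs())` would show only the warning and error records from the scheduler and the workers.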