dask / dask-cloudprovider

Cloud provider cluster managers for Dask. Supports AWS, Google Cloud, Azure, and more.
https://cloudprovider.dask.org
BSD 3-Clause "New" or "Revised" License

raise RuntimeError("IOLoop is closed") during script shutdown #420

Open afsangujarati93 opened 10 months ago

afsangujarati93 commented 10 months ago

Describe the issue: After finding nothing helpful online, I reduced my code to the smallest snippet that still triggers this error:

raise RuntimeError("IOLoop is closed")

All my function does is set up a cluster in ECS and tear it down. The cluster spins up successfully in AWS ECS. The error occurs at the very end, when the notebook is shutting down. The stack trace and other details are in the thread.

Minimal Complete Verifiable Example:

from dotenv import load_dotenv
import os
import logging
from dask_cloudprovider.aws import ECSCluster
from dask.distributed import Client

load_dotenv(verbose=True, override=True)

AWS_ACCOUNT_ID=os.environ["AWS_ACCOUNT_ID"]
AWS_DEFAULT_REGION=os.environ["AWS_DEFAULT_REGION"]
VPC_ID = os.environ["VPC_ID"]
SUBNET_ID = os.environ["SUBNET_ID"]
SECURITY_GROUP = os.environ["SECURITY_GROUP"]

logging.basicConfig(level=logging.INFO)

logging.info("Initializing clusters ...")
with ECSCluster(
    cluster_name_template="dask-datascience",
    fargate_scheduler=True,
    fargate_workers=True,
    fargate_spot=True,
    vpc=VPC_ID,
    subnets=[SUBNET_ID],
    security_groups=[SECURITY_GROUP],
    worker_cpu=512,
    worker_mem=1024,
    n_workers=1
) as cluster:
    with Client(cluster) as client:
        logging.info("Going ....")

Anything else we need to know?:

Stacktrace

INFO:root:Going ....
Traceback (most recent call last):
  File "/Users/afsan.gujarati/.pyenv/versions/3.10.12/lib/python3.10/weakref.py", line 667, in _exitfunc
    f()
  File "/Users/afsan.gujarati/.pyenv/versions/3.10.12/lib/python3.10/weakref.py", line 591, in __call__
    return info.func(*info.args, **(info.kwargs or {}))
  File "/Users/afsan.gujarati/.pyenv/versions/dask-test/lib/python3.10/site-packages/distributed/utils.py", line 358, in sync
    return sync(
  File "/Users/afsan.gujarati/.pyenv/versions/dask-test/lib/python3.10/site-packages/distributed/utils.py", line 387, in sync
    raise RuntimeError("IOLoop is closed")
RuntimeError: IOLoop is closed
Traceback (most recent call last):
  File "/Users/afsan.gujarati/.pyenv/versions/3.10.12/lib/python3.10/weakref.py", line 667, in _exitfunc
    f()
  File "/Users/afsan.gujarati/.pyenv/versions/3.10.12/lib/python3.10/weakref.py", line 591, in __call__
    return info.func(*info.args, **(info.kwargs or {}))
  File "/Users/afsan.gujarati/.pyenv/versions/dask-test/lib/python3.10/site-packages/distributed/utils.py", line 358, in sync
    return sync(
  File "/Users/afsan.gujarati/.pyenv/versions/dask-test/lib/python3.10/site-packages/distributed/utils.py", line 387, in sync
    raise RuntimeError("IOLoop is closed")
RuntimeError: IOLoop is closed
Traceback (most recent call last):
  File "/Users/afsan.gujarati/.pyenv/versions/3.10.12/lib/python3.10/weakref.py", line 667, in _exitfunc
    f()
  File "/Users/afsan.gujarati/.pyenv/versions/3.10.12/lib/python3.10/weakref.py", line 591, in __call__
    return info.func(*info.args, **(info.kwargs or {}))
  File "/Users/afsan.gujarati/.pyenv/versions/dask-test/lib/python3.10/site-packages/distributed/utils.py", line 358, in sync
    return sync(
  File "/Users/afsan.gujarati/.pyenv/versions/dask-test/lib/python3.10/site-packages/distributed/utils.py", line 387, in sync
    raise RuntimeError("IOLoop is closed")
RuntimeError: IOLoop is closed
Traceback (most recent call last):
  File "/Users/afsan.gujarati/.pyenv/versions/3.10.12/lib/python3.10/weakref.py", line 667, in _exitfunc
    f()
  File "/Users/afsan.gujarati/.pyenv/versions/3.10.12/lib/python3.10/weakref.py", line 591, in __call__
    return info.func(*info.args, **(info.kwargs or {}))
  File "/Users/afsan.gujarati/.pyenv/versions/dask-test/lib/python3.10/site-packages/distributed/utils.py", line 358, in sync
    return sync(
  File "/Users/afsan.gujarati/.pyenv/versions/dask-test/lib/python3.10/site-packages/distributed/utils.py", line 387, in sync
    raise RuntimeError("IOLoop is closed")
RuntimeError: IOLoop is closed
Traceback (most recent call last):
  File "/Users/afsan.gujarati/.pyenv/versions/3.10.12/lib/python3.10/weakref.py", line 667, in _exitfunc
    f()
  File "/Users/afsan.gujarati/.pyenv/versions/3.10.12/lib/python3.10/weakref.py", line 591, in __call__
    return info.func(*info.args, **(info.kwargs or {}))
  File "/Users/afsan.gujarati/.pyenv/versions/dask-test/lib/python3.10/site-packages/distributed/utils.py", line 358, in sync
    return sync(
  File "/Users/afsan.gujarati/.pyenv/versions/dask-test/lib/python3.10/site-packages/distributed/utils.py", line 387, in sync
    raise RuntimeError("IOLoop is closed")
RuntimeError: IOLoop is closed

Package versions:

distributed==2023.12.1
dask==2023.12.1
dask_cloudprovider==2022.10.0
tornado==6.3.3

Environment:

afsangujarati93 commented 10 months ago

I also tried the asynchronous API (asynchronous=True) with the following code:

from dotenv import load_dotenv
import os
import logging
from dask_cloudprovider.aws import ECSCluster
from dask.distributed import Client

load_dotenv(verbose=True, override=True)

AWS_ACCOUNT_ID=os.environ["AWS_ACCOUNT_ID"]
AWS_DEFAULT_REGION=os.environ["AWS_DEFAULT_REGION"]
VPC_ID = os.environ["VPC_ID"]
SUBNET_ID = os.environ["SUBNET_ID"]
SECURITY_GROUP = os.environ["SECURITY_GROUP"]

logging.basicConfig(level=logging.INFO)

async def f():
    logging.info("Initializing clusters ...")
    cluster = await ECSCluster(
        cluster_name_template="dask-datascience",
        fargate_scheduler=True,
        fargate_workers=True,
        fargate_spot=True,
        vpc=VPC_ID,
        subnets=[SUBNET_ID],
        security_groups=[SECURITY_GROUP],
        worker_cpu=512,
        worker_mem=1024,
        n_workers=1,
        asynchronous=True
    )
    logging.info("in cluster ...")
    client = await Client(cluster, asynchronous=True)
    logging.info("initialized client ...")
    await client.close()
    logging.info("closed client ...")
    await cluster.close()
    logging.info("closed cluster ...")

    return None

# Run the coroutine on an asyncio event loop
import asyncio

loop = asyncio.get_event_loop()
loop.run_until_complete(f())
logging.info("After complete ...")

import threading

# After your asyncio loop
for thread in threading.enumerate():
    print(f"Thread: {thread.name}, Daemon: {thread.isDaemon()}")

logging.info("Before exit ...")

os._exit(0)  # Added this because the script won't exit otherwise

In this case the logs showed that f() ran to completion, but the script did not exit. The thread listing printed at the end of the script showed the following:

Thread: MainThread, Daemon: False
Thread: asyncio_0, Daemon: False

A non-daemon asyncio_0 thread, probably started somewhere in Dask or dask_cloudprovider, was still alive and had not been shut down. As a temporary measure, I added an explicit os._exit(0) to force the script to exit. I know this isn't an ideal solution, but I didn't know what else to look for.
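
One place a thread named `asyncio_0` can come from is asyncio's own default executor: `loop.run_in_executor(None, ...)` lazily creates a `ThreadPoolExecutor` whose workers are named with the `asyncio` prefix, and `loop.run_until_complete()` does not shut that executor down afterwards. Whether the thread seen here was created that way, by Dask, or by dask_cloudprovider is an assumption and is not confirmed in this thread. The sketch below is a minimal, Dask-free illustration; `use_default_executor` and `worker_thread_names` are hypothetical helper names.

```python
import asyncio
import threading


def worker_thread_names():
    # Names of live threads that look like asyncio default-executor workers.
    return sorted(
        t.name for t in threading.enumerate() if t.name.startswith("asyncio_")
    )


async def use_default_executor():
    loop = asyncio.get_running_loop()
    # executor=None selects the loop's default ThreadPoolExecutor.
    return await loop.run_in_executor(None, sum, [1, 2, 3])


loop = asyncio.new_event_loop()
result = loop.run_until_complete(use_default_executor())

# The idle worker thread outlives run_until_complete().
names_before = worker_thread_names()

# Explicitly stop the default executor and wait for its workers (Python 3.9+).
loop.run_until_complete(loop.shutdown_default_executor())
names_after = worker_thread_names()

loop.close()
print(result, names_before, names_after)
```

`asyncio.run()` performs the same executor shutdown on its way out on Python 3.9 and later, so replacing `get_event_loop()` plus `run_until_complete()` with `asyncio.run(f())` may be worth trying before resorting to `os._exit(0)`. It will not help if the lingering thread is blocked inside a task rather than idle.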