dask / dask-kubernetes

Native Kubernetes integration for Dask
https://kubernetes.dask.org
BSD 3-Clause "New" or "Revised" License

Cannot close cluster after interrupt (ctrl-c) #444

Closed: cagantomer closed this issue 6 months ago

cagantomer commented 2 years ago

What happened:

I am using KubeCluster to run an ad-hoc Dask cluster as part of my script.

I am trying to gracefully shut down the cluster when the script is interrupted (SIGINT / ctrl-c), using the example below.

When the script runs without interruption, it correctly cleans up the cluster it created. When it is interrupted with ctrl-c, the handler catches the exception, but calling client.cluster.close in the finally clause raises a timeout error (see below). The worker and scheduler pods remain active and are not terminated.

What you expected to happen: The pods of the ad-hoc cluster should be terminated and no exception should be raised.

Minimal Complete Verifiable Example:

import time
import signal
import random

from dask_kubernetes import KubeCluster, make_pod_spec
from dask.distributed import Client, wait

def get_client() -> Client:
    """Get a (dask) client for remote"""

    num_workers = 5
    image = "daskdev/dask:2022.3.0-py3.9"

    pod_spec = make_pod_spec(
        image=image,
        memory_request="4G",
        cpu_request=1,
        extra_pod_config={"nodeSelector": {"env": "sw"}},
    )

    # KubeCluster expands the literal "{uuid}" placeholder itself, so the
    # braces are escaped in the f-string.
    name = f"dask-k8s-example-{{uuid}}"

    cluster = KubeCluster(
        pod_spec, namespace="research", name=name, n_workers=num_workers
    )

    print(
        f"started KubeCluster - dashboard address is {cluster.dashboard_link}, scheduler address: {cluster.scheduler.address}"
    )

    client = Client(cluster)

    # TODO: add timeout (and maybe make this optional)
    client.wait_for_workers(num_workers)

    return client

def double(value: int):
    time.sleep(random.random() * 3)
    return value + value

def square(value: int):
    time.sleep(random.random() * 4)
    return value * value

if __name__ == "__main__":
    client = get_client()

    def handle_signal(sig, frame):
        print(f"got a signal {sig} with frame {frame}")
        raise Exception("handled ctrl-c")

    # signal.signal(signal.SIGINT, handle_signal)

    try:
        tasks = []
        for i in range(20):
            ipi = client.submit(double, i)
            isq = client.submit(square, ipi)
            tasks.append(isq)

        print("waiting for tasks to finish...")
        wait(tasks)

        for task in tasks:
            if task.status == "error":
                print("error in task", task)
            else:
                print(f"task {task}. results {task.result()}")
    except KeyboardInterrupt:
        print("ctrl-c (through KeyboardInterrupt)")
    except Exception as ex:
        print(ex)
    finally:
        print("closing...") 
        client.cluster.close()
        client.close()

The output from execution:

python bin/test_dask_kubecluster.py
Creating scheduler pod on cluster. This may take some time.
/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/client.py:1283: VersionMismatchWarning: Mismatched versions found

+---------+--------+-----------+---------+
| Package | client | scheduler | workers |
+---------+--------+-----------+---------+
| blosc   | 1.10.6 | 1.10.2    | None    |
| lz4     | None   | 3.1.10    | None    |
+---------+--------+-----------+---------+

  warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
started KubeCluster - dashboard address is http://localhost:8787/status, scheduler address: tcp://dask-k8s-example-f47db833-b.research:8786
waiting for tasks to finish...
^Cctrl-c (through KeyboardInterrupt)
closing...
2022-04-27 09:13:57,071 - tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x110361be0>>, <Task finished name='Task-165' coro=<SpecCluster._correct_state_internal() done, defined at /Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/deploy/spec.py:314> exception=OSError('Timed out trying to connect to tcp://localhost:56654 after 30 s')>)
Traceback (most recent call last):
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/comm/tcp.py", line 444, in connect
    stream = await self.client.connect(
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/tornado/tcpclient.py", line 265, in connect
    addrinfo = await self.resolver.resolve(host, port, af)
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py", line 490, in wait_for
    return fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/comm/core.py", line 289, in connect
    comm = await asyncio.wait_for(
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py", line 492, in wait_for
    raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/deploy/spec.py", line 406, in _close
    await self._correct_state()
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/deploy/spec.py", line 321, in _correct_state_internal
    await self.scheduler_comm.retire_workers(workers=list(to_close))
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/core.py", line 844, in send_recv_from_rpc
    comm = await self.live_comm()
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/core.py", line 801, in live_comm
    comm = await connect(
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/comm/core.py", line 315, in connect
    raise OSError(
OSError: Timed out trying to connect to tcp://localhost:56654 after 30 s
Traceback (most recent call last):
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/comm/tcp.py", line 444, in connect
    stream = await self.client.connect(
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/tornado/tcpclient.py", line 265, in connect
    addrinfo = await self.resolver.resolve(host, port, af)
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py", line 490, in wait_for
    return fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/comm/core.py", line 289, in connect
    comm = await asyncio.wait_for(
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py", line 492, in wait_for
    raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/tomercagan/dev/nextresearch/bin/test_dask_kubecluster.py", line 79, in <module>
    client.cluster.close()
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/deploy/cluster.py", line 193, in close
    return self.sync(self._close, callback_timeout=timeout)
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/utils.py", line 309, in sync
    return sync(
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/utils.py", line 376, in sync
    raise exc.with_traceback(tb)
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/utils.py", line 349, in f
    result = yield future
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/deploy/spec.py", line 406, in _close
    await self._correct_state()
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/deploy/spec.py", line 321, in _correct_state_internal
    await self.scheduler_comm.retire_workers(workers=list(to_close))
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/core.py", line 844, in send_recv_from_rpc
    comm = await self.live_comm()
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/core.py", line 801, in live_comm
    comm = await connect(
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/comm/core.py", line 315, in connect
    raise OSError(
OSError: Timed out trying to connect to tcp://localhost:56654 after 30 s
2022-04-27 09:13:57,171 - distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client

Anything else we need to know?:

Press ctrl-c once "waiting for tasks to finish..." appears on the screen.

Also, not related, but the dashboard_link always shows localhost instead of something more useful (I recall reading a discussion about it - but it was a while ago).
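For what it's worth, the URL shown by dashboard_link comes from the distributed.dashboard.link config template, so it can at least be rendered as a more useful address when one is known. A minimal sketch, assuming a JupyterHub-style proxy sits in front of the dashboard; the template value here is an assumption about the deployment, not a fix for the port forwarding itself:

import dask

# "distributed.dashboard.link" is the config key distributed uses to format
# dashboard_link; {port} is filled in by distributed and
# JUPYTERHUB_SERVICE_PREFIX is read from the environment. The right template
# depends entirely on how the dashboard is actually exposed in your cluster.
dask.config.set(
    {"distributed.dashboard.link": "{JUPYTERHUB_SERVICE_PREFIX}proxy/{port}/status"}
)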

Environment:

cagantomer commented 2 years ago

I can try to work on it, though I'm not sure I know enough yet. Is there a "development" guide that I can use to get up and running?

cagantomer commented 2 years ago

BTW, as a workaround, I am using the following to "manually" find and delete the pods:

import pathlib
from kubernetes import client as k8sClient, config

DASK_CLUSTER_NAME_LABEL = "dask.org/cluster-name"
DASK_COMPONENT_LABEL = "dask.org/component"

def load_k8s_config():
    """load config - either from .kube in home directory (for local clients) or in-cluster (for e.g. CI)"""
    if (pathlib.Path.home() / ".kube" / "config").exists():
        config.load_kube_config()
    else:
        config.load_incluster_config()

def delete_adhoc_cluster(cluster_name: str, namespace: str):
    """delete the dask cluster created by KubeCluster"""
    load_k8s_config()
    v1 = k8sClient.CoreV1Api()

    pods_to_delete = []
    pods_list = v1.list_namespaced_pod(namespace=namespace)
    for pod in pods_list.items:
        if (
            DASK_CLUSTER_NAME_LABEL in pod.metadata.labels
            and pod.metadata.labels[DASK_CLUSTER_NAME_LABEL] == cluster_name
        ):
            pods_to_delete.append(
                dict(
                    name=pod.metadata.name, 
                    component=pod.metadata.labels.get(DASK_COMPONENT_LABEL, "n/a")
                )
            )

    for pod in pods_to_delete:
        print(f"deleting {pod['name']} ({pod['component']})...")
        # delete in the namespace that was passed in rather than a hard-coded one
        v1.delete_namespaced_pod(pod["name"], namespace)
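A hypothetical invocation of the workaround; both arguments are placeholders for the values used in the actual run (the cluster name can be read from the pods' dask.org/cluster-name label):

# Placeholder values; substitute the generated cluster name and your namespace.
delete_adhoc_cluster("dask-k8s-example-f47db833", namespace="research")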

jacobtomlinson commented 2 years ago

Apologies, it looks like I had typed a response here but forgot to press the button. Nice that GitHub saves these things!

Thanks @cagantomer I am able to reproduce this locally. My guess from looking at the traceback is that the cluster manager is unable to connect to the scheduler after things start closing out. Perhaps it's worth exploring the port forwarding code in case it has been closed.

You might also be interested in trying the operator we've been working on which is intended to supersede the current KubeCluster implementation. One of the goals of the operator is to handle lifecycle things like this in a more k8s native way.
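For reference, creating a cluster through the operator looks roughly like the sketch below. The import path and keyword arguments are taken from the operator docs and have changed between releases (early versions exposed this as dask_kubernetes.experimental.KubeCluster), so treat it as illustrative rather than exact:

# Requires the Dask operator (its CRDs and controller) to be installed in the
# Kubernetes cluster first. Names and arguments below are illustrative.
from dask.distributed import Client
from dask_kubernetes.operator import KubeCluster

cluster = KubeCluster(
    name="dask-example",               # becomes the DaskCluster resource name
    image="ghcr.io/dask/dask:latest",
    n_workers=3,
)
client = Client(cluster)

# ... submit work ...

client.close()
cluster.close()  # deletes the DaskCluster resource and its pods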

cagantomer commented 2 years ago

I saw the operator efforts and I kind of have mixed feelings towards it. I am reluctant regarding anything that requires extra installations :-)

I really liked the KubeCluster experience - just having the right permissions to k8s and everything works relatively smoothly...

What are the advantages of using the operator?

jacobtomlinson commented 2 years ago

I am reluctant regarding anything that requires extra installations :-)

That's totally understandable. I'm optimistic that the couple of extra lines you need to run the first time you use it will be worth it.

just having the right permissions to k8s and everything works relatively smoothly...

We often find that this is rare and folks don't have the right permissions. Particularly creating pods as that can be a large security risk.

What are the advantages of using the operator?

I'll try to enumerate some of the goals:

Switching to Custom Resource Definitions and an Operator hugely simplifies the client-side code and provides much better decoupling. This will help maintainability, as the current KubeCluster has grown unwieldy.

In multi-user clusters an admin can install the operator once for the cluster and then users can just happily create Dask clusters.

With the current KubeCluster, all state is tied to the instance of the object in Python, which means clean-up or reuse can be unpleasant if the Python process ends unexpectedly. In the new implementation, state lives in k8s and clusters can be created/scaled/deleted in a k8s-native way using kubectl, in addition to the new experimental KubeCluster.

Many folks use Dask clusters in a multi-stage pipeline, where each stage is a separate Python script or notebook. The current KubeCluster cannot persist between stages, but clusters created with the operator can (see the sketch after this list).

We are also in the process of adding a DaskJob CRD which will behave much like a k8s Job resource but with a Dask cluster attached. This should feel familiar to Kubeflow users who use the various training operators it provides.

The decoupled state means we have been able to implement support for dask-ctl, so that clusters can be managed via the CLI/API it provides, along with the new CLI dashboard (once we get that finished).

We currently struggle with supporting clusters that have Istio installed due to the way our pods talk directly to each other. The operator aims to solve that by handling the complex services required in a way that is transparent to the user.

The autoscaling logic moves from the cluster manager to the operator, which will also benefit multi-stage pipeline workflows.
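As a concrete illustration of the persistence point above, a rough sketch of how a multi-stage pipeline might reuse an operator-managed cluster; shutdown_on_close and from_name are taken from the operator documentation, but exact names and defaults may differ between dask-kubernetes releases:

from dask_kubernetes.operator import KubeCluster

# Stage 1: create a cluster that outlives this Python process.
cluster = KubeCluster(name="pipeline-cluster", n_workers=5, shutdown_on_close=False)

# Stage 2 (a separate script or notebook): reattach to the same cluster by
# name, use it, and delete it when the pipeline is finished.
cluster = KubeCluster.from_name("pipeline-cluster")
cluster.close()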

cagantomer commented 2 years ago

@jacobtomlinson - thanks for the information. Once we are stable with our current implementation, I'll look into trying the operator (with hope the DevOps will help with it 😄)

jacobtomlinson commented 6 months ago

The classic KubeCluster was removed in https://github.com/dask/dask-kubernetes/pull/890. All users will need to migrate to the Dask Operator. Closing.