PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

`dask_kubernetes` Restart feature is broken #5549

Closed BitTheByte closed 2 years ago

BitTheByte commented 2 years ago

It seems like something in Prefect's codebase broke dask_kubernetes' ability to restart and respawn worker pods. If a worker pod is killed for any reason, the default behavior of dask_kubernetes is to restart that pod; however, when used with Prefect it does not. The code below creates a plain Dask cluster for testing - try killing one or two of the pods and they are restarted as expected:

from dask_kubernetes import KubeCluster, make_pod_spec

# Plain dask_kubernetes cluster, no Prefect involved
pod_spec = make_pod_spec(image='daskdev/dask:latest')
cluster = KubeCluster(pod_spec)
cluster.adapt(minimum=4, maximum=4)  # keep exactly 4 workers alive
input("pause ...")  # keep the cluster up while pods are killed manually

Using Prefect with the same KubeCluster via DaskExecutor on a Kubernetes run, killed worker pods just die and are never respawned:

from dask_kubernetes import KubeCluster, make_pod_spec
from prefect.run_configs import KubernetesRun
from prefect.executors import DaskExecutor
from prefect.storage import Docker
from prefect import task, Flow
import prefect
import time

# Docker storage: bake the flow into an image based on Prefect 1.0.0
storage = Docker(
    base_image="prefecthq/prefect:1.0.0-python3.9",
    python_dependencies=[
        "dask-kubernetes==2021.3.1",
    ],
)

@task()
def transform(number: int) -> int:
    time.sleep(30)
    return number

with Flow(
    "Dask Kubernetes Flow",
    storage=storage,
    executor=DaskExecutor(
        # Spawn an ephemeral KubeCluster using the flow's own storage image
        cluster_class=lambda: KubeCluster(make_pod_spec(image=prefect.context.image)),
        adapt_kwargs={"minimum": 4, "maximum": 4},  # pin the cluster at 4 workers
    ),
    run_config=KubernetesRun(),
) as flow:
    transform.map([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
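For completeness, the flow is built and registered before triggering a Kubernetes run; the project name below is only a placeholder for this repro:

# Build the Docker storage and register the flow with the backend
# ("dask-k8s-repro" is an arbitrary project name).
flow.register(project_name="dask-k8s-repro")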

Expected Behavior

I expect to see killed workers restarting and respawning, as they do without Prefect.

BitTheByte commented 2 years ago

Hi @madkinsz

Should we prioritize this? Dask is Prefect's main way to parallelize and distribute workloads. Here are some of my notes:

1) The scheduler pod itself doesn't have the ability to restart anything; restarting happens in the initiator process, which in this case is the prefect-job Kubernetes job.

2) I think it has something to do with dask.py#L291; the event subscription seems to block the dask_kubernetes event loop.

3) Orion's DaskTaskRunner (task_runners.py#L206) is cleaner and doesn't suffer from the same issue; however, since Orion is not ready yet, it isn't a suitable option (a rough sketch of the Orion-style equivalent follows below).

4) If possible, can we get someone from dask-kubernetes to take a look? Maybe @jacobtomlinson?
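For reference, here is a rough sketch of what the equivalent flow looked like on Orion at the time; the DaskTaskRunner import path and task-calling conventions changed across the 2.0 betas, so treat this as illustrative rather than exact:

from dask_kubernetes import KubeCluster, make_pod_spec
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner  # location in the 2.0 betas
import time

@task
def transform(number: int) -> int:
    time.sleep(30)
    return number

@flow(
    task_runner=DaskTaskRunner(
        cluster_class=lambda: KubeCluster(make_pod_spec(image="daskdev/dask:latest")),
        adapt_kwargs={"minimum": 4, "maximum": 4},
    )
)
def dask_kubernetes_flow():
    for number in range(1, 11):
        transform(number)  # task calls are handed to the task runner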

BitTheByte commented 2 years ago

Hi @madkinsz

Good news: I found the cause of this at dask.py#L253-L259, confirming my second point. Changing these lines to the following:

with Client(cluster, **self.client_kwargs) as client:
    self.client = client
    yield
    #with performance_report_context:
    #    self.client = client
    #    try:
    #        self._pre_start_yield()
    #        yield
    #    finally:
    #        self._post_start_yield()

This solves the problem, and it looks like the regression was introduced with the performance report feature. I really don't know how to reimplement it cleanly, but for the time being maybe we should make this behavior toggleable?

I'll try to narrow it down further in the next few hours.

BitTheByte commented 2 years ago

Found the root cause: dask.py#L291. I think dask_kubernetes uses the same event subscription to restart workers, so by subscribing to it here we cause this behavior. The subscription is only used to log worker status to the console, and I don't think that's very important since the logs aren't shown anywhere except the prefect-job k8s job's stdout. An easy workaround is simply removing the logging; I'll leave the final decision to you.

BitTheByte commented 2 years ago

Hi @madkinsz

Do we have an update on this?

zanieb commented 2 years ago

Yeah, we can't just tear out the event subscription; people are using it. I talked to the Dask team and can confirm that a new subscriber to worker status will cause the existing subscriber to be disconnected. I might take the time to contribute a fix upstream, but I'd like to fix the behavior here first. I think the simplest fix may be adding a toggle to disable watching for worker events, plus a warning if we see a user using KubeCluster (rough sketch below).
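Something along these lines is what I have in mind; this is only a sketch, and the function and flag names are placeholders rather than actual Prefect APIs:

import warnings

def _should_watch_worker_events(cluster, watch_worker_events=True):
    # Placeholder guard: skip the worker-event watcher and warn when the
    # executor's cluster is a dask_kubernetes KubeCluster, since a second
    # worker-status subscriber disconnects the one KubeCluster relies on.
    try:
        from dask_kubernetes import KubeCluster
    except ImportError:
        return watch_worker_events
    if watch_worker_events and isinstance(cluster, KubeCluster):
        warnings.warn(
            "Worker status events are not watched for KubeCluster because a "
            "second subscriber would break dask_kubernetes worker restarts."
        )
        return False
    return watch_worker_events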

jacobtomlinson commented 2 years ago

We are in the process of replacing KubeCluster with an operator that handles scaling separately from the cluster manager. So the new cluster manager shouldn't have this problem.

It does sound like this will affect any cluster manager that uses adaptive scaling. Upstream fixes would be much appreciated!