PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Cascading cancellation should work for normal subflows, not just sub-deployments. #12714

Open taylor-curran opened 6 months ago

taylor-curran commented 6 months ago

Desired Behavior

When a parent flow is canceled, the child flow should also enter a Cancelling or Cancelled state.

For normal (non-deployed) subflows, the subflow is not canceled; it crashes instead. This is the same behavior as a task that is still running when its flow is canceled.

For sub-deployments, the behavior is as expected: the subflow also enters a Cancelling and then a Cancelled state. ✅ This is likely because, in the parent flow, I can return the state of the child flow. I think we already test for this behavior.

```python
import time

from prefect import flow
from prefect.deployments import run_deployment

@flow
def my_parent_flow_with_sd_k8s():
    time.sleep(5)
    a = run_deployment("my-deployed-subflow-k8s/cancel-repro-k8s-sf-0")

    return a.state.result()
```
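The desired behavior amounts to a tree walk: canceling a parent run should move every subflow run that has not finished into a Cancelling state, instead of leaving it to crash. Below is a toy, in-memory model of that idea. `ToyFlowRun`, `StateType`, and `cancel_cascading` are hypothetical names invented for illustration; they are not Prefect's models or API, and this is not how Prefect implements cancellation.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from enum import Enum


class StateType(Enum):
    # Hypothetical subset of run states, for illustration only.
    RUNNING = "RUNNING"
    CANCELLING = "CANCELLING"
    CANCELLED = "CANCELLED"
    CRASHED = "CRASHED"
    COMPLETED = "COMPLETED"


# Runs in these states have finished; cascading cancellation leaves them alone.
TERMINAL = {StateType.CANCELLED, StateType.CRASHED, StateType.COMPLETED}


@dataclass
class ToyFlowRun:
    """Hypothetical stand-in for a flow run and its subflow runs."""

    name: str
    state: StateType = StateType.RUNNING
    children: list[ToyFlowRun] = field(default_factory=list)


def cancel_cascading(run: ToyFlowRun) -> list[str]:
    """Move `run` and every unfinished descendant to CANCELLING.

    Returns the names of the runs that were transitioned, parent first.
    """
    transitioned = []
    stack = [run]
    while stack:
        current = stack.pop()
        if current.state not in TERMINAL:
            current.state = StateType.CANCELLING
            transitioned.append(current.name)
        # Descend even through finished runs so no running grandchild is missed;
        # reversed() keeps children in their original order when popped.
        stack.extend(reversed(current.children))
    return transitioned


child = ToyFlowRun("my_subflow_serve")
parent = ToyFlowRun("my_parent_flow_serve", children=[child])
print(cancel_cascading(parent))
```

In Prefect, marking the subflow run as Cancelling is likely the easy part. A normal subflow runs in the same process as its parent, so actually stopping it, and firing its `on_cancellation` hooks, is where the behavior reported here diverges.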

Reasoning and Work-Arounds

Normal subflows are often used by teams that rely on our Databricks or dbt integrations, and those teams often want canceling a parent flow to also cancel the Databricks or dbt subflow. It's very common to attach on_cancellation state change hooks that tear down resources in those Databricks environments. Because these subflows don't inherit their parent's cancellation, those hooks never fire, so teams can't build such automations.

A workaround is to use on_crashed state change hooks instead, but a crash can have many causes, and not all of them warrant tearing down other resources the way a cancellation would.

Reproduction

I made sure to enable enhanced cancellation:

```shell
prefect config set PREFECT_EXPERIMENTAL_ENABLE_ENHANCED_CANCELLATION=True
```

normal_subflows_served.py

```python
import time

from prefect import flow

@flow
def my_subflow_serve():
    time.sleep(30)
    return 42

@flow
def my_parent_flow_serve():
    time.sleep(5)
    a = my_subflow_serve()
    time.sleep(10)
    return a

if __name__ == "__main__":
    my_parent_flow_serve.serve(
        name="cancel-repro-served-0", tags=["subflow-cancellation"]
    )
```

normal_subflows_deployed_parent.py

```python
import time

from prefect import flow

@flow
def my_subflow_normal():
    time.sleep(30)
    return 42

@flow
def my_parent_flow_k8s():
    time.sleep(5)
    a = my_subflow_normal()
    time.sleep(10)
    return a

if __name__ == "__main__":
    my_parent_flow_k8s.deploy(
        name="cancel-repro-k8s-normal-0",
        work_pool_name="k8s-pool-2",
        image="my-parent-flow-k8s:latest",
        push=False,
        tags=["subflow-cancellation"]
    )
```

sub_deployment.py

```python
import time

from prefect import flow
from prefect.deployments import run_deployment

@flow
def my_deployed_subflow_k8s():
    time.sleep(30)
    return 42

@flow
def my_parent_flow_with_sd_k8s():
    time.sleep(5)
    a = run_deployment("my-deployed-subflow-k8s/cancel-repro-k8s-sf-0")

    return a.state.result()

if __name__ == "__main__":
    my_deployed_subflow_k8s.deploy(
        name="cancel-repro-k8s-sf-0",
        work_pool_name="k8s-pool-2",
        image="my-deployed-subflow-k8s-with-sd:latest",
        push=False,
        tags=["subflow-cancellation"]
    )

    my_parent_flow_with_sd_k8s.deploy(
        name="cancel-repro-k8s-parent-with-sd-0",
        work_pool_name="k8s-pool-2",
        image="my-parent-flow-with-sd-k8s:latest",
        push=False,
        tags=["subflow-cancellation"]
    )
```

Error

Served flow run: [screenshot]

Deployed flow run with normal subflow: [screenshot]

Sub-deployment, expected behavior ✅: [screenshot]

Versions

Version:             2.17.1
API version:         0.8.4
Python version:      3.12.2
Git commit:          d6bdb075
Built:               Thu, Apr 11, 2024 6:58 PM
OS/Arch:             darwin/arm64
Profile:             cm-demo
Server type:         cloud

Additional context

Potentially related: #10947, #11055. However, those fixes seem to apply only to sub-deployments, where the child flow runs on separate infrastructure.

desertaxle commented 6 months ago

Thanks for the issue @taylor-curran! This is likely a bug in our cancellation cleanup service. We will investigate a fix for this!

DGolubets commented 1 month ago

I'm writing a flow that runs Spark on k8s, and I need this to tear Spark down correctly on cancellation. It would also be nice if tasks had cancellation hooks, but at least subflows should have this working.

DGolubets commented 3 weeks ago

Any update on this? At the moment, no subflow hooks are triggered at all when the parent flow is canceled.