PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Support subflow cancellation if parent flow is in terminal state #12864

Open j-tr opened 6 months ago

j-tr commented 6 months ago

Prefect Version

2.x

Describe the current behavior

When starting subflow runs as deployments asynchronously (via run_deployment with timeout=0), subflow runs can outlive their parent flow run. While this is useful in certain scenarios, it makes canceling these subflow runs difficult: once the parent flow run is in a terminal state, subflow runs can only be canceled one by one, which is tedious.

Describe the proposed behavior

All flow runs that have subflow runs could offer a "Cancel subflows" button, similar to the current "Cancel" button. It would be available as long as any subflows are in a cancellable state, irrespective of the parent flow's state.

Alternatively, run_deployment could have an optional parameter "cancel_on_final_parent" that would cause the created flow run to be canceled as soon as the parent flow run enters a terminal state.
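
For illustration only, a call site using the proposed parameter might look like this (cancel_on_final_parent is hypothetical and does not exist in run_deployment today):

subflow_run = run_deployment(
    name="main/test-flow",
    parameters={"run_forever": True},
    timeout=0,
    cancel_on_final_parent=True,  # hypothetical: cancel this run once the parent reaches a terminal state
)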

Example Use

Some of our flows start a large number of subflows via run_deployment. To do this concurrently, we use timeout=0 and later poll the Prefect API for the status of the flow runs.

If the parent flow run hits an unexpected exception, it goes into a "Failed" state, but the subflow runs keep running. In this case we would often like to cancel all subflow runs; currently, we need to cancel each subflow run individually.

from prefect import flow
from prefect.deployments import run_deployment
import time

@flow
def main(
    run_forever: bool = False
):
    if run_forever:
        while True:
            time.sleep(1)
    else:
        subflow_runs = []
        for _ in range(3):
            subflow_run = run_deployment(name="main/test-flow", parameters={"run_forever": True}, timeout=0)
            subflow_runs.append(subflow_run)

        raise Exception("Simulate flow failure")

        ### Some code that waits for the subflows in subflow_runs to finish would come here

if __name__ == "__main__":
    main.serve(name="test-flow")
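
For completeness, here is a minimal sketch (mine, not part of the original report) of the waiting step that the placeholder comment above alludes to, assuming the subflow runs are polled via the Prefect client until they reach a final state. In a synchronous flow it could be driven with, e.g., asyncio.run(wait_for_subflow_runs(subflow_runs)).

import asyncio

from prefect import get_client

async def wait_for_subflow_runs(subflow_runs, poll_interval: float = 5.0):
    # Poll the API until every subflow run has reached a terminal state.
    async with get_client() as client:
        pending = {run.id for run in subflow_runs}
        while pending:
            for run_id in list(pending):
                run = await client.read_flow_run(run_id)
                if run.state and run.state.is_final():
                    pending.discard(run_id)
            if pending:
                await asyncio.sleep(poll_interval)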

Additional context

potentially related to https://github.com/PrefectHQ/prefect/issues/10947

desertaxle commented 6 months ago

Thanks for the enhancement request @j-tr! One question about your setup: do you wait for these deployment runs to complete at any point in your parent flow?

j-tr commented 6 months ago

Hello @desertaxle, we are usually waiting for the deployment runs to reach a final state. I think there are at least two use cases:

1) The developer wants to start sub-deployments asynchronously and returns from the parent flow before waiting for the subflows to finish. In this case the current behavior is expected; however, group cancellation of all subflows could still be useful.

2) The developer is actually looking for subflow behavior (flow run as a Python function, not via run_deployment), but the subflow requires specific infrastructure that is only available via a different work pool, so it needs to run as a deployment. To get as close as possible to subflow behavior, we use a polling mechanism like the one in https://github.com/PrefectHQ/prefect/blob/main/src/prefect/deployments/deployments.py#L230-L236 at the end of the parent flow to join all subflows and set the return value of the parent flow according to the states of the subflows (see the sketch below). In this case, it makes no sense to continue the subflows when the parent flow fails or crashes.
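
To make use case 2) concrete, the join step at the end of the parent flow might look roughly like this (my own sketch, not code from the thread), returning a state for the parent based on the subflow states:

from prefect.client.schemas.objects import StateType
from prefect.states import Completed, Failed

def state_from_subflows(finished_runs):
    # finished_runs is assumed to hold the FlowRun objects after polling;
    # the parent flow can return this state as its own final result.
    failed = [run for run in finished_runs if run.state.type != StateType.COMPLETED]
    if failed:
        return Failed(message=f"{len(failed)} subflow run(s) did not complete")
    return Completed(message="all subflow runs completed")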

See https://github.com/PrefectHQ/prefect/issues/6689 for a related feature request.

mitchell-lawson commented 5 months ago

I am experiencing a very similar issue and have gotten no definitive response from the community, despite making numerous posts and asking multiple people in the community Slack. I was told we have to manually manage the states with a cancellation hook (that did not work).

Real-life example: I have 1,000 jobs that all require nodes with GPUs, and there's a concurrency limit of 100. I need to manually cancel 90% of them because the cancellation doesn't propagate or cascade to the subflows.

Every other orchestrator supports this basic functionality (Airflow, Luigi, Kubeflow, ClearML, Dagster, SageMaker Pipelines).

Do I really need to implement a system that manages the parent flow run ID, where every single subflow I create constantly queries the parent's status mid-execution and raises a cancellation error when the parent reaches a terminal state?

Edit: I should add that this is observed with different server/client versions as well.
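
For reference, the polling workaround described above could look roughly like the following (a hedged sketch of mine, assuming the parent flow run ID is passed to every subflow as a parameter):

from prefect import get_client

async def parent_is_terminal(parent_flow_run_id) -> bool:
    # Check whether the parent flow run has reached a terminal state.
    async with get_client() as client:
        parent = await client.read_flow_run(parent_flow_run_id)
        return bool(parent.state and parent.state.is_final())

# Inside each subflow, something along these lines would run periodically:
#     if await parent_is_terminal(parent_flow_run_id):
#         raise RuntimeError("parent flow run reached a terminal state; stopping")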

mitchell-lawson commented 4 months ago

This is the solution I implemented as a fix a while ago and suggested to the Slack community as a cancellation hook.

from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterState
from prefect.states import Cancelled

async def cancel_subflows(flow, flow_run, state):
    # State-change hook: cancel every subflow run of `flow_run` that is
    # still in a cancellable (non-terminal) state.
    async with get_client() as client:
        runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                parent_flow_run_id=dict(any_=[flow_run.id]),
                state=FlowRunFilterState(
                    name=dict(
                        any_=[
                            "Running",
                            "Scheduled",
                            "Paused",
                            "AwaitingRetry",
                            "Retrying",
                            "Late",
                            "Pending",
                            "Suspended",
                        ]
                    )
                ),
            )
        )
        # Move each matching subflow run to a Cancelled state.
        for run in runs:
            await client.set_flow_run_state(
                flow_run_id=run.id, state=Cancelled(), force=False
            )
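
For anyone copying the snippet above: it is written as a flow state-change hook, so it needs to be registered on the parent flow. A minimal usage sketch (mine, assuming you want it to fire on both cancellation and failure of the parent):

from prefect import flow

@flow(on_cancellation=[cancel_subflows], on_failure=[cancel_subflows])
def parent_flow():
    ...
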
EmilRex commented 2 months ago

Just wanted to chime in with a simple reproduction. Run the script below, kick off a run, and then cancel that run. You'll see that the parent flow goes to a Cancelled state, and so does the child flow, but as evidenced by the logs, the child flow continues executing. This is particularly problematic when the child flow is on separate infrastructure and prevents that infrastructure from terminating.

import time

from prefect import flow, get_run_logger, serve
from prefect.deployments import run_deployment

@flow
def child(name: str):
    get_run_logger().info(f"Hello, {name}!")
    for _ in range(60):
        get_run_logger().info("I'm still here...")
        time.sleep(10)

@flow()
def parent(name: str = "Marvin"):
    run_deployment("child/default", parameters={"name": name})

if __name__ == "__main__":
    serve(
        child.to_deployment(name="default"),
        parent.to_deployment(name="default"),
    )

Version:

Version:             2.20.3
API version:         0.8.4
Python version:      3.12.5
Git commit:          b8c27aa0
Built:               Thu, Aug 22, 2024 3:13 PM
OS/Arch:             darwin/arm64
Profile:             sandbox
Server type:         cloud