PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Prevent running late runs after some threshold #9054

Open joelluijmes opened 1 year ago

joelluijmes commented 1 year ago


Prefect Version

2.x

Describe the current behavior

It would be great if we could configure flows (or Prefect) not to start executing late flow runs after some threshold. For some reason, one of my flow runs didn't start initially, and I didn't notice. After I restarted the agent, this flow run was still picked up, and now it is running a flow that is more than 25 days late.


Describe the proposed behavior

Configure a maximum threshold for late runs, after which the flow run should transition into a Failed / Crashed state instead of running the flow.

Example Use

A new parameter on the flow would make sense, e.g. @flow(late_run_threshold_seconds=3600), similar to timeout_seconds.
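
A minimal sketch of the check such an option could perform, assuming a threshold of `None` means "never skip" (today's behavior) and that a run without an expected start time, such as an ad-hoc call, is never considered late. `exceeds_late_threshold` is a hypothetical helper, not part of Prefect's API.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def exceeds_late_threshold(
    expected_start_time: Optional[datetime],
    late_run_threshold_seconds: Optional[float],
    now: datetime,
) -> bool:
    """Return True if a run starting at `now` is later than the allowed threshold.

    Hypothetical semantics for the proposed `late_run_threshold_seconds` option.
    """
    # No threshold configured: keep the current behavior and always run.
    if late_run_threshold_seconds is None:
        return False
    # No expected start time (e.g. an ad-hoc call): nothing to be late for.
    if expected_start_time is None:
        return False
    lateness = now - expected_start_time
    return lateness > timedelta(seconds=late_run_threshold_seconds)


now = datetime(2023, 4, 1, tzinfo=timezone.utc)
print(exceeds_late_threshold(now - timedelta(days=25), 3600, now))  # True
print(exceeds_late_threshold(now - timedelta(minutes=5), 3600, now))  # False
```

With `late_run_threshold_seconds=3600`, the 25-day-late run from the report would be skipped, while a run five minutes behind schedule would still execute.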

Additional context

Happy to contribute, but it would be best to get some direction from the engineering team to make sure it fits the design.

WillRaphaelson commented 1 year ago

Hey @joelluijmes, thanks for filing this issue. Are you on Cloud? This is a very common use case for automations - setting up a "zombie killer" automation to cancel flow runs that have been in a late state for some amount of time.

joelluijmes commented 1 year ago

Thank you for responding! No, I am not using Cloud. Ideally, this feature would be integrated into the core functionality 😬 However, building the "zombie killer" flow myself seems like a good workaround if this won't be implemented.

zanieb commented 1 year ago

This could also be trivially implemented as a setting for agents — it can just place the run in a new state instead of running it.

WillRaphaelson commented 1 year ago

Yeah I like that approach for a global setting on the agent. Would cancelled be the right state here?

zanieb commented 1 year ago

@WillRaphaelson Yeah I think CANCELLED sounds right to me!
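
The agent-side approach could look roughly like the following sketch: before submitting polled runs, the agent splits them into runs to submit and runs to move to CANCELLED. `ScheduledRun` and `triage_runs` are hypothetical stand-ins, not Prefect's agent code; a real implementation would set the state through the Prefect client.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Tuple
from uuid import UUID, uuid4


@dataclass
class ScheduledRun:
    """Hypothetical stand-in for a flow run polled from a work queue."""

    id: UUID
    expected_start_time: datetime


def triage_runs(
    runs: List[ScheduledRun],
    cancel_after_seconds: Optional[float],
    now: datetime,
) -> Tuple[List[ScheduledRun], List[ScheduledRun]]:
    """Split runs into (to_submit, to_cancel) based on how late they are."""
    if cancel_after_seconds is None:
        # Feature disabled: submit everything, as agents do today.
        return list(runs), []
    limit = timedelta(seconds=cancel_after_seconds)
    to_submit: List[ScheduledRun] = []
    to_cancel: List[ScheduledRun] = []
    for run in runs:
        if now - run.expected_start_time > limit:
            to_cancel.append(run)  # would be set to CANCELLED instead of being run
        else:
            to_submit.append(run)
    return to_submit, to_cancel


now = datetime(2023, 4, 1, tzinfo=timezone.utc)
fresh = ScheduledRun(uuid4(), now - timedelta(seconds=30))
stale = ScheduledRun(uuid4(), now - timedelta(days=25))
submit, cancel = triage_runs([fresh, stale], cancel_after_seconds=300, now=now)
print(len(submit), len(cancel))  # 1 1
```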

joelluijmes commented 1 year ago

I agree that configuring the agent is a good idea. I'm willing to work on it sometime this week. Should I claim the ticket or simply submit a pull request when I have something ready? I just want to make sure the work isn't duplicated.

zanieb commented 1 year ago

@joelluijmes it's good to claim it to avoid duplicated work.

I'd add the setting PREFECT_LATE_RUNS_CANCEL_AFTER_SECONDS
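
A sketch of how such a setting could be read, assuming an unset or empty value disables the feature so that existing behavior is unchanged. Only the setting name comes from the comment above; the parsing rules and the `read_cancel_after_seconds` helper are assumptions, not Prefect's settings machinery.

```python
import os
from typing import Mapping, Optional

SETTING_NAME = "PREFECT_LATE_RUNS_CANCEL_AFTER_SECONDS"


def read_cancel_after_seconds(env: Mapping[str, str] = os.environ) -> Optional[float]:
    """Return the configured threshold in seconds, or None if the setting is unset."""
    raw = env.get(SETTING_NAME, "").strip()
    if not raw:
        return None  # unset or empty: never cancel late runs
    seconds = float(raw)  # raises ValueError for non-numeric input
    if seconds < 0:
        raise ValueError(f"{SETTING_NAME} must be >= 0, got {seconds}")
    return seconds


print(read_cancel_after_seconds({}))  # None
print(read_cancel_after_seconds({SETTING_NAME: "300"}))  # 300.0
```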

joelluijmes commented 1 year ago

@madkinsz I created a POC here: https://github.com/PrefectHQ/prefect/compare/main...joelluijmes:prefect:feat/prevent-rate-runs. If you think this is the right direction, I can add proper unit tests and documentation. I'm aware that it currently hardcodes the threshold to 5 minutes, but I think the default should be optional (unset) so as not to break existing behavior.

--

While digging in, I noticed there is a late_run monitoring service, which marks flow runs as late. I was wondering whether it would make more sense to mark the flow runs as cancelled there (or in another background service) instead of checking at the agent. One advantage is that the flow run would be cancelled much earlier, even when the agent is not running for some reason.
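
If the check lived in a server-side loop such as the late runs service, each pass could classify scheduled runs against two thresholds: a small one for marking a run Late and a larger one for cancelling it. The sketch below is illustrative only; `classify_scheduled_run`, its parameters, the returned labels, and the example threshold values are assumptions, not the service's actual code or defaults.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def classify_scheduled_run(
    expected_start_time: datetime,
    now: datetime,
    late_after: timedelta,
    cancel_after: Optional[timedelta],
) -> str:
    """Return the state a still-scheduled run should be moved to on this pass."""
    lateness = now - expected_start_time
    if cancel_after is not None and lateness > cancel_after:
        return "Cancelled"  # far too late: cancel even if no agent is polling
    if lateness > late_after:
        return "Late"
    return "Scheduled"  # not late yet: leave it alone


now = datetime(2023, 4, 1, tzinfo=timezone.utc)
late_after = timedelta(seconds=15)
cancel_after = timedelta(hours=1)
print(classify_scheduled_run(now - timedelta(seconds=5), now, late_after, cancel_after))  # Scheduled
print(classify_scheduled_run(now - timedelta(minutes=10), now, late_after, cancel_after))  # Late
print(classify_scheduled_run(now - timedelta(days=25), now, late_after, cancel_after))  # Cancelled
```

Because such a loop runs on the server, independently of agents, a run would be cancelled even while no agent is polling its work queue.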

zanieb commented 1 year ago

Thanks for putting together a POC!

This is a little tricky. We could add server-side settings for this that would be enforced both by the late runs service and by the orchestration rules when the agent proposes a Pending state. This would be the "proper" way to do it, as the orchestrator would be making the decision instead of the agent. However, this means you'd need access to the server to configure the settings, which is not an option for our Cloud users. We'd need to add a user-facing setting for this in Cloud, either for the workspace or for some lesser object like the work pool. So perhaps this just belongs as a field on the work pool? cc @desertaxle
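
The orchestration-rule variant could be sketched as a function that inspects a proposed SCHEDULED → PENDING transition and substitutes CANCELLED when the run is too late, reading the threshold from the work pool. `WorkPoolLatePolicy`, its `cancel_late_runs_after_seconds` field, and `resolve_proposed_state` are hypothetical; Prefect's real orchestration rules use a different interface.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class WorkPoolLatePolicy:
    """Hypothetical user-facing work pool field floated in the discussion."""

    cancel_late_runs_after_seconds: Optional[float] = None


def resolve_proposed_state(
    initial_state: str,
    proposed_state: str,
    expected_start_time: datetime,
    now: datetime,
    policy: WorkPoolLatePolicy,
) -> str:
    """Return the state the orchestrator would accept for a proposed transition."""
    # Only intercept an agent moving a scheduled run toward execution.
    if initial_state != "SCHEDULED" or proposed_state != "PENDING":
        return proposed_state
    limit = policy.cancel_late_runs_after_seconds
    if limit is not None and now - expected_start_time > timedelta(seconds=limit):
        return "CANCELLED"  # the orchestrator, not the agent, makes the call
    return proposed_state


now = datetime(2023, 4, 1, tzinfo=timezone.utc)
pool = WorkPoolLatePolicy(cancel_late_runs_after_seconds=3600)
print(resolve_proposed_state("SCHEDULED", "PENDING", now - timedelta(days=25), now, pool))  # CANCELLED
print(resolve_proposed_state("SCHEDULED", "PENDING", now - timedelta(minutes=1), now, pool))  # PENDING
```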

I'm second-guessing my original suggestion of a simple setting on the agent, but it's still probably the easiest path forward. Perhaps we should still consider doing it just as a flag, e.g. --cancel-late 60, and avoid the setting for now?
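
For the flag-only option, the agent CLI would only need to parse one optional value and hand it to its submission loop. The sketch below uses `argparse` as a stand-in for Prefect's actual CLI framework and assumes the value is in seconds; `--cancel-late` was only proposed here and is not an existing Prefect option.

```python
import argparse
from typing import List, Optional


def parse_agent_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse a hypothetical `--cancel-late SECONDS` option for an agent."""
    parser = argparse.ArgumentParser(prog="agent-sketch")
    parser.add_argument(
        "--cancel-late",
        type=float,
        default=None,  # flag omitted: keep today's behavior
        metavar="SECONDS",
        help="cancel scheduled runs more than SECONDS late instead of submitting them",
    )
    return parser.parse_args(argv)


print(parse_agent_args(["--cancel-late", "60"]).cancel_late)  # 60.0
print(parse_agent_args([]).cancel_late)  # None
```

Keeping the default at `None` means agents started without the flag behave exactly as before.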

joelluijmes commented 1 year ago

Ah, I see. What about having it as a parameter on the flow itself, similar to the timeout?

zanieb commented 1 year ago

@joelluijmes that's a possibility but it's more of a deployment-level feature — it doesn't make any sense for ad-hoc flow calls.

joelluijmes commented 1 year ago

Ah, I'm comfortable with either option. In my view, the flow timeout feature is quite comparable to the proposed one, and a flow-level option provides a bit more versatility. I can envision scenarios in which it is essential to avoid running late flows, while for other flows it may not be a problem, for example if they are scheduled to run with a specific set of parameters.

Anyways, let me know what you think. I would like to see this feature implemented 😇

klayhb commented 1 year ago

Quoting @WillRaphaelson: "This is a very common use case for automations - setting up a 'zombie killer' automation to cancel flow runs that have been in a late state for some amount of time."

Is there an example of this posted somewhere?

Also, is there any status update on this?

WillRaphaelson commented 1 year ago

This remains on the backlog, I can up the internal priority.

joelluijmes commented 1 year ago

Yes, it would be great if this were implemented! I can still finish my POC if a decision has been made on how this should be configured.

Anyway, @klayhb, for now I've written my own watchdog flow, which runs every 30 minutes or so:

import asyncio

from prefect import State, flow, get_client, task, get_run_logger
from prefect.server.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateType,
    FlowRunFilterStartTime,
    FlowRunFilterExpectedStartTime,
)
from prefect.server.schemas.states import StateType
from datetime import datetime, timedelta, timezone
from uuid import UUID

@task
async def find_long_running_flows() -> list[UUID]:
    # Runs that have been RUNNING for longer than this are considered stuck
    THRESHOLD_HOURS = 4

    async with get_client() as client:
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.RUNNING]),
                ),
                start_time=FlowRunFilterStartTime(
                    # Timezone-aware "now" (datetime.utcnow() returns a naive datetime)
                    before_=datetime.now(timezone.utc) - timedelta(hours=THRESHOLD_HOURS),
                ),
            )
        )

    get_run_logger().info(
        f"Found {len(flow_runs)} long running flows (> {THRESHOLD_HOURS} hours)\n "
        + "\n ".join([f"{flow_run.name} ({flow_run.id})" for flow_run in flow_runs])
    )

    return [flow_run.id for flow_run in flow_runs]

@task
async def find_stale_flows() -> list[UUID]:
    # SCHEDULED-type runs (Late runs included) this far past their expected start are stale
    THRESHOLD_HOURS = 12

    async with get_client() as client:
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.SCHEDULED]),
                ),
                expected_start_time=FlowRunFilterExpectedStartTime(
                    before_=datetime.now(timezone.utc) - timedelta(hours=THRESHOLD_HOURS),
                ),
            )
        )

    get_run_logger().info(
        f"Found {len(flow_runs)} stale flows (> {THRESHOLD_HOURS} hours)\n "
        + "\n ".join([f"{flow_run.name} ({flow_run.id})" for flow_run in flow_runs])
    )

    return [flow_run.id for flow_run in flow_runs]

@task
async def cancel_flow_runs(flow_run_id):
    async with get_client() as client:
        state = State(type=StateType.CANCELLED, message="Cancelled by watchdog")
        # force=True tells the API to skip orchestration rules and accept the state
        await client.set_flow_run_state(flow_run_id, state, force=True)

@flow(name="Watchdog")
async def main():
    # Cancel runs that never started and are far past their expected start time
    stale_flows = await find_stale_flows()
    await cancel_flow_runs.map(stale_flows)

    # Cancel runs that appear to be stuck in RUNNING
    long_running_flows = await find_long_running_flows()
    await cancel_flow_runs.map(long_running_flows)

if __name__ == "__main__":
    asyncio.run(main())
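
The two filters in the watchdog can also be mirrored in plain Python, which makes the thresholds easy to unit-test without a Prefect server. `RunSnapshot`, `select_stale` and `select_long_running` are hypothetical helpers that approximately reproduce the filter logic ("at or before the cutoff") on already-fetched data; they are not part of Prefect.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List, Optional


@dataclass
class RunSnapshot:
    """Hypothetical, minimal view of a flow run as returned by the API."""

    name: str
    state_type: str  # e.g. "SCHEDULED" or "RUNNING"
    expected_start_time: Optional[datetime] = None
    start_time: Optional[datetime] = None


def select_stale(runs: List[RunSnapshot], now: datetime, threshold_hours: float) -> List[RunSnapshot]:
    """Scheduled runs expected to start at or before now - threshold."""
    cutoff = now - timedelta(hours=threshold_hours)
    return [
        r for r in runs
        if r.state_type == "SCHEDULED"
        and r.expected_start_time is not None
        and r.expected_start_time <= cutoff
    ]


def select_long_running(runs: List[RunSnapshot], now: datetime, threshold_hours: float) -> List[RunSnapshot]:
    """Running runs that started at or before now - threshold."""
    cutoff = now - timedelta(hours=threshold_hours)
    return [
        r for r in runs
        if r.state_type == "RUNNING" and r.start_time is not None and r.start_time <= cutoff
    ]


now = datetime(2023, 4, 1, 12, tzinfo=timezone.utc)
runs = [
    RunSnapshot("late-report", "SCHEDULED", expected_start_time=now - timedelta(days=25)),
    RunSnapshot("on-time", "SCHEDULED", expected_start_time=now - timedelta(minutes=1)),
    RunSnapshot("stuck-etl", "RUNNING", start_time=now - timedelta(hours=6)),
]
print([r.name for r in select_stale(runs, now, 12)])  # ['late-report']
print([r.name for r in select_long_running(runs, now, 4)])  # ['stuck-etl']
```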

zhen0 commented 11 months ago

Hi @klayhb! Thanks for your question. Here's an example/demo of using automations in Cloud to handle runs that get stuck in a Running state. If you're on OSS, the watchdog flow example should be helpful.

zhen0 commented 11 months ago

We'll keep this on the backlog but I would recommend using automations if you can!

lucasdepetrisd commented 8 months ago

I've implemented some enhancements to the Watchdog flow proposed by @joelluijmes. I included parameterized thresholds for both stale and long-running flows in case it's run manually from the UI. Additionally, I've introduced a verification mechanism to prevent the cancellation of the current flow.

I scheduled the flow to run every 30 minutes. Given that schedule, I also added a check at the beginning to prevent multiple executions in case Watchdog runs stack up.

Also, I hope the --cancel-late 60 flag idea gets developed. That would be a great help for OSS users.

import asyncio
from datetime import datetime, timedelta, timezone
from uuid import UUID

from prefect import State, runtime, flow, get_client, task, get_run_logger
from prefect.server.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateType,
    FlowRunFilterStartTime,
    FlowRunFilterExpectedStartTime,
)
from prefect.server.schemas.states import StateType

@task
async def find_long_running_flows(threshold_hours) -> list[UUID]:

    async with get_client() as client:
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.RUNNING]),
                ),
                start_time=FlowRunFilterStartTime(
                    before_=datetime.now(timezone.utc) -
                    timedelta(hours=threshold_hours),
                ),
            )
        )

    logger = get_run_logger()

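    # Never cancel the watchdog's own run. Build a filtered list instead of
    # calling remove() while iterating, which can silently skip elements.
    current_run_id = runtime.flow_run.id
    if any(str(flow_run.id) == current_run_id for flow_run in flow_runs):
        logger.info(
            "The ID is that of the current Watchdog. It will not be canceled. ID: %s", current_run_id)
        flow_runs = [flow_run for flow_run in flow_runs if str(flow_run.id) != current_run_id]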

    logger.info(
        f"Found {len(flow_runs)} long-running flows (> {threshold_hours} hours)\n "
        + "\n ".join([f"{flow_run.name} ({flow_run.id})" for flow_run in flow_runs])
    )

    return [flow_run.id for flow_run in flow_runs]

@task
async def find_stale_flows(threshold_hours) -> list[UUID]:

    async with get_client() as client:
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.SCHEDULED]),
                ),
                expected_start_time=FlowRunFilterExpectedStartTime(
                    before_=datetime.now(timezone.utc) -
                    timedelta(hours=threshold_hours),
                ),
            )
        )

    logger = get_run_logger()

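    # Never cancel the watchdog's own run. Build a filtered list instead of
    # calling remove() while iterating, which can silently skip elements.
    current_run_id = runtime.flow_run.id
    if any(str(flow_run.id) == current_run_id for flow_run in flow_runs):
        logger.info(
            "The ID is that of the current Watchdog. It will not be canceled. ID: %s", current_run_id)
        flow_runs = [flow_run for flow_run in flow_runs if str(flow_run.id) != current_run_id]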

    logger.info(
        f"Found {len(flow_runs)} flows with high delay (> {threshold_hours} hours)\n "
        + "\n ".join([f"{flow_run.name} ({flow_run.id})" for flow_run in flow_runs])
    )

    return [flow_run.id for flow_run in flow_runs]

@task
async def cancel_flow_runs(flow_run_id):
    logger = get_run_logger()
    async with get_client() as client:
        logger.info("Canceling flow with ID: %s", flow_run_id)
        state = State(type=StateType.CANCELLED,
                      message="Cancelled by watchdog")
        await client.set_flow_run_state(flow_run_id, state, force=True)

@flow(name="Watchdog")
async def watchdog(stale_threshold_hours: float = 12, long_running_threshold_hours: float = 4):
    # Get the time difference between when it was scheduled and started
    flow_timezone = runtime.flow_run.scheduled_start_time.tzinfo
    time_difference = datetime.now(
        flow_timezone) - runtime.flow_run.scheduled_start_time

    # If it started more than 30 minutes after it was scheduled, it should not execute
    # because another watchdog run will take care of it
    if time_difference < timedelta(minutes=30):
        stale_flows = await find_stale_flows(stale_threshold_hours)
        await cancel_flow_runs.map(stale_flows)

        long_running_flows = await find_long_running_flows(long_running_threshold_hours)
        await cancel_flow_runs.map(long_running_flows)
    else:
        await cancel_flow_runs(runtime.flow_run.id)

if __name__ == "__main__":
    asyncio.run(watchdog())
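
The stacked-run guard at the start of `watchdog` can be isolated into a small pure function so its behavior is easy to check. `should_skip_stacked_run` is a hypothetical helper; the 30-minute default mirrors the schedule described in the comment above.

```python
from datetime import datetime, timedelta, timezone


def should_skip_stacked_run(
    scheduled_start_time: datetime,
    now: datetime,
    max_delay: timedelta = timedelta(minutes=30),
) -> bool:
    """Return True if this run started so late that a newer run will cover it.

    Mirrors the watchdog's check: the work only happens when the delay is
    below `max_delay`; otherwise the next scheduled run takes care of it.
    """
    return now - scheduled_start_time >= max_delay


scheduled = datetime(2023, 4, 1, 12, 0, tzinfo=timezone.utc)
print(should_skip_stacked_run(scheduled, scheduled + timedelta(minutes=2)))  # False
print(should_skip_stacked_run(scheduled, scheduled + timedelta(hours=3)))  # True
```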

Yesurajup commented 2 weeks ago

What is the maximum run time of a flow run in Prefect? Can a flow run for 80 days or more, and can that be handled by an automation in Prefect?