PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Handle flow run restarts caused by infrastructure events #7116

Closed tekumara closed 1 year ago

tekumara commented 1 year ago

Prefect Version

2.x

Describe the current behavior

The underlying infrastructure may decide to restart the flow. For example, as part of normal operations Kubernetes may reschedule the pods running the flow job (e.g. because of node scale-down or failure).

When this happens, the flow is stopped and then started again by Kubernetes. However, when it starts for the second time it immediately fails with:

prefect.engine - Engine execution of flow run 'd53003f4-9f6a-4322-834a-406185693232' aborted by orchestrator: This run cannot transition to the RUNNING state from the RUNNING state.

The flow execution aborts here, but the flow is left in the RUNNING state, even though nothing is running.

Describe the proposed behavior

Prefect flows are resilient to infrastructure restarts, e.g. the flow begins again from the last running task and continues instead of aborting. Flows are not left hanging in the RUNNING state.

Example Use

To reliably run flows in a cloud-native environment (e.g. Kubernetes).

Additional context

This also occurs in Prefect 1.x, e.g.:

Beginning Flow run for 'child'
Task 'hello_task': Starting task run...
Task 'hello_task': Finished task run for task with final state: 'Running'
Flow run RUNNING: terminal tasks are incomplete.

Here the task and flow try to run, but both are already in the Running state, so the flow aborts.

zanieb commented 1 year ago

Hm. This logic prevents duplicate execution runs. I'm not sure how we can differentiate an infrastructure restart from an attempt to run the flow while it is already running.

kevin868 commented 1 year ago

How can we time out the flow run from the API server / Cloud side? Typically the agent handles the run/task timeout. But in the case of a crash, the agent no longer has control of that flow run, so it is stranded as RUNNING with nobody actually processing it. Stranded runs eat up the concurrency slots of a queue and can leave a workflow stuck: if the concurrency limit is N and there are N crashed jobs, nothing else can start.

One approach is to have some concept like a [VisibilityTimeout, as on AWS SQS](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html), where if a flow run does not finish (or send a heartbeat) within N, the flow run is pushed back onto the queue as Scheduled (and late). Then an agent can see it and pick it up again. Another option is to time out that flow run and set its state to Crashed.

We also face this issue whenever the running infrastructure crashes, so it would be nice to have some system that helps flow runs recover and not end up stuck.
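The two options in this comment (reschedule after a visibility timeout, or mark the run as crashed) can be sketched as a small server-side sweep. This is only an illustration of the idea, not Prefect's implementation: `FlowRun`, `State`, `reap_stale_runs`, and the `last_heartbeat` field are all hypothetical names invented for this sketch.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum


class State(Enum):
    # Hypothetical subset of run states, for illustration only.
    SCHEDULED = "SCHEDULED"
    RUNNING = "RUNNING"
    CRASHED = "CRASHED"


@dataclass
class FlowRun:
    # Hypothetical record; a real server would track much more.
    id: str
    state: State
    last_heartbeat: datetime


def reap_stale_runs(runs, now, visibility_timeout, reschedule=True):
    """Move RUNNING runs without a recent heartbeat out of RUNNING.

    With reschedule=True a stale run goes back to SCHEDULED so an agent
    can pick it up again; otherwise it is marked CRASHED.
    Returns the runs whose state was changed.
    """
    reaped = []
    for run in runs:
        if run.state is not State.RUNNING:
            continue  # only RUNNING runs can be stranded
        if now - run.last_heartbeat > visibility_timeout:
            run.state = State.SCHEDULED if reschedule else State.CRASHED
            reaped.append(run)
    return reaped
```

Either branch frees the concurrency slot held by the stranded run. Rescheduling assumes the flow is safe to execute again from the start, which the sweep itself cannot verify.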

mike-lev-j commented 1 year ago

I have this issue with our production pipelines, where many tasks trigger external services (dbt, Fivetran, etc.) and wait for them to succeed. We run on spot instances, and this causes consistent errors and hanging flows. It is a major issue for us.

mattijsdp commented 1 year ago

Not sure how to fix it, but I am also running into the same problem with spot instances (AWS EKS) on k8s.

yaronlevi commented 1 year ago

We are also seeing this issue. We are not doing anything special, just waiting for a bunch of tasks to complete, and suddenly one of them gets stuck in this state and fails the whole flow. Some more info: we are running the flow in a Render.com background worker, using the local Dask runner. We run the flows one after another in an infinite while loop.

[Three screenshots attached, captured 2023-02-16]

Would be happy to provide more info if needed to solve the issue. Thanks!

zanieb commented 1 year ago

I believe we are now allowing RUNNING -> RUNNING transitions — is this issue resolved?

4d11 commented 1 year ago

@madkinsz I'm currently on Prefect v2.10.6 and am still encountering a Running->Running exception and the flows being stuck as Running.

02:09:43.153 | INFO    | prefect.engine - Engine execution of flow run '8ca3a5c1-5ec7-4075-925f-09d11dfb26b1' aborted by orchestrator: This run cannot transition to the RUNNING state from the RUNNING state.

zanieb commented 1 year ago

We allow RUNNING -> RUNNING transitions for tasks (https://github.com/PrefectHQ/prefect/pull/8802) but not flows yet. As long as the agent uses SCHEDULED -> PENDING and we ban PENDING -> PENDING as a lock preventing duplicate submissions, RUNNING -> RUNNING should be safe to allow for better behavior on infrastructure restarts.
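The reasoning above can be sketched as a transition check. This is a simplified illustration that assumes only the three states named in the comment; `StateType` and `is_transition_allowed` are hypothetical names for this sketch and do not reproduce Prefect's actual orchestration rules.

```python
from enum import Enum


class StateType(Enum):
    # Only the states mentioned in the comment; a real system has more.
    SCHEDULED = "SCHEDULED"
    PENDING = "PENDING"
    RUNNING = "RUNNING"


def is_transition_allowed(current, proposed, allow_running_to_running=True):
    """Return True if a run may move from `current` to `proposed`.

    PENDING -> PENDING is always rejected: it acts as the lock that stops
    two agents from submitting the same run. RUNNING -> RUNNING is
    controlled by a flag so both behaviors can be compared.
    """
    if current is StateType.PENDING and proposed is StateType.PENDING:
        return False  # duplicate submission attempt
    if current is StateType.RUNNING and proposed is StateType.RUNNING:
        return allow_running_to_running  # restarted infrastructure re-entering
    return True


# An agent claims the run, then the restarted pod re-enters RUNNING.
print(is_transition_allowed(StateType.SCHEDULED, StateType.PENDING))
print(is_transition_allowed(StateType.PENDING, StateType.PENDING))
print(is_transition_allowed(StateType.RUNNING, StateType.RUNNING))
```

In this sketch the duplicate-submission guard lives entirely at the PENDING step, which is why relaxing the RUNNING -> RUNNING check does not reopen the door to two agents picking up the same scheduled run.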

ibabeo commented 1 year ago

@madkinsz We quite often experience the same behavior as described in Additional context. Is the update also implemented for 1.x?