PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Worker picks up `AwaitingRetry` flow runs waiting to execute decorator-defined retries if `retry_delay_seconds` exceeds worker's polling window #15458

Open kevingrismore opened 2 weeks ago

kevingrismore commented 2 weeks ago

Bug summary

This is harder to reproduce on cloud, but happens consistently on a local server.

Deploy this flow to a work pool (what the task functions do doesn't matter; they're only included to match the screenshot below):

from prefect import flow

# extract, transform, and load are arbitrary @task-decorated functions;
# what they do doesn't matter for reproducing this issue.

@flow(retries=1, retry_delay_seconds=30)
def my_pipeline():
    data = extract()
    transformed_data = transform(data)
    load(transformed_data)
    raise Exception("Pipeline failed")
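
For completeness, one way to deploy it to a work pool from Python is sketched below; the work pool name, deployment name, source path, and entrypoint are placeholders rather than part of the original report.

if __name__ == "__main__":
    # Sketch only: assumes a work pool (e.g. "my-process-pool") already exists
    # and a worker is polling it; source/entrypoint point at this repro file.
    flow.from_source(
        source=".",
        entrypoint="repro.py:my_pipeline",
    ).deploy(
        name="retry-repro",
        work_pool_name="my-process-pool",
    )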

After the first failure, 30 seconds will pass and the flow will be retried in its first process, but the worker will also pick up the flow run again and start a new process/container, running only what it believes to be the remaining retry.

The final outcome is a run count of 3, even though with retries=1 the flow should run at most twice.

[screenshot: flow run timeline showing the run count of 3]

Version info (prefect version output)

Version:             3.0.3
API version:         0.8.4
Python version:      3.11.5
Git commit:          d9c51bc3
Built:               Fri, Sep 20, 2024 8:52 AM
OS/Arch:             darwin/arm64
Profile:             local
Server type:         server
Pydantic version:    2.8.2

Additional context

I was not able to reproduce this with a served flow.
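
For reference, the served-flow setup (the case that did not reproduce) is roughly the following; the deployment name is a placeholder.

if __name__ == "__main__":
    # Serving runs the flow from a long-lived local process instead of going
    # through a work pool and worker.
    my_pipeline.serve(name="retry-repro-served")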

desertaxle commented 1 week ago

@kevingrismore what's the total runtime of this flow? It looks like it's fairly short and that the two runs at the end run in succession. If you extend the runtime of the flow, does this issue still occur? My hunch is that this won't crop up with longer flows because the PENDING transition for the flow run the worker picks up will be rejected.

kevingrismore commented 1 week ago

@desertaxle Yeah it's extremely short, like 1 second or less. Let me try it with a longer runtime.

kevingrismore commented 1 week ago

@desertaxle I added a 20-second sleep to the flow just before raising the exception (note how long it stays in Running), and I still see it happening.
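
Roughly, the modified flow (only the sleep is new; extract, transform, and load are the same placeholder tasks as above):

import time

from prefect import flow

@flow(retries=1, retry_delay_seconds=30)
def my_pipeline():
    data = extract()
    transformed_data = transform(data)
    load(transformed_data)
    time.sleep(20)  # keep the run in Running well past the worker's poll interval
    raise Exception("Pipeline failed")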

[screenshot: flow run timeline with the 20-second sleep]

desertaxle commented 1 week ago

Welp, there goes that theory.

The root cause is that AwaitingRetry is a SCHEDULED state type that gets picked up by the worker. The worker transitions the flow run to PENDING and then RUNNING. The process then starts the retry and transitions the flow run to RUNNING. We don't stop RUNNING to RUNNING transitions, so we get duplicate runs.
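
For illustration, the state helpers make the first point easy to see (a quick sketch; the import path assumes Prefect 3.x):

from prefect.states import AwaitingRetry

# AwaitingRetry is just a Scheduled state with a custom name, which is why
# deployment workers pick these runs up like any other scheduled run.
state = AwaitingRetry()
print(state.type)  # StateType.SCHEDULED
print(state.name)  # AwaitingRetry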

I replicated this with 2.20.8 too, so this isn't a new issue. We'll need to find a way to tell workers they shouldn't pick up these flow runs. I explored using run_count to prevent this in #15482, but that would cause issues with UI-triggered retries.