PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Support retrying flows on crashed state #10211

Open rbanick opened 1 year ago

rbanick commented 1 year ago

Prefect Version

2.x

Describe the current behavior

I’ve set up an Automation to re-run flows that crash. This nicely catches random crashes but goes haywire when an error inherent to the flow causes crashes — the flow will re-run and crash ad infinitum.

At the suggestion of @WillRaphaelson on the Prefect Slack Community I attempted to use the retries parameter in a flow as an alternate mechanism for retrying that respects limits.

from prefect import flow

@flow(
    name="my-flow",
    retries=3,
)
def my_flow():
    ...  # stuff

I then deployed the flow and deliberately crashed it* to test the retry functionality.

Unfortunately, the retries parameter did not cause retries on the flow when it crashed. If I turned on the relevant automation to trigger retries, the retries parameter did not limit the automation — I observed 5 retries before I canceled it.

Environment: Python 3.10.9, Prefect 2.20, Mac OS X 13.4.1

*My Prefect deployments are set up to install any packages specified with the EXTRA_PIP_PACKAGES environment variable prior to runtime. I induced an artificial crash of the flow by specifying {"env.EXTRA_PIP_PACKAGES": "not a package"} in the relevant deployment's Infrastructure Overrides.

Describe the proposed behavior

Ideally setting retries=n would cause a flow to retry not only on Failure, but on Crashed, a maximum of n times.

A bonus would be to have this retry functionality play nicely with Automation retries, just in case.

An alternate path would be to introduce a limits parameter on the Automation trigger. But handling this within a flow rather than an automation is preferable, since it keeps the configuration closer to the source code and Prefect limits the total number of Automations available to most users.
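The alternate path, a cap on Automation-triggered re-runs, can be sketched in plain Python. This is only an illustration of the requested semantics: `RerunLimiter`, `should_rerun`, and `reset` are hypothetical names and do not exist in Prefect, and a real implementation would need to persist the counter server-side.

```python
from collections import defaultdict


class RerunLimiter:
    """Hypothetical cap on how often an automation may re-run a crashed deployment."""

    def __init__(self, limit: int) -> None:
        self.limit = limit
        self._reruns = defaultdict(int)  # deployment name -> re-runs so far

    def should_rerun(self, deployment: str) -> bool:
        """Return True and record a re-run while the cap has not been reached."""
        if self._reruns[deployment] >= self.limit:
            return False
        self._reruns[deployment] += 1
        return True

    def reset(self, deployment: str) -> None:
        """Clear the counter, e.g. after a run completes successfully."""
        self._reruns.pop(deployment, None)


limiter = RerunLimiter(limit=3)
# A deployment that always crashes is re-run three times, then left alone.
decisions = [limiter.should_rerun("my-flow") for _ in range(5)]
print(decisions)  # [True, True, True, False, False]
```

With a cap like this, a crash inherent to the flow stops triggering re-runs after the limit instead of looping indefinitely.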

Example Use

The use of retries in the code below (copied from above) should work.

from prefect import flow

@flow(
    name="my-flow",
    retries=3,
)
def my_flow():
    ...  # stuff

Additional context

No response

zhen0 commented 1 year ago

Thanks for the issue! I'm flagging for product feedback as I think you present an interesting problem that we could think through a more elegant solution for than is currently offered.

WillRaphaelson commented 1 year ago

Per the Slack discussion: if we mark something as crashed, we think it's an infra issue, and we can't guarantee that we can persist retry counts. Robert will try to use on_crashed state change hooks and automations.

WillRaphaelson commented 1 year ago

@rbanick in response to some other reports, I'm actually going to reopen and accept this enhancement proposal. We may not always nail the retry behavior if infra is truly unavailable, but a best effort attempt at retrying on crashed states is worth a shot and strictly better than not trying.

serinamarie commented 11 months ago

We had some internal discussion about this (not available externally): https://prefecthq.slack.com/archives/C05RE7Q2FHU/p1694461707890159

We could attempt to resubmit the job if it was due to an intermittent failure, or maybe introduce a limit to an automation as @rbanick proposed.

arnoin commented 10 months ago

This is also relevant for spot and preemptible nodes.

meggers commented 8 months ago

Chiming in that this would be extremely useful for pod evictions for k8s flows. The Automations solution requires a new flow run via the "Run a deployment" action vs retrying the original crashed run, in addition to the infinite retry possibility.

soamicharan commented 4 months ago

@WillRaphaelson @zhen0 @rbanick Could this be implemented in the Prefect server? Suppose we have a flow like:

from prefect import flow

@flow(retries=5)
def test_flow():
    pass

Consider the scenario where the flow is executing on Kubernetes job infrastructure and the job pod exits, due to eviction or some unknown reason, so the Prefect worker reports the flow as crashed. The Prefect server could then see that the flow has N retries (say 5) and simply change the flow run's state back to Scheduled using the propose_state function, after which the flow would start executing again. This could be a simple enhancement applied on the Prefect server to fix retry behaviour at the flow level.
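The rule proposed here can be modelled as a single state transition. This is a minimal sketch, not Prefect's server code: `FlowRun`, `start`, and `on_crash_reported` are hypothetical stand-ins, the state names are plain strings chosen to mirror the states named in this thread, and it assumes the retry count can be persisted across the crash.

```python
from dataclasses import dataclass


@dataclass
class FlowRun:
    """Minimal stand-in for a flow run record; not Prefect's real model."""
    retries: int        # value passed to @flow(retries=...)
    run_count: int = 0  # attempts started so far
    state: str = "SCHEDULED"


def start(run: FlowRun) -> None:
    """Begin an attempt and count it."""
    run.run_count += 1
    run.state = "RUNNING"


def on_crash_reported(run: FlowRun) -> str:
    """Decide the next state when a worker reports a crash."""
    # run_count includes the first attempt, so a retry remains
    # while run_count <= retries.
    if run.run_count <= run.retries:
        run.state = "SCHEDULED"
    else:
        run.state = "CRASHED"
    return run.state


run = FlowRun(retries=2)
history = []
while run.state == "SCHEDULED":
    start(run)
    history.append(on_crash_reported(run))  # simulate every attempt crashing
print(history)  # ['SCHEDULED', 'SCHEDULED', 'CRASHED']
```

A flow that always crashes is attempted once plus `retries` times and then stays in the crashed state, which is the bounded behaviour the issue asks for.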

jfloodnet commented 2 months ago

any progress on this issue?

ameyapathare commented 1 month ago

Adding my support for this. We would want the ability to retry after transient infrastructure failures, ideally trying again after 10 minutes to give the infrastructure time to self-heal.
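The delayed retry described here amounts to scheduling the next attempt a fixed interval after the crash. A small sketch of that arithmetic follows; `RETRY_DELAY`, `next_attempt_time`, and `is_due` are illustrative names, not Prefect settings or functions.

```python
from datetime import datetime, timedelta, timezone

# Assumed delay taken from the comment above: wait 10 minutes before retrying.
RETRY_DELAY = timedelta(minutes=10)


def next_attempt_time(crashed_at: datetime, delay: timedelta = RETRY_DELAY) -> datetime:
    """Earliest time at which the crashed run should be resubmitted."""
    return crashed_at + delay


def is_due(crashed_at: datetime, now: datetime, delay: timedelta = RETRY_DELAY) -> bool:
    """True once the self-heal window has elapsed."""
    return now >= next_attempt_time(crashed_at, delay)


crashed_at = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
print(next_attempt_time(crashed_at).isoformat())  # 2024-01-01T12:10:00+00:00
```

Whatever process manages resubmission would poll `is_due` (or schedule the run for `next_attempt_time`) rather than retrying immediately.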

cicdw commented 1 month ago

Yup makes sense - this will need to be a server-side setting (because recovering from a crash requires a managing process to re-submit), which means we'll most likely expose retry settings on deployments to achieve this.

yaronlevi commented 1 month ago

@cicdw Just an idea:

In the case of ECS push pools causing a crash state with a 504 error, could Prefect identify this specific case and silently retry?

For example, when Prefect's Managed Infra becomes generally available and likely uses ECS push pools behind the scenes, how would it communicate a 504 crash to the end user? Would it simply say, "Hey, it just crashes like this from time to time"? 🙂

So, whatever mechanism Prefect will use behind the scenes to make the Managed Infra "just work," couldn’t this mechanism be exposed to someone using ECS push pools?

cicdw commented 1 month ago

If we experienced this issue in any of our backend services we would definitely fix it in a way that doesn't surface to users because the solution is fully within our control given that it's occurring within our infrastructure. In your case though, there's no evidence yet that it's a problem with the job that Prefect submits - it seems to be a problem with the network connection between your ECS infrastructure and DockerHub.

If it turns out there's some job configuration that we could put a better default on to avoid these 504s, then that sounds great and we could look at including that and exposing it. But right now I still think this is something to do either with DockerHub's API or network traffic in AWS. You might be able to fix it by copying the public images you're using into a private ECR image that your ECS job pulls from but this isn't something that's in our control.