PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0
17.48k stars 1.64k forks source link

Failed states not working as expected in Prefect 3.0 #15300

Closed bjorhn closed 2 months ago

bjorhn commented 2 months ago

Bug summary

I've just upgraded to Prefect 3.0.1 and I've bumped into a problem the states.

I have a sync flow, which runs an async task using submit(). This task sometimes explicitly returns a Failed state, but it seems when it does so, Prefect believes it's been returned a Completed state (the task turns green in the graph and the log says "Finished in state Completed"). Is this a known problem with Prefect 3.0 or am I missing something?

Version info (prefect version output)

Version:             3.0.1
API version:         0.8.4
Python version:      3.11.9
Git commit:          c6b2ffe1
Built:               Fri, Sep 6, 2024 10:05 AM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         cloud
Pydantic version:    2.8.2
Integrations:
  prefect-dbt:       0.6.0
  prefect-snowflake: 0.28.0
  prefect-shell:     0.3.0
  prefect-azure:     0.4.0

Additional context

No response

zzstoatzz commented 2 months ago

hi @bjorhn - thank you for the issue!

are you able to share a minimal reproduction of this? I tried this

In [1]: from prefect import flow, task

In [2]: from prefect.states import Failed

In [3]: @task
   ...: async def t():
   ...:     return Failed()
   ...:

In [4]: @flow
   ...: def f():
   ...:     future = t.submit()
   ...:     return future.result()

and am unable to reproduce so far

question, are you returning the Failed state (i.e. the task's result) from your flow or not? because if not, this would be expected behavior. This is a behavior change from prefect 2, the differences highlighted here.

bjorhn commented 2 months ago

I'm actually trying to reproduce now and so far I can't replicate the behavior in my minimal example. I will try adding code from my production flow to the reproduction MVE until it also breaks..

I'm both returning the result at the end of the flow, and using a reference to the future in a wait_for downstream from the task submission. The downstream tasks runs regardless of whether the first task fails or succeeds, which makes sense since it's always returning the Completed state.

bjorhn commented 2 months ago

I found the problem!

I was importing (and returning) Failed from prefect.server.schemas.states, which used to work in Prefect 2. I finally saw that your example imports Failed from prefect.states, which works. I didn't see this mentioned in the migration documentation, perhaps this change was made a long time ago but Failed from prefect.server.schemas.states kept working as a deprecated feature?

Thanks for the help!