PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

retry_condition_fn doesn't work on Failed state #15826

Open ihor-ramskyi-globallogic opened 2 days ago

ihor-ramskyi-globallogic commented 2 days ago

Bug summary

From the `task` decorator's docstring:

retry_condition_fn: An optional callable run when a task run returns a Failed state. Should
    return `True` if the task should continue to its retry policy (e.g. `retries=3`), and `False` if the task
    should end as failed. Defaults to `None`, indicating the task should always continue
    to its retry policy.

When I return a Failed state instead of raising an error, retry_condition_fn is never called and the task is not retried. Minimal reproducible example:

from prefect import flow, task, get_run_logger
from prefect.states import Failed


def cond(task, task_run, state) -> bool:
    # Retry only failed runs whose message asks for it.
    try:
        state.result()
    except Exception:
        return "please_retry" in (state.message or "")
    return False


@task(retries=1, retry_condition_fn=cond)
def task_a():
    logger = get_run_logger()
    logger.info("Inside task_a")
    # Returns a Failed state instead of raising: cond is never called.
    return Failed(message="please_retry")


@task(retries=1, retry_condition_fn=cond)
def task_b():
    logger = get_run_logger()
    logger.info("Inside task_b")
    # Raises: cond is called and the retry policy applies.
    raise Exception("please_retry")


@flow()
def generic_flow():
    task_a()
    task_b()


if __name__ == "__main__":
    generic_flow()

Logs and traceback:

```
16:34:49.831 | INFO | prefect.engine - Created flow run 'warm-moth' for flow 'generic-flow'
16:34:49.835 | INFO | prefect.engine - View at http://127.0.0.1:4200/runs/flow-run/113d3b35-c54f-4d83-9b7e-a91a6818380b
16:34:49.981 | INFO | Task run 'task_a-4ee' - Inside task_a
16:34:49.990 | ERROR | Task run 'task_a-4ee' - Finished in state Failed('please_retry')
16:34:50.009 | INFO | Task run 'task_b-53a' - Inside task_b
16:34:50.010 | INFO | Task run 'task_b-53a' - Task run failed with exception: Exception('please_retry') - Retry 1/1 will start immediately
16:34:50.016 | INFO | Task run 'task_b-53a' - Inside task_b
16:34:50.017 | ERROR | Task run 'task_b-53a' - Task run failed with exception: Exception('please_retry') - Retries are exhausted
Traceback (most recent call last):
  File "My_Path\venv\lib\site-packages\prefect\task_engine.py", line 768, in run_context
    yield self
  File "My_Path\venv\lib\site-packages\prefect\task_engine.py", line 1318, in run_task_sync
    engine.call_task_fn(txn)
  File "My_Path\venv\lib\site-packages\prefect\task_engine.py", line 791, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
  File "My_Path\venv\lib\site-packages\prefect\utilities\callables.py", line 206, in call_with_parameters
    return fn(*args, **kwargs)
  File "My_Path\test_retries.py", line 24, in task_b
    raise Exception("please_retry")
Exception: please_retry
16:34:50.028 | ERROR | Task run 'task_b-53a' - Finished in state Failed('Task run encountered an exception Exception: please_retry')
16:34:50.029 | ERROR | Flow run 'warm-moth' - Encountered exception during execution: Exception('please_retry')
Traceback (most recent call last):
  File "My_Path\venv\lib\site-packages\prefect\flow_engine.py", line 655, in run_context
    yield self
  File "My_Path\venv\lib\site-packages\prefect\flow_engine.py", line 699, in run_flow_sync
    engine.call_flow_fn()
  File "My_Path\venv\lib\site-packages\prefect\flow_engine.py", line 678, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
  File "My_Path\venv\lib\site-packages\prefect\utilities\callables.py", line 206, in call_with_parameters
    return fn(*args, **kwargs)
  File "My_Path\test_retries.py", line 30, in generic_flow
    task_b()
  File "My_Path\venv\lib\site-packages\prefect\tasks.py", line 1002, in __call__
    return run_task(
  File "My_Path\venv\lib\site-packages\prefect\task_engine.py", line 1507, in run_task
    return run_task_sync(**kwargs)
  File "My_Path\venv\lib\site-packages\prefect\task_engine.py", line 1320, in run_task_sync
    return engine.state if return_type == "state" else engine.result()
  File "My_Path\venv\lib\site-packages\prefect\task_engine.py", line 475, in result
    raise self._raised
  File "My_Path\venv\lib\site-packages\prefect\task_engine.py", line 768, in run_context
    yield self
  File "My_Path\venv\lib\site-packages\prefect\task_engine.py", line 1318, in run_task_sync
    engine.call_task_fn(txn)
  File "My_Path\venv\lib\site-packages\prefect\task_engine.py", line 791, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
  File "My_Path\venv\lib\site-packages\prefect\utilities\callables.py", line 206, in call_with_parameters
    return fn(*args, **kwargs)
  File "My_Path\test_retries.py", line 24, in task_b
    raise Exception("please_retry")
Exception: please_retry
16:34:50.089 | ERROR | Flow run 'warm-moth' - Finished in state Failed('Flow run encountered an exception: Exception: please_retry')
Traceback (most recent call last):
  File "My_Path\test_retries.py", line 34, in <module>
    generic_flow()
  File "My_Path\venv\lib\site-packages\prefect\flows.py", line 1355, in __call__
    return run_flow(
  File "My_Path\venv\lib\site-packages\prefect\flow_engine.py", line 821, in run_flow
    return run_flow_sync(**kwargs)
  File "My_Path\venv\lib\site-packages\prefect\flow_engine.py", line 701, in run_flow_sync
    return engine.state if return_type == "state" else engine.result()
  File "My_Path\venv\lib\site-packages\prefect\flow_engine.py", line 255, in result
    raise self._raised
  File "My_Path\venv\lib\site-packages\prefect\flow_engine.py", line 655, in run_context
    yield self
  File "My_Path\venv\lib\site-packages\prefect\flow_engine.py", line 699, in run_flow_sync
    engine.call_flow_fn()
  File "My_Path\venv\lib\site-packages\prefect\flow_engine.py", line 678, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
  File "My_Path\venv\lib\site-packages\prefect\utilities\callables.py", line 206, in call_with_parameters
    return fn(*args, **kwargs)
  File "My_Path\test_retries.py", line 30, in generic_flow
    task_b()
  File "My_Path\venv\lib\site-packages\prefect\tasks.py", line 1002, in __call__
    return run_task(
  File "My_Path\venv\lib\site-packages\prefect\task_engine.py", line 1507, in run_task
    return run_task_sync(**kwargs)
  File "My_Path\venv\lib\site-packages\prefect\task_engine.py", line 1320, in run_task_sync
    return engine.state if return_type == "state" else engine.result()
  File "My_Path\venv\lib\site-packages\prefect\task_engine.py", line 475, in result
    raise self._raised
  File "My_Path\venv\lib\site-packages\prefect\task_engine.py", line 768, in run_context
    yield self
  File "My_Path\venv\lib\site-packages\prefect\task_engine.py", line 1318, in run_task_sync
    engine.call_task_fn(txn)
  File "My_Path\venv\lib\site-packages\prefect\task_engine.py", line 791, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
  File "My_Path\venv\lib\site-packages\prefect\utilities\callables.py", line 206, in call_with_parameters
    return fn(*args, **kwargs)
  File "My_Path\test_retries.py", line 24, in task_b
    raise Exception("please_retry")
Exception: please_retry
```
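The logs show task_a finishing as Failed after a single attempt while task_b runs twice. The toy model below, using only the standard library, reproduces that observed difference. It is a sketch, not Prefect's engine code: FailedState, run_with_retries, and the one-argument cond are hypothetical simplifications (Prefect's real retry_condition_fn receives task, task_run, and state).

```python
from dataclasses import dataclass
from typing import Callable, Optional, Tuple


@dataclass
class FailedState:
    """Hypothetical stand-in for a failed final state (not Prefect's `Failed`)."""

    message: str


def run_with_retries(
    fn: Callable[[], object],
    retries: int,
    retry_condition_fn: Optional[Callable[[Exception], bool]] = None,
) -> Tuple[object, int]:
    """Toy engine returning (final result, number of attempts).

    Models the behavior seen in the logs, not Prefect's source: a raised
    exception goes through the retry policy, while a returned FailedState
    is accepted as final without consulting the retry condition.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            result = fn()
        except Exception as exc:
            # None means "always follow the retry policy", as the docstring says.
            wants_retry = retry_condition_fn is None or retry_condition_fn(exc)
            if wants_retry and attempts <= retries:
                continue  # retry immediately
            return FailedState(message=str(exc)), attempts
        # Returned values, including a FailedState, end the run right away.
        return result, attempts


def task_a() -> FailedState:
    # Mirrors task_a: returns a failure instead of raising.
    return FailedState(message="please_retry")


def task_b() -> object:
    # Mirrors task_b: raises, so the retry policy applies.
    raise Exception("please_retry")


def cond(exc: Exception) -> bool:
    # Hypothetical one-argument version of the issue's cond.
    return "please_retry" in str(exc)


if __name__ == "__main__":
    print(run_with_retries(task_a, retries=1, retry_condition_fn=cond))
    # (FailedState(message='please_retry'), 1)
    print(run_with_retries(task_b, retries=1, retry_condition_fn=cond))
    # (FailedState(message='please_retry'), 2)
```

In this model the condition sits only on the exception path, which is why a returned failure can never reach it.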

Version info

Version:             3.0.11
API version:         0.8.4
Python version:      3.10.11
Git commit:          a17ccfcf
Built:               Thu, Oct 24, 2024 5:36 PM
OS/Arch:             win32/AMD64
Profile:             default
Server type:         server
Pydantic version:    2.7.1

zzstoatzz commented 1 day ago

thank you for the issue!

The issue makes a lot of sense given the wording of the docstring:

An optional callable run when a task run returns a Failed state

but I think there's an argument to be made (as @desertaxle suggested) that returning Failed as a literal value from a task would be a shortcut for forcing the task to fail. Returning State objects as literal values is already a bit of an escape hatch as it is.

@ihor-ramskyi-globallogic, if you don't mind, can you explain a bit about the motivation for retrying tasks that explicitly return a Failed state instead of letting an exception be raised?

ihor-ramskyi-globallogic commented 1 hour ago

Thank you for the reply, @zzstoatzz! We want the visualizer to mark a task red when it ran without an error but the data it returned is wrong. When this was first implemented, it made a lot of sense to return a Failed state carrying the incorrect data: the flow could keep going by using task_name.submit().result(raise_on_failure=False), and the task would still be marked failed for later investigation without disrupting the flow itself. So we agreed on a convention of returning a Failed state from a try/except block instead of letting the error be raised.
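One way to keep retry_condition_fn in play under a convention like this, sketched under assumptions, is to raise an exception that carries the suspect data instead of returning Failed. BadDataError and should_retry are hypothetical names. should_retry takes the same (task, task_run, state) arguments as cond in the report and, like cond, assumes that state.result() re-raises the task's exception for a failed run.

```python
from typing import Any


class BadDataError(Exception):
    """Hypothetical exception that carries the suspect payload with it."""

    def __init__(self, message: str, data: Any = None) -> None:
        super().__init__(message)
        self.data = data


def should_retry(task: Any, task_run: Any, state: Any) -> bool:
    """Retry condition with the same arguments as `cond` in the report.

    Assumes `state.result()` re-raises the task's exception when the run failed.
    """
    try:
        state.result()
    except BadDataError as exc:
        # Bad data: retry only if the message asks for it (hypothetical rule).
        return "please_retry" in str(exc)
    except Exception:
        # Any other error: let the task end as failed.
        return False
    # The run did not raise, so there is nothing to retry.
    return False
```

A task would then raise BadDataError("please_retry", data=...) and be decorated with @task(retries=1, retry_condition_fn=should_retry). Once retries are exhausted, task_name.submit().result(raise_on_failure=False) is expected to hand back the exception instead of raising it (worth confirming on the Prefect version in use), so the flow can continue and read .data from it while the task still shows as failed.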