flyteorg / flyte

Scalable and flexible workflow orchestration platform that seamlessly unifies data, ML and analytics stacks.
https://flyte.org
Apache License 2.0
5.17k stars 550 forks source link

SubWorkflow Error handling #5513

Open kdubovikov opened 3 days ago

kdubovikov commented 3 days ago

Motivation: Why do you think this is important?

We are creating an orchestration workflow that governs a dynamic number of model sub-workflows that are being developed by different team members. Those workflows can fail unexpectedly and we need to handle those failures gracefully in the orchestration workflow so that it still can complete successfully, as some of the sub-workflows are ok to fail, while others can be an irrecoverable failure.

Goal: What should the final outcome look like, ideally?

Something like try-catch block for the workflow-level error handling, or an on-failure decorator that can accept rich information about the workflow-level error trace

Describe alternatives you've considered

We have considered using explicit error handling in our code and logging errors in the DB, but it seems incorrect when using workflow engine, as it's the workflow's responsibility to handle and process sub-workflow or task-level exceptions that leaks into the application logic if we will implement it this way.

Propose: Link/Inline OR Additional context

No response

Are you sure this issue hasn't been raised already?

Have you read the Code of Conduct?

runllm[bot] commented 3 days ago

To handle sub-workflow errors gracefully in Flyte, you can use the failure_policy attribute with the @workflow decorator. This allows the parent workflow to continue executing even if some sub-workflows fail. Additionally, Flyte supports defining error handlers at the workflow level, which can be used to perform cleanup or logging tasks upon failure.

Here is an example of how you can define a parent workflow with error handling for sub-workflows:

from flytekit import task, workflow, WorkflowFailurePolicy

@task
def my_error_handler(error: Exception):
    # Handle the error, e.g., log it or clean up resources
    print(f"Error encountered: {error}")

@workflow(on_failure=my_error_handler)
def my_sub_wf():
    # Sub-workflow logic
    ...

@workflow(failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
def my_parent_wf() -> str:
    n1 = my_sub_wf()
    n2 = my_sub_wf()
    n3 = my_sub_wf()

    return n3.out1

In this example, my_parent_wf will continue running even if any of the sub-workflows (my_sub_wf) fail. The overall status of the execution will be marked as failure, but it will allow as many nodes as possible to execute. The my_error_handler task will be invoked whenever a sub-workflow fails to handle the error appropriately.

Would you like more details on this or other potential solutions?

Sources:


This is an AI-generated response and your feedback is appreciated! Please leave a 👍 if this is helpful and 👎 if it is not.

Mention @runllm to ask follow-up questions. See runllm.com for more!

kdubovikov commented 3 days ago
FAIL_AFTER_EXECUTABLE_NODES_COMPLETE

Here, the parent workflow will fail if any of the sub-workflows fails. What we need is to be able to decide on what happens, similar to what you can do with try/catch in Python

nihar-pixis commented 2 days ago

One of our use-case : Screenshot 2024-05-17 at 4 42 20 PM Here is one example that we've tried where failure is being simulated for subworkflow_a, so it failed, now the task_g is dependant on subworkflow_a, in some cases user still might want to execute task_g without subworkflows_a's output and b, couldn't find a way handle the failure of subworkflow_a from main_workflow. Attached the screenshot and workflow code for reference failure handling from the main/parent workflow can provide flexibility and more control as one might not always have control over subworkflow's from different teams/modules while orchestrating large and complex workflows.

Its always flexible to have error handlings on orchestration(node) level (be it subworkflow/task) as it provides MLEs/Data Engineers more control to decide the behaviours on data science/ other team module failures while dealing with complex orchestrations

eapolinario commented 1 day ago

Flyte has support for Failure nodes, which lets workflows define special nodes to be executed in case the workflow fails. Unfortunately this doesn't cover your use case as failure nodes don't resume the execution after the failure, they only give you an opportunity to run a task.