flyteorg / flyte

Scalable and flexible workflow orchestration platform that seamlessly unifies data, ML and analytics stacks.
https://flyte.org
Apache License 2.0

[Core Feature] Failure-Node support #1506

Open kumare3 opened 3 years ago

kumare3 commented 3 years ago

Motivation: Why do you think this is important?

The Flyte backend supports a failure node for every workflow / sub-workflow. This is not currently exposed in flytekit (Python or Java).

Goal: What should the final outcome look like, ideally?

Users should be able to define failure nodes for their workflows. An example for the Python SDK is as follows:

@task
def my_error_handler(...):
   ...

@workflow(on_error=my_error_handler)
def my_wf():
   ...

If my_wf() fails at any point during execution, it will invoke the my_error_handler() task and pass it some context (error info, etc.) so it can handle the error. The expectation is that my_error_handler() would do things like clean up resources and log or send customized notifications. What it will NOT let you do is recover from the failure: the execution of this workflow will still fail, be marked as a failure, and upstream callers will still be notified of its failure.
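
The proposed semantics can be sketched in plain Python (no flytekit required; `run_workflow` and the names here are hypothetical stand-ins for what the engine would do):

```python
# Hypothetical sketch: the handler runs on failure, but the workflow's
# failure still propagates to the caller.

def my_error_handler(error: Exception) -> None:
    # Clean up resources, send notifications, etc.
    print(f"handling error: {error}")

def run_workflow(wf, on_error=None):
    """Run wf(); on failure, invoke the handler, then re-raise."""
    try:
        return wf()
    except Exception as err:
        if on_error is not None:
            on_error(err)  # handler receives the error context
        raise              # the execution is still marked as failed

def my_wf():
    raise RuntimeError("task failed")

try:
    run_workflow(my_wf, on_error=my_error_handler)
except RuntimeError:
    print("workflow still reported as failed")
```

Note that the handler runs before the failure is re-raised, so cleanup happens even though the overall status remains FAILED.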

An example of sub-workflows:

@workflow(on_error=my_error_handler)
def my_sub_wf():
    ...

@workflow(failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
def my_parent_wf() -> str:
   n1 = my_sub_wf()
   n2 = my_sub_wf()
   n3 = my_sub_wf()

   return n3.out1

In this case, my_parent_wf will continue running even if any of its nodes fails. The overall status of the execution will again be marked as a failure, but it will let as many nodes as possible execute. Whenever my_sub_wf fails, it will invoke an instance of the my_error_handler task to clean up resources, etc.
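
A plain-Python sketch of that failure policy (`run_parent` and the node names are hypothetical; the real behavior lives in the backend):

```python
# Hypothetical sketch of FAIL_AFTER_EXECUTABLE_NODES_COMPLETE: every runnable
# node is attempted, each failing node's handler is invoked, and the parent
# execution is still marked as failed overall.

def run_parent(nodes, on_error):
    """Run every node; call on_error for each failure; report overall status."""
    failures, results = [], []
    for name, node in nodes:
        try:
            results.append((name, node()))
        except Exception as err:
            on_error(err)        # per-sub-workflow cleanup handler
            failures.append(name)
    status = "FAILED" if failures else "SUCCEEDED"
    return status, results, failures

def ok():
    return "out1"

def bad():
    raise ValueError("sub-workflow failed")

cleaned = []
status, results, failures = run_parent(
    [("n1", bad), ("n2", ok), ("n3", ok)],
    on_error=lambda e: cleaned.append(str(e)),
)
# n2 and n3 still run even though n1 failed; the overall status is FAILED.
```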

Describe alternatives you've considered

N/A

[Optional] Propose: Link/Inline OR Additional context

More discussion in https://github.com/flyteorg/flyte/issues/1012

Related flytekit java issue - #1012

kumare3 commented 3 years ago

cc @EngHabu / @cosmicBboy / @kanterov

fvde commented 1 year ago

As old as this issue may be, we would absolutely LOVE this; it has been a stable feature on other orchestration engines (such as Kubeflow) for many years.

dylanwilder commented 1 year ago

@kumare3 is this a flytekit-only change, or would it require changes to propeller to propagate error state?

EngHabu commented 1 year ago

@dylanwilder the changes in the backend are mostly already done.. it's possible they have regressed because of the lack of end-to-end testing for it (because it's not implemented in flytekit).. would you be able to help with the flytekit side of things?

dylanwilder commented 1 year ago

Potentially we could pitch in, since we'd like to see this. Do you have anything outlining what's required?

EngHabu commented 1 year ago

@dylanwilder I think this is really close. Maybe attempt to use it in an example and start the debugging journey from there? Happy to be pulled in once you get flytekit to produce the spec in case you deem it a problem with the backend...

dylanwilder commented 1 year ago

Thanks will take a look and see!

kumare3 commented 1 year ago

@eapolinario is probably also looking into this

gitgraghu commented 1 year ago

Wondering if there is a way to support inputs and outputs in the failure handler that differ from the workflow interface. Below is an example use case we were trying to implement:

EngHabu commented 11 months ago

Was just brainstorming with @pingsutw now on this... here are my thoughts on UX:

@workflow
def my_wf(a: int) -> str:
   b = my_task(a=a)
   flytekit.current_context().on_failure = clean_up(a=a, b=b)
   return b

@task
def clean_up(err: Error, a: Optional[int], b: Optional[str]) -> str:
   ...

  1. Set the failure node within the workflow code instead of the decorator, to allow passing inputs (I don't know if `return b` or `clean_up(a=a, b=b)` as an alternative syntax is too hacky?)
  2. clean_up must take Error as the first parameter and can take any number of extra inputs, as long as they are all Optional. Propeller will fill them in if they are available, or None otherwise... it's the implementor's job to handle those cases correctly within clean_up.

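Point 2 above can be sketched in plain Python (`invoke_failure_handler` is a hypothetical stand-in for what propeller would do):

```python
# Hypothetical sketch: the engine passes the error as the first argument,
# then fills any extra handler parameters from the inputs it has on hand,
# defaulting to None when a value isn't available.
import inspect
from typing import Optional

def clean_up(err: Exception, a: Optional[int] = None, b: Optional[str] = None) -> str:
    return f"cleaned after {err}: a={a}, b={b}"

def invoke_failure_handler(handler, err, available_inputs):
    """Bind err to the first parameter; fill the rest from available_inputs or None."""
    params = list(inspect.signature(handler).parameters)[1:]  # skip the error param
    kwargs = {p: available_inputs.get(p) for p in params}     # missing -> None
    return handler(err, **kwargs)

# Only 'a' was produced before the failure; 'b' never materialized.
msg = invoke_failure_handler(clean_up, RuntimeError("boom"), {"a": 3})
```
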
wdyt @kumare3 @eapolinario @gitgraghu

pingsutw commented 11 months ago

Need some discussion about

  1. if a subworkflow and the parent workflow both have failure handlers, do both run?
  2. if I have parent_wf and sub_wf, and both have failure handlers, I can see an argument to say: when I run parent_wf, if the error was in sub_wf, just run the sub_wf failure handler; if the error happened in parent_wf, then run the parent_wf handler. Or both could run.

PRs for failure node. (still WIP)

also cc @cosmicBboy @wild-endeavor

EngHabu commented 11 months ago

  1. I believe it should
  2. The way I think about it is that first the subworkflow fails, so it kicks off the failure-node handler. After it finishes, the subworkflow itself will report that it failed overall, then the parent node will handle the failure as well..

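
That ordering can be sketched in plain Python (`run_wf` and the workflow names are hypothetical stand-ins for the engine):

```python
# Hypothetical sketch: the sub-workflow's failure handler runs first, then
# the failure propagates and the parent workflow's handler runs as well.

events = []

def run_wf(body, on_failure):
    """Run body(); on failure, call the handler, then re-raise."""
    try:
        body()
    except Exception as err:
        on_failure(err)
        raise  # the workflow still fails after its handler completes

def failing_task():
    raise RuntimeError("boom")

def sub_wf():
    run_wf(failing_task, on_failure=lambda e: events.append("sub handler"))

def parent_wf():
    run_wf(sub_wf, on_failure=lambda e: events.append("parent handler"))

try:
    parent_wf()
except RuntimeError:
    events.append("execution marked failed")
```
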
wild-endeavor commented 5 months ago

is this okay to close?