apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0
37.15k stars 14.32k forks source link

Fail fast a Dag if any of the tasks failed #26854

Open timothy-khom opened 2 years ago

timothy-khom commented 2 years ago

Description

We would like to be able to implement fail fast strategy of the Dag. It means if any of the task fails it is required to stop all the rest tasks running in parallel and mark the entire Dag as FAILED. For example, we have 3 tasks in the first Task Group, which are executed in parallel. The second Task Group or a task needs not only to fail (which can be achieved by setting ALL_SUCCESS trigger rule) but also stop the running tasks. Currently we implement it by adding one more nested Task Group into the first group with two tasks:

  1. Task 1 with ONE_FAILED trigger rule is PythonOperator, which calls the function, which looks for the upstream tasks of this task in the internal TaskInstance Airflow table and mark them as FAILED. E.g.
    @provide_session
    def mark_tasks_as_fail(task, data_interval_start, dag, session=None, **_):
    upstream_task_instances = session.query(TaskInstance).filter(
        TaskInstance.dag_id == dag.dag_id,
        TaskInstance.execution_date == data_interval_start,
        TaskInstance.task_id.in_(task.upstream_task_ids)).all()
    for ti in upstream_task_instances:
        ti.state = TaskInstanceState.FAILED
  2. Task 2 with ALL_DONE trigger rule is also PythonOperator with the function, which just raises an exception to cause the next task or group with ALL_SUCCESS trigger rule to fail. Eventually the entire Dag fails.

We do not like such approach, since it relies on the Airflow internals.

Please advise if there is a way to implement what is needed by using some high-level API.

Use case/motivation

No response

Related issues

No response

Are you willing to submit a PR?

Code of Conduct

boring-cyborg[bot] commented 2 years ago

Thanks for opening your first issue here! Be sure to follow the issue template!

o-nikolas commented 2 years ago

This is an interesting feature request! There is also some existing precedent for this feature as well, but only available with the DebugExecutor:

Additionally DebugExecutor can be used in a fail-fast mode that will make all other running or scheduled tasks fail immediately. To enable this option set AIRFLOWDEBUGFAIL_FAST=True or adjust fail_fast option in your airflow.cfg. source

Perhaps this is something the base executor should support 🤔

timothy-khom commented 2 years ago

@o-nikolas thank you for the response. Forgot to mention that we use KubernetesExecutor, so the suggested feature seems not to be applicable for us. So could you please advise if this feature might be included into some of the future Airflow's releases?

o-nikolas commented 2 years ago

Forgot to mention that we use KubernetesExecutor, so the suggested feature seems not to be applicable for us.

Yupp, I'm aware, I was just adding context to the issue that there is an existing precedent for this feature already :)

So could you please advise if this feature might be included into some of the future Airflow's releases?

Unfortunately, I can't say for sure whether this will be supported, someone will have to produce a PR and see if it gets accepted by the community. Feel free to put forward a PR, it's a great way to get involved in Airflow!

potiuk commented 2 years ago

Yeah. I think this is a good idea to implement. Probably a bit more than "good first issue" but should be quite doable.

eladkal commented 2 years ago

it means if any of the task fails it is required to stop all the rest tasks running in parallel and mark the entire Dag as FAILED.

I think the actual request here is to stop all running tasks (Which has been asked before https://github.com/apache/airflow/issues/10704 ). If we externally killing the running tasks then they will be in failed status, as a result their downstream tasks will be automatically marked as upstream_failed (assuming all_success trigger rule). Then you will get the behavior of immediately failing a DAG without need to implement failing a DAG functionality.

timothy-khom commented 2 years ago

@eladkal not sure I fully understood you suggestion. Do you mean we should add functionality in our application to kill the upstream tasks?

eladkal commented 2 years ago

I just gave my point of view on the feature request to solve this pain (should someone decide to pick this issue and implement it)

potiuk commented 2 years ago

Just to explain how it works and make sure expectations are set appropriately @timothy-khom

The thing is that in open-source project like this, features get implemented when .... someone implements it .. and this is soleley based one someone's interest in moving this forward. There is no "team" that would implement such feature merely because it is created. If you really want to implement a feature and there is no-one who immendietely picks an interest and start implementing it, the best way is to make a PR and implement it yourself - or find someone better knowledgable who would pick it and implement - possibly even paying such person to do the job.

What Elad wrote is an opinion on how it could be done. And if you feel like wanting to work on it - feel free following the suggestions. Then we can help to review and merge the PR as maintainers (and then when we see the implementation proposal we might have more suggestions/comments). Also there is a chance someone in the meantime will actually pick an interest and implement it, but this is something that might or might not happen sooner or later.

This feature is not "huge" I think and it's pretty straightforward - so (as I wrote above) it does not need a complete AIP (Airflow Improvement Proposal) - it can likely be handled in a single PR.

eladkal commented 2 years ago

It also have some similarities with https://github.com/apache/airflow/issues/15951

rkarish commented 2 years ago

I could take a look into this one over the next few weeks. Trying to think of ways to accomplish this... Is this going to require changes to BaseExecutor, TaskInstance, and BaseOperator? As a starting point I was thinking about adding a parameter to BaseOperator and TaskInstance and be able to access it somewhere in BaseExecutor to handle failing all tasks within a dag run. Does this seem like a reasonable approach? If anyone has any thoughts on how to accomplish this functionality that would be great.

Edit: Actually now that I think about it, maybe this configurability should be configurable at a dag level? Or is it better fit for operator so you could narrow it down to 1 tasks that will fail all tasks if fail?

potiuk commented 2 years ago

Dag level is good.

eladkal commented 1 year ago

Solved in https://github.com/apache/airflow/pull/29406

ashb commented 3 weeks ago

Re-opening this, it turns out that the supposed fix doesn't mark the DagRun as failed.