apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

New dag run / task state: disposed #12199

Open billcrook opened 3 years ago

billcrook commented 3 years ago

Description

Introduce a new dag run / task state called "disposed". This new state would represent the acknowledgment of a failed run/task that should not be retried. Update the UI and CLI to provide a mechanism for the disposal action.

Use case / motivation

In a former gig we had a homegrown job management system. One nice feature for operations was the ability to "dispose" of a failed job. Disposal indicated that we recognized the failure, investigated it and decided the job should not be retried and the failure could be ignored going forward. This removed the failure from our daily operations report which we used to investigate failures. I find myself yearning for this feature lately. In airflow I have to either re-run the job or mark it as successful for it to leave my daily operations failure report. This could simply be implemented as a new dag/task state and action. We also had a "dispose reason" for tracking purposes - just a notes field for why the operator performed the action. Since we dealt with financial transaction feeds, we needed this. The auditability of a dispose state + notes field would be quite useful.
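At its core, the proposal is a guarded state transition plus an audit record. The sketch below is a hypothetical, framework-free illustration: `RunState`, `Run`, `DisposalRecord`, `dispose` and `daily_failure_report` are invented names, not Airflow APIs. It assumes that disposal is allowed only from the failed state and always requires a reason.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum


class RunState(str, Enum):
    # Hypothetical subset of run states; DISPOSED is the proposed addition.
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    DISPOSED = "disposed"


@dataclass
class DisposalRecord:
    """Audit entry: who disposed of which run, when, and why."""
    run_id: str
    disposed_by: str
    reason: str
    disposed_at: datetime


@dataclass
class Run:
    run_id: str
    state: RunState
    audit_log: list = field(default_factory=list)


def dispose(run: Run, user: str, reason: str) -> DisposalRecord:
    """Move a FAILED run to DISPOSED, requiring a non-empty reason."""
    if run.state is not RunState.FAILED:
        raise ValueError(f"only failed runs can be disposed, got {run.state.value}")
    if not reason.strip():
        raise ValueError("a dispose reason is required for auditability")
    record = DisposalRecord(run.run_id, user, reason.strip(), datetime.now(timezone.utc))
    run.state = RunState.DISPOSED
    run.audit_log.append(record)
    return record


def daily_failure_report(runs):
    """Disposed runs drop out of the report; failed ones stay until handled."""
    return [r.run_id for r in runs if r.state is RunState.FAILED]
```

Once a run is disposed, it leaves the failure report. The audit log still records who made that decision and why, which is the auditability the request is after.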

Related Issues

None that I see.

Tagging @ryw

boring-cyborg[bot] commented 3 years ago

Thanks for opening your first issue here! Be sure to follow the issue template!

ryw commented 3 years ago

As I mentioned in the Slack thread, I think this is a good step towards improved auditability for Airflow.

Maybe bundle this together with a few other audit improvements for a 2.X release.

ashb commented 3 years ago

Love the feature (and I've been thinking on and off about this for 2 years, just never enough to open an issue). The only thing I'm not sure of is the name.

"Acknowledged Failure" -- or perhaps to make the task dep checking easier, the state could be left as failure, but somewhere else we record "yes, this failure has been investigated, it's 'okay'".

Anyway, 💯 to the feature idea.
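ashb's alternative leaves the task state as `failed`, so dependency checks do not change, and records the acknowledgment somewhere else. Here is a minimal in-memory sketch of that idea. `TaskInstance`, `Acknowledgement`, `AckStore` and `unacknowledged_failures` are hypothetical stand-ins and not Airflow models. The side table is assumed to be keyed by `(dag_id, run_id, task_id)`.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List, Tuple

# Hypothetical key identifying a task instance: (dag_id, run_id, task_id).
TIKey = Tuple[str, str, str]


@dataclass(frozen=True)
class TaskInstance:
    dag_id: str
    run_id: str
    task_id: str
    state: str  # stays "failed"; dependency checks are unaffected


@dataclass(frozen=True)
class Acknowledgement:
    user: str
    reason: str


class AckStore:
    """Side table recording that a failure was investigated and is 'okay'."""

    def __init__(self) -> None:
        self._acks: Dict[TIKey, Acknowledgement] = {}

    def acknowledge(self, ti: TaskInstance, user: str, reason: str) -> None:
        if ti.state != "failed":
            raise ValueError("only failed task instances can be acknowledged")
        self._acks[(ti.dag_id, ti.run_id, ti.task_id)] = Acknowledgement(user, reason)

    def is_acknowledged(self, ti: TaskInstance) -> bool:
        return (ti.dag_id, ti.run_id, ti.task_id) in self._acks


def unacknowledged_failures(tis: Iterable[TaskInstance], store: AckStore) -> List[TaskInstance]:
    """Failures that still need someone to look at them."""
    return [ti for ti in tis if ti.state == "failed" and not store.is_acknowledged(ti)]
```

The trade-off is that the UI and reports must join against the side table to tell acknowledged failures apart from open ones. The state model, however, stays unchanged.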

thcidale0808 commented 3 years ago

@ashb can I contribute the implementation of this feature?

ryw commented 3 years ago

@thcidale0808 yes would be great :)

turbaszek commented 3 years ago

@ryw does implementing this feature mean that this discussion reached consensus? https://lists.apache.org/thread.html/r14e23de7b71f61e3e81b11eb84da2612938982c46c9824e884d8ba50%40%3Cdev.airflow.apache.org%3E

ryw commented 3 years ago

I didn't remember that we had a dev thread on the topic.

A concrete implementation would help us resolve any lingering fuzziness around the feature.

turbaszek commented 3 years ago

> I didn't remember that we had a dev thread on the topic.

No worries, I think it would be good to revive that thread, as the approach proposed there gives even more auditability.

> A concrete implementation would help us resolve any lingering fuzziness around the feature.

I agree, but it's good if the person who works on it knows that the effort is proof of concept, not something that was agreed upon 😄

thcidale0808 commented 3 years ago

@turbaszek, based on that email thread, I guess this will be implemented by Ace Haidrey's team, so in that case I'll look for another issue to contribute to. :)

turbaszek commented 3 years ago

@thcidale0808 I would suggest sending an email to that thread so others know that you would be interested in helping make this feature real 😉

ldacey commented 3 years ago

This would be perfect for tasks where failure is acceptable or predicted. Ideally this would be another state so it would not look red on the main Airflow page (I have been sharing my screen before and people have asked why there are so many failures for certain schedules, and I have to explain that failure is acceptable etc.).

shettyrag commented 2 years ago

I was able to create a state called "warning" with the changes below.

  1. Created a new exception called AirflowWarningException in exceptions.py.
  2. Made some changes in utils/state.py (adding the new state), models/taskinstance.py, and ti_deps/deps/trigger_rule_dep.py.
  3. Raised the exception in my custom bash operator depending on the type of failure.

Let me know your thoughts and any impacts from this approach.
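The three steps above amount to mapping certain kinds of failure to a dedicated exception and then mapping that exception to the new state. Below is a self-contained sketch of that flow. `AirflowWarningException` follows the comment's naming, but here it is a local stand-in. `TaskFailed`, `WARNING_EXIT_CODES`, `check_exit_code` and `resolve_state` are hypothetical. Which exit codes count as "warning" is an assumed team convention.

```python
class AirflowWarningException(Exception):
    """Local stand-in: a non-fatal failure that should end in a 'warning' state."""


class TaskFailed(Exception):
    """Stand-in for a hard failure (Airflow itself raises AirflowException)."""


# Hypothetical convention: these exit codes are "expected" failures.
WARNING_EXIT_CODES = frozenset({2, 3})


def check_exit_code(returncode: int) -> None:
    """Raise the appropriate exception for a finished command."""
    if returncode == 0:
        return
    if returncode in WARNING_EXIT_CODES:
        raise AirflowWarningException(f"command exited with {returncode} (treated as warning)")
    raise TaskFailed(f"command exited with {returncode}")


def resolve_state(returncode: int) -> str:
    """Sketch of the exception-to-state mapping the patched TaskInstance would apply."""
    try:
        check_exit_code(returncode)
    except AirflowWarningException:
        return "warning"
    except TaskFailed:
        return "failed"
    return "success"
```

The operator decides, at run time, that a failure is acceptable. That differs from the original request, in which a human acknowledges a failure after investigating it.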

bbovenzi commented 2 years ago

@Anurag-Shetty Nice work! Is warning the same as acknowledged? Also, the color seems to be too similar to upstream_failed.

shettyrag commented 2 years ago

If you mean that it's an error but not a failure, and the pipeline should continue to execute the next task while acknowledging the failure, then yes, it could be named "acknowledged" for more general use. We could use a different color. I wanted to understand whether there are any implications of adding a new state.

billcrook commented 2 years ago

> pipeline should continue to execute next task acknowledging the failure.

The original intent of this ticket is to acknowledge that a task has failed. I don't believe the pipeline should continue to execute the next task in the failed dag run.
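The disagreement is about which upstream states count as "passing" under a rule like Airflow's default `all_success` trigger rule. The check below is simplified and hypothetical (it is not Airflow's `TriggerRuleDep`), and the two state sets are assumptions that encode each reading. It shows how an `acknowledged` upstream would still block downstream work, while a `warning` upstream would let it run.

```python
from typing import FrozenSet, Iterable


def can_run_downstream(upstream_states: Iterable[str], passing: FrozenSet[str]) -> bool:
    """Hypothetical all_success check: every upstream state must be in `passing`."""
    return all(state in passing for state in upstream_states)


# Acknowledgment is bookkeeping only: downstream tasks stay blocked.
ACKNOWLEDGED_SEMANTICS: FrozenSet[str] = frozenset({"success"})
# "Warning" reading: the pipeline carries on past the failure.
WARNING_SEMANTICS: FrozenSet[str] = frozenset({"success", "warning"})
```

For example, `can_run_downstream(["success", "acknowledged"], ACKNOWLEDGED_SEMANTICS)` is `False`, while `can_run_downstream(["success", "warning"], WARNING_SEMANTICS)` is `True`.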

tothandor commented 2 years ago

I have already made a quick'n'dirty patch for this. It introduces an "acknowledged" state so that you can track which errors have already been handled and which are left to be done. Anyone familiar with monitoring systems (e.g. CheckMK, Nagios, Zabbix, Zenoss) will find nothing new here: each error should either be dismissed or fixed within a given time. In some sense Airflow is also a monitoring system. The color I chose for the acknowledged state is sienna, which lies between green (good, ok) and red (bad, failure, error). To my perception it still looks like a rather dull red, which suits a healed problem that still shows its presence. Later on this could help to identify recurring problems that require more inspection.

--- airflow/settings.py.orig    2021-12-07 15:49:11.960772472 +0100
+++ airflow/settings.py 2021-12-07 15:30:21.792037118 +0100
@@ -94,6 +94,7 @@
     "skipped": "pink",
     "scheduled": "tan",
     "deferred": "mediumpurple",
+    "acknowledged": "sienna",
 }

--- airflow/api/common/experimental/mark_tasks.py.orig  2021-11-17 16:31:35.044668390 +0100
+++ airflow/api/common/experimental/mark_tasks.py   2021-12-07 17:05:20.931619345 +0100
@@ -71,6 +71,7 @@
     state: str = State.SUCCESS,
     commit: bool = False,
     session=None,
+    failed: bool = False,
 ):
     """
     Set the state of a task instance and if needed its relatives. Can set state
@@ -89,6 +90,7 @@
     :param state: State to which the tasks need to be set
     :param commit: Commit tasks to be altered to the database
     :param session: database session
+    :param failed: Affect only failed task instances
     :return: list of tasks that have been created and updated
     """
     if not tasks:
@@ -121,6 +123,8 @@
         if sub_dag_run_ids:
             qry_sub_dag = all_subdag_tasks_query(sub_dag_run_ids, session, state, confirmed_dates)
             tis_altered += qry_sub_dag.with_for_update().all()
+        if failed:
+            tis_altered = [ti for ti in tis_altered if ti.state == State.FAILED]
         for task_instance in tis_altered:
             task_instance.state = state
             if state in State.finished:
@@ -131,6 +135,9 @@
         if sub_dag_run_ids:
             qry_sub_dag = all_subdag_tasks_query(sub_dag_run_ids, session, state, confirmed_dates)
             tis_altered += qry_sub_dag.all()
+        if failed:
+            tis_altered = [ti for ti in tis_altered if ti.state == State.FAILED]
+
     return tis_altered

@@ -316,6 +323,35 @@
     )

+@provide_session
+def set_dag_run_state_to_acknowledged(dag, execution_date, commit=False, session=None):
+    """
+    Set the dag run for a specific execution date and its task instances
+    to acknowledged.
+
+    :param dag: the DAG of which to alter state
+    :param execution_date: the execution date from which to start looking
+    :param commit: commit DAG and tasks to be altered to the database
+    :param session: database session
+    :return: If commit is true, list of tasks that have been updated,
+             otherwise list of tasks that will be updated
+    :raises: ValueError if dag or execution_date is invalid
+    """
+    if not dag or not execution_date:
+        return []
+
+    # Mark the dag run as acknowledged.
+    if commit:
+        _set_dag_run_state(dag.dag_id, execution_date, State.ACKNOWLEDGED, session)
+
+    # Mark all failed task instances of the dag run to acknowledged.
+    for task in dag.tasks:
+        task.dag = dag
+    return set_state(
+        tasks=dag.tasks, execution_date=execution_date, state=State.ACKNOWLEDGED, commit=commit, session=session, failed=True
+    )
+
+
 @provide_session
 def set_dag_run_state_to_failed(dag, execution_date, commit=False, session=None):
     """
--- airflow/utils/state.py.orig 2021-11-17 16:31:35.240667990 +0100
+++ airflow/utils/state.py  2021-12-07 15:30:50.023955812 +0100
@@ -48,6 +48,7 @@
     SKIPPED = "skipped"  # Skipped by branching or some other mechanism
     SENSING = "sensing"  # Smart sensor offloaded to the sensor DAG
     DEFERRED = "deferred"  # Deferrable operator waiting on a trigger
+    ACKNOWLEDGED = "acknowledged"  # task may have failed, but that's okay; its state should not make noise

     def __str__(self) -> str:  # pylint: disable=invalid-str-returned
         return self.value
@@ -66,6 +67,7 @@
     RUNNING = "running"
     SUCCESS = "success"
     FAILED = "failed"
+    ACKNOWLEDGED = "acknowledged"

     def __str__(self) -> str:
         return self.value
@@ -96,6 +98,7 @@
     SKIPPED = TaskInstanceState.SKIPPED
     SENSING = TaskInstanceState.SENSING
     DEFERRED = TaskInstanceState.DEFERRED
+    ACKNOWLEDGED = TaskInstanceState.ACKNOWLEDGED

     task_states: Tuple[Optional[TaskInstanceState], ...] = (None,) + tuple(TaskInstanceState)

@@ -104,6 +107,7 @@
         DagRunState.SUCCESS,
         DagRunState.RUNNING,
         DagRunState.FAILED,
+        DagRunState.ACKNOWLEDGED,
     )

     state_color: Dict[Optional[TaskInstanceState], str] = {
@@ -121,6 +125,7 @@
         TaskInstanceState.REMOVED: 'lightgrey',
         TaskInstanceState.SCHEDULED: 'tan',
         TaskInstanceState.DEFERRED: 'mediumpurple',
+        TaskInstanceState.ACKNOWLEDGED: 'sienna',  # neither green nor red --> red+green = brown
     }
     state_color[TaskInstanceState.SENSING] = state_color[TaskInstanceState.DEFERRED]
     state_color.update(STATE_COLORS)  # type: ignore
@@ -151,6 +156,7 @@
             TaskInstanceState.FAILED,
             TaskInstanceState.SKIPPED,
             TaskInstanceState.UPSTREAM_FAILED,
+            TaskInstanceState.ACKNOWLEDGED,
         ]
     )
     """
--- airflow/www/forms.py.orig   2021-12-07 15:11:39.133274432 +0100
+++ airflow/www/forms.py    2021-12-07 15:10:42.140438988 +0100
@@ -162,6 +162,7 @@
             ('running', 'running'),
             ('failed', 'failed'),
             ('up_for_retry', 'up_for_retry'),
+            ('acknowledged', 'acknowledged'),
         ),
         widget=Select2Widget(),
         validators=[InputRequired()],
--- airflow/www/views.py.orig   2021-12-07 15:48:39.388866769 +0100
+++ airflow/www/views.py    2021-12-07 17:32:38.806764466 +0100
@@ -93,6 +93,7 @@
 from airflow.api.common.experimental.mark_tasks import (
     set_dag_run_state_to_failed,
     set_dag_run_state_to_success,
+    set_dag_run_state_to_acknowledged,
 )
 from airflow.configuration import AIRFLOW_CONFIG, conf
 from airflow.exceptions import AirflowException
@@ -4024,6 +4025,33 @@
             flash('Failed to set state', 'error')
         return redirect(self.get_default_url())

+    @action(
+        'set_acknowledged',
+        "Set state to 'acknowledged'",
+        "All failed task instances would also be marked as acknowledged, are you sure?",
+        single=False,
+    )
+    @action_has_dag_edit_access
+    @provide_session
+    def action_set_acknowledged(self, drs, session=None):
+        """Set state to acknowledged."""
+        try:
+            count = 0
+            altered_tis = []
+            for dr in session.query(DagRun).filter(DagRun.id.in_([dagrun.id for dagrun in drs])).all():
+                count += 1
+                altered_tis += set_dag_run_state_to_acknowledged(
+                    current_app.dag_bag.get_dag(dr.dag_id), dr.execution_date, commit=True, session=session
+                )
+            altered_ti_count = len(altered_tis)
+            flash(
+                "{count} dag runs and {altered_ti_count} task instances "
+                "were set to acknowledged".format(count=count, altered_ti_count=altered_ti_count)
+            )
+        except Exception as e:
+            flash(f'Failed to set state: {e}', 'error')
+        return redirect(self.get_default_url())
+
     @action('clear', "Clear the state", "All task instances would be cleared, are you sure?", single=False)
     @action_has_dag_edit_access
     @provide_session
@@ -4384,6 +4412,14 @@
         self.update_redirect()
         return redirect(self.get_redirect())

+    @action('set_acknowledged', "Set state to 'acknowledged'", '', single=False)
+    @action_has_dag_edit_access
+    def action_set_acknowledged(self, tis):
+        """Set state to 'acknowledged'"""
+        self.set_task_instance_state(tis, State.ACKNOWLEDGED)
+        self.update_redirect()
+        return redirect(self.get_redirect())
+

 class AutocompleteView(AirflowBaseView):
     """View to provide autocomplete results"""
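
The effect of the `failed=True` filter in the patch can be checked without a running Airflow. The following is a hypothetical in-memory model. `TI`, `DagRun`, and the simplified `set_state` / `set_dag_run_state_to_acknowledged` are stand-ins; the real functions query the metadata database and handle sub-DAGs. The model mirrors two behaviors of the patch: with `commit=False` it only returns what would change, and with `failed=True` only FAILED task instances are touched.

```python
from dataclasses import dataclass
from typing import List

FAILED = "failed"
ACKNOWLEDGED = "acknowledged"


@dataclass
class TI:
    task_id: str
    state: str


@dataclass
class DagRun:
    state: str
    tis: List[TI]


def set_state(tis: List[TI], state: str, commit: bool = False, failed: bool = False) -> List[TI]:
    """Simplified stand-in for the patched set_state."""
    altered = [ti for ti in tis if ti.state != state]  # simplified selection
    if failed:
        # The patch's new filter: only FAILED task instances are affected.
        altered = [ti for ti in altered if ti.state == FAILED]
    if commit:
        for ti in altered:
            ti.state = state
    return altered


def set_dag_run_state_to_acknowledged(dr: DagRun, commit: bool = False) -> List[TI]:
    """Stand-in for the new helper: acknowledge the run and its failed TIs."""
    if commit:
        dr.state = ACKNOWLEDGED
    return set_state(dr.tis, ACKNOWLEDGED, commit=commit, failed=True)
```

One consequence shows up when running the model: because the patch filters on `State.FAILED` only, downstream `upstream_failed` task instances keep their state after the run is acknowledged.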

billcrook commented 1 year ago

@tothandor Do you plan on issuing a PR for this change?

tothandor commented 1 year ago

> @tothandor Do you plan on issuing a PR for this change?

@billcrook No, at least not anytime soon. I couldn't really find time for a full-fledged public PR, which would also require tests, documentation, and reviews. Maybe I'll give it a try later, but if you have the same thing in mind, please go ahead.

jscheffl commented 1 month ago

The last activity on this issue was a long, long time ago. Is it still viewed positively, or has it been superseded by other priorities?

billcrook commented 1 month ago

I still believe this would be useful for data operations teams. A state to represent "failed but acknowledged" would allow for easier management and auditability. Keeping track of acknowledged failures cannot be handled within Airflow itself. Adding notes to task instances is not a solution. For teams that manage hundreds of jobs, this is an essential function.

jscheffl commented 1 month ago

What I have seen is that many teams add notes to DAG runs or TaskInstances. That way you can also leave behind some qualified text about the analysis and checks performed.

Notes can be set and retrieved via API. So you can make this also automated.

I think such "additional marks" might be much better than extending the state model just for qualification purposes.
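Automating notes means calling the stable REST API. The sketch below only builds the request and does not send it. It assumes Airflow 2.5 or later, where notes were introduced, and an endpoint of the form `PATCH /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/setNote` with a JSON body `{"note": ...}`. Check that path against the API reference for your version. The helper name is hypothetical, and authentication is omitted.

```python
import json
import urllib.parse
import urllib.request


def build_set_task_instance_note_request(
    base_url: str, dag_id: str, dag_run_id: str, task_id: str, note: str
) -> urllib.request.Request:
    """Build (but do not send) a PATCH request that sets a task instance note.

    Assumes the Airflow >= 2.5 stable REST API setNote endpoint; no auth headers.
    """
    # Run IDs such as "scheduled__2021-12-07T00:00:00+00:00" must be percent-encoded.
    path = "/api/v1/dags/{}/dagRuns/{}/taskInstances/{}/setNote".format(
        urllib.parse.quote(dag_id, safe=""),
        urllib.parse.quote(dag_run_id, safe=""),
        urllib.parse.quote(task_id, safe=""),
    )
    body = json.dumps({"note": note}).encode("utf-8")
    return urllib.request.Request(
        base_url.rstrip("/") + path,
        data=body,
        method="PATCH",
        headers={"Content-Type": "application/json"},
    )
```

To actually send it, you would pass the request to `urllib.request.urlopen` once you have added whatever authentication your deployment requires.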

tothandor commented 1 month ago

> What I have seen is that many teams add notes to DAG runs or TaskInstances. That way you can also leave behind some qualified text about the analysis and checks performed.

Notes won't show you how many unhandled problems you have. Of course zero is the best number, but the situation is quite different when you have a few errors compared to when you have many more. When you have to clean up a mess, it's good to have an overview of which problems are left to be addressed.

> Notes can be set and retrieved via API. So you can make this also automated.

Do I understand correctly that one would need an external tool just to mark a failure as acknowledged/handled?

> I think such "additional marks" might be much better than extending the state model just for qualification purposes.

It depends on the volume. If you only have a few tasks/workflows and a short history of jobs/TIs, then this might be enough, but for hundreds of them, it would be insane.
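The "how many unhandled problems are left" overview makes the difference concrete. In the sketch below, rows are assumed to be `(dag_id, state, note)` tuples, for example exported from the metadata DB or the REST API. Both functions are hypothetical. The `"ACK"` note prefix is an invented team convention of the kind notes-based tracking would depend on.

```python
from collections import Counter
from typing import Iterable, Optional, Tuple

# (dag_id, state, note) rows; the row shape is an assumption for this sketch.
Row = Tuple[str, str, Optional[str]]


def open_failures_by_state(rows: Iterable[Row]) -> Counter:
    """With a dedicated 'acknowledged' state, 'still open' is a plain equality check."""
    return Counter(dag_id for dag_id, state, _ in rows if state == "failed")


def open_failures_by_note(rows: Iterable[Row], marker: str = "ACK") -> Counter:
    """With notes, 'handled' depends on a free-text convention (hypothetical 'ACK' prefix)."""
    return Counter(
        dag_id
        for dag_id, state, note in rows
        if state == "failed" and not (note or "").strip().upper().startswith(marker)
    )
```

Both functions produce the same per-DAG counts when everyone follows the convention. The notes-based version, however, silently miscounts as soon as someone words a note differently.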

billcrook commented 1 month ago

Standardized text in a free-form text field is a classic anti-pattern. When end users start doing this it is an indication there is a missing feature.