apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Double execution of failure callback for task #27614

Closed: Bowrna closed this issue 7 months ago

Bowrna commented 1 year ago

Apache Airflow version

2.4.2

What happened

I have applied task callbacks for the failure and success cases using a cluster policy for a specific DAG: https://gist.github.com/Bowrna/7fdca8b546fc274edd068ffdae5b76f9

Here is the cluster policy that I have applied: https://gist.github.com/Bowrna/1994894beea39fa8e1c269591b7f0346
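
For illustration, here is a minimal sketch of the approach, assuming Airflow 2.4's task_policy hook in airflow_local_settings.py; the actual policy is in the gist above, and log_task_event is a hypothetical stand-in for the real logging callbacks:

import functools

def log_task_event(context, status):
    # Hypothetical helper: pull task details out of the callback context and log them.
    ti = context["task_instance"]
    print({"task_name": ti.task_id, "task_status": str(ti.state), "status_callback": status})

def task_policy(task):
    # Airflow calls this cluster policy for every task when the DAG file is parsed.
    task.on_success_callback = functools.partial(log_task_event, status="success")
    task.on_failure_callback = functools.partial(log_task_event, status="failure")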

On executing the DAG, the success callback is correctly invoked once for every successful task, while the failure callback is invoked twice for a failed task.

What you think should happen instead

Like the success callback, the failure callback should be executed only once.

How to reproduce

I have attached the sample DAG and the airflow_local_settings.py file in which I have added the cluster policy. Running Airflow with that and executing the DAG, either manually or on schedule, logs the details below:

{"dag_name": "test_bowrna", "dag_run_name": "scheduled__2022-11-09T00:00:00+00:00", "status_callback": "success", "unravel_timestamp": 1668062485, "task_name": "runme_0", "task_duration": 1.178738, "task_status": "success", "task_operator": "BashOperator", "dag_start_date": "2022-11-10 06:41:22.757324+00:00", "dag_end_date": null, "dag_state": "running"}
{"dag_name": "test_bowrna", "dag_run_name": "scheduled__2022-11-09T00:00:00+00:00", "status_callback": "success", "unravel_timestamp": 1668062486, "task_name": "runme_1", "task_duration": 1.081635, "task_status": "success", "task_operator": "BashOperator", "dag_start_date": "2022-11-10 06:41:22.757324+00:00", "dag_end_date": null, "dag_state": "running"}
{"dag_name": "test_bowrna", "dag_run_name": "scheduled__2022-11-09T00:00:00+00:00", "status_callback": "success", "unravel_timestamp": 1668062488, "task_name": "runme_2", "task_duration": 1.071424, "task_status": "success", "task_operator": "BashOperator", "dag_start_date": "2022-11-10 06:41:22.757324+00:00", "dag_end_date": null, "dag_state": "running"}
{"dag_name": "test_bowrna", "dag_run_name": "scheduled__2022-11-09T00:00:00+00:00", "status_callback": "success", "unravel_timestamp": 1668062495, "task_name": "also_run_this", "task_duration": 0.075827, "task_status": "success", "task_operator": "BashOperator", "dag_start_date": "2022-11-10 06:41:22.757324+00:00", "dag_end_date": null, "dag_state": "running"}
{"dag_name": "test_bowrna", "dag_run_name": "scheduled__2022-11-09T00:00:00+00:00", "status_callback": "success", "unravel_timestamp": 1668062495, "task_name": "this_will_skip", "task_duration": 0.072133, "task_status": "skipped", "task_operator": "BashOperator", "dag_start_date": "2022-11-10 06:41:22.757324+00:00", "dag_end_date": null, "dag_state": "running"}
{"dag_name": "test_bowrna", "dag_run_name": "scheduled__2022-11-09T00:00:00+00:00", "status_callback": "failure", "unravel_timestamp": 1668062496, "task_name": "failure_case", "task_duration": 0.066066, "task_status": "failed", "task_operator": "PythonOperator", "dag_start_date": "2022-11-10 06:41:22.757324+00:00", "dag_end_date": null, "dag_state": "running"}
{"dag_name": "test_bowrna", "dag_run_name": "scheduled__2022-11-09T00:00:00+00:00", "status_callback": "failure", "unravel_timestamp": 1668062497, "task_name": "failure_case", "task_duration": 0.655575, "task_status": "failed", "task_operator": "PythonOperator", "dag_start_date": "2022-11-10 06:41:22.757324+00:00", "dag_end_date": null, "dag_state": "running"}
{"dag_name": "test_bowrna", "dag_run_name": "scheduled__2022-11-09T00:00:00+00:00", "status_callback": "success", "unravel_timestamp": 1668062498, "task_name": "run_after_loop", "task_duration": 0.073114, "task_status": "success", "task_operator": "BashOperator", "dag_start_date": "2022-11-10 06:41:22.757324+00:00", "dag_end_date": null, "dag_state": "running"}
{"dag_name": "test_bowrna", "dag_run_name": "scheduled__2022-11-09T00:00:00+00:00", "status_callback": "success", "unravel_timestamp": 1668062500, "task_name": "run_this_last", "task_duration": 0.078573, "task_status": "success", "task_operator": "BashOperator", "dag_start_date": "2022-11-10 06:41:22.757324+00:00", "dag_end_date": null, "dag_state": "running"}

Here you can see that the failure case is logged twice. Both of these failure entries also have a different duration (I have verified that I am not double-logging in my code by mistake; that is not the case).

Operating System

macOS Monterey version 12.2.1

Versions of Apache Airflow Providers

apache-airflow-providers-apache-hive==4.0.1
apache-airflow-providers-cncf-kubernetes==4.4.0
apache-airflow-providers-common-sql==1.2.0
apache-airflow-providers-ftp==3.1.0
apache-airflow-providers-http==4.0.0
apache-airflow-providers-imap==3.0.0
apache-airflow-providers-sqlite==3.2.1

Deployment

Virtualenv installation

Deployment details

I am currently running Airflow in development mode and testing callbacks and the cluster policy.

Anything else

The callback is executed once as part of the task instance callback, and another time as part of DagFileProcessorProcess. I pulled out the logs for the specific task that gets logged twice; the log below is from the taskinstance.py flow.

I found the following log in the task log of the specific dag_run:

Traceback (most recent call last):
  File "/Users/unravel/unravel_airflow/unravel_airflow/lib/python3.10/site-packages/airflow/operators/python.py", line 175, in execute
    return_value = self.execute_callable()
  File "/Users/unravel/unravel_airflow/unravel_airflow/lib/python3.10/site-packages/airflow/operators/python.py", line 193, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/Users/unravel/unravel_airflow/airflow/dags/test.py", line 67, in fail_case
    raise AirflowFailException('Failure case test for cluster policy')
airflow.exceptions.AirflowFailException: Failure case test for cluster policy
[2022-11-09, 09:59:40 UTC] {taskinstance.py:851} DEBUG - Refreshing TaskInstance <TaskInstance: test_bowrna.failure_case manual__2022-11-09T09:59:32.540415+00:00 [running]> from DB
[2022-11-09, 09:59:40 UTC] {taskinstance.py:2325} DEBUG - Task Duration set to 0.061723
[2022-11-09, 09:59:40 UTC] {taskinstance.py:1412} DEBUG - Clearing next_method and next_kwargs.
[2022-11-09, 09:59:40 UTC] {taskinstance.py:1401} INFO - Immediate failure requested. Marking task as FAILED. dag_id=test_bowrna, task_id=failure_case, execution_date=20221109T095932, start_date=20221109T095940, end_date=20221109T095940
[2022-11-09, 09:59:40 UTC] {logging_mixin.py:120} INFO - Cluster dag policy:Task has failed
[2022-11-09, 09:59:40 UTC] {logging_mixin.py:120} INFO - Cluster policy:context for failure case: <TaskInstance: test_bowrna.failure_case manual__2022-11-09T09:59:32.540415+00:00 [failed]>
[2022-11-09, 09:59:40 UTC] {logging_mixin.py:120} INFO - Task id: failure_case
[2022-11-09, 09:59:40 UTC] {logging_mixin.py:120} INFO - Task id: <function task_failure_alert at 0x10749a050>
[2022-11-09, 09:59:40 UTC] {logging_mixin.py:120} INFO - Path to write: /Users/unravel/unravel_airflow/airflow/event_logger/test_bowrna/09-11-2022/manual__2022-11-09T09:59:32.540415+00:00.json
[2022-11-09, 09:59:40 UTC] {logging_mixin.py:120} INFO - Info details to log: {'dag_name': 'test_bowrna', 'dag_run_name': 'manual__2022-11-09T09:59:32.540415+00:00', 'status_callback': 'failure', 'unravel_timestamp': 1667987980, 'task_name': 'failure_case', 'task_duration': 0.061723, 'task_status': <TaskInstanceState.FAILED: 'failed'>, 'task_operator': 'PythonOperator', 'dag_start_date': datetime.datetime(2022, 11, 9, 9, 59, 34, 24988, tzinfo=Timezone('UTC')), 'dag_end_date': None, 'dag_state': 'running'}
[2022-11-09, 09:59:40 UTC] {events.py:45} DEBUG - session flush listener: added [<TaskInstanceState.FAILED: 'failed'>] unchanged () deleted ['running'] - <TaskInstance: test_bowrna.failure_case manual__2022-11-09T09:59:32.540415+00:00 [failed]>
[2022-11-09, 09:59:40 UTC] {cli_action_loggers.py:83} DEBUG - Calling callbacks: []
[2022-11-09, 09:59:40 UTC] {standard_task_runner.py:100} ERROR - Failed to execute job 1318 for task failure_case ('TaskInstance' object has no attribute 'task'; 49534)
[2022-11-09, 09:59:40 UTC] {local_task_job.py:164} INFO - Task exited with return code 1
[2022-11-09, 09:59:40 UTC] {dagrun.py:674} DEBUG - number of tis tasks for <DagRun test_bowrna @ 2022-11-09 09:59:32.540415+00:00: manual__2022-11-09T09:59:32.540415+00:00, state:running, queued_at: 2022-11-09 09:59:32.551538+00:00. externally triggered: True>: 0 task(s)
[2022-11-09, 09:59:40 UTC] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check

I also found another log in the Airflow scheduler:

[2022-11-09T15:29:41.148+0530] {logging_mixin.py:120} INFO - [2022-11-09T15:29:41.148+0530] {dagbag.py:502} DEBUG - Loaded DAG <DAG: test_bowrna>
[2022-11-09T15:29:41.149+0530] {processor.py:766} INFO - DAG(s) dict_keys(['test_bowrna']) retrieved from /Users/unravel/unravel_airflow/airflow/dags/test.py
[2022-11-09T15:29:41.149+0530] {processor.py:639} DEBUG - Processing Callback Request: {'full_filepath': '/Users/unravel/unravel_airflow/airflow/dags/test.py', 'processor_subdir': '/Users/unravel/unravel_airflow/airflow/dags', 'msg': "{'DAG Id': 'test_bowrna', 'Task Id': 'failure_case', 'Run Id': 'manual__2022-11-09T09:59:32.540415+00:00', 'Hostname': 'unravels-macbook-pro.local'}", 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0x1049f7a60>, 'is_failure_callback': True}
[2022-11-09T15:29:41.198+0530] {logging_mixin.py:120} INFO - [2022-11-09T15:29:41.198+0530] {taskinstance.py:1853} ERROR - {'DAG Id': 'test_bowrna', 'Task Id': 'failure_case', 'Run Id': 'manual__2022-11-09T09:59:32.540415+00:00', 'Hostname': 'unravels-macbook-pro.local'}
[2022-11-09T15:29:41.199+0530] {logging_mixin.py:120} INFO - [2022-11-09T15:29:41.199+0530] {taskinstance.py:851} DEBUG - Refreshing TaskInstance <TaskInstance: test_bowrna.failure_case manual__2022-11-09T09:59:32.540415+00:00 [running]> from DB
[2022-11-09T15:29:41.201+0530] {logging_mixin.py:120} INFO - [2022-11-09T15:29:41.201+0530] {taskinstance.py:2325} DEBUG - Task Duration set to 0.520437
[2022-11-09T15:29:41.201+0530] {logging_mixin.py:120} INFO - [2022-11-09T15:29:41.201+0530] {taskinstance.py:1412} DEBUG - Clearing next_method and next_kwargs.
[2022-11-09T15:29:41.201+0530] {logging_mixin.py:120} INFO - [2022-11-09T15:29:41.201+0530] {plugins_manager.py:300} DEBUG - Loading plugins
[2022-11-09T15:29:41.201+0530] {logging_mixin.py:120} INFO - [2022-11-09T15:29:41.201+0530] {plugins_manager.py:244} DEBUG - Loading plugins from directory: /Users/unravel/unravel_airflow/airflow/plugins
[2022-11-09T15:29:41.202+0530] {logging_mixin.py:120} INFO - [2022-11-09T15:29:41.202+0530] {plugins_manager.py:259} DEBUG - Importing plugin module /Users/unravel/unravel_airflow/airflow/plugins/plugin.py
[2022-11-09T15:29:41.202+0530] {logging_mixin.py:120} INFO - [2022-11-09T15:29:41.202+0530] {plugins_manager.py:259} DEBUG - Importing plugin module /Users/unravel/unravel_airflow/airflow/plugins/listener.py
[2022-11-09T15:29:41.204+0530] {logging_mixin.py:120} INFO - [2022-11-09T15:29:41.203+0530] {plugins_manager.py:224} DEBUG - Loading plugins from entrypoints
[2022-11-09T15:29:41.228+0530] {logging_mixin.py:120} INFO - [2022-11-09T15:29:41.228+0530] {plugins_manager.py:316} DEBUG - Loading 1 plugin(s) took 0.03 seconds
[2022-11-09T15:29:41.228+0530] {logging_mixin.py:120} INFO - [2022-11-09T15:29:41.228+0530] {plugins_manager.py:476} DEBUG - Integrate DAG plugins
[2022-11-09T15:29:41.230+0530] {logging_mixin.py:120} INFO - [2022-11-09T15:29:41.230+0530] {taskinstance.py:1099} DEBUG - previous_execution_date was called
[2022-11-09T15:29:41.231+0530] {logging_mixin.py:120} INFO - [2022-11-09T15:29:41.231+0530] {taskinstance.py:1401} INFO - Marking task as FAILED. dag_id=test_bowrna, task_id=failure_case, execution_date=20221109T095932, start_date=20221109T095940, end_date=20221109T095941
[2022-11-09T15:29:41.231+0530] {logging_mixin.py:120} INFO - Cluster dag policy:Task has failed
[2022-11-09T15:29:41.231+0530] {logging_mixin.py:120} INFO - Cluster policy:context for failure case: <TaskInstance: test_bowrna.failure_case manual__2022-11-09T09:59:32.540415+00:00 [failed]>
[2022-11-09T15:29:41.231+0530] {logging_mixin.py:120} INFO - Task id: failure_case
[2022-11-09T15:29:41.231+0530] {logging_mixin.py:120} INFO - Task id: <function task_failure_alert at 0x1048c9240>
[2022-11-09T15:29:41.231+0530] {logging_mixin.py:120} INFO - Path to write: /Users/unravel/unravel_airflow/airflow/event_logger/test_bowrna/09-11-2022/manual__2022-11-09T09:59:32.540415+00:00.json
[2022-11-09T15:29:41.231+0530] {logging_mixin.py:120} INFO - Info details to log: {'dag_name': 'test_bowrna', 'dag_run_name': 'manual__2022-11-09T09:59:32.540415+00:00', 'status_callback': 'failure', 'unravel_timestamp': 1667987981, 'task_name': 'failure_case', 'task_duration': 0.520437, 'task_status': <TaskInstanceState.FAILED: 'failed'>, 'task_operator': 'PythonOperator', 'dag_start_date': datetime.datetime(2022, 11, 9, 9, 59, 34, 24988, tzinfo=Timezone('UTC')), 'dag_end_date': None, 'dag_state': 'running'}
[2022-11-09T15:29:41.234+0530] {processor.py:725} INFO - Executed failure callback for <TaskInstance: test_bowrna.failure_case manual__2022-11-09T09:59:32.540415+00:00 [failed]> in state failed
[2022-11-09T15:29:41.236+0530] {logging_mixin.py:120} INFO - [2022-11-09T15:29:41.236+0530] {dagbag.py:647} DEBUG - Running dagbag.sync_to_db with retries. Try 1 of

This shows the failure callback also being executed from the processor.py flow of the code.

Are you willing to submit PR?

Code of Conduct

Bowrna commented 1 year ago

I would like to submit a PR, but I first want the committers to confirm whether this is an actual bug and whether my understanding is correct.

tirkarthi commented 1 year ago

https://gist.github.com/Bowrna/1994894beea39fa8e1c269591b7f0346#file-airflow_local_settings-py-L120

if task.on_failure_callback.__qualname__ != "add_failure_callback.<locals>.new_callback" and task.on_success_callback.__qualname__ != "task_failure_alert":       

Should this be checking task.on_failure_callback.__qualname__ != "task_failure_alert" instead of task.on_success_callback.__qualname__ != "task_failure_alert", similar to the condition in add_success_callback?
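
For reference, the corrected condition would read roughly like this (a sketch; the surrounding add_failure_callback wrapper from the gist is not shown):

if task.on_failure_callback.__qualname__ != "add_failure_callback.<locals>.new_callback" and task.on_failure_callback.__qualname__ != "task_failure_alert":
    # attach task_failure_alert here, as in the gist
    ...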

potiuk commented 1 year ago

@Bowrna - hard to say - if you are able to reproduce it, then it is likely a bug. Finding the problem and fixing it might actually prove it.

Bowrna commented 1 year ago

https://gist.github.com/Bowrna/1994894beea39fa8e1c269591b7f0346#file-airflow_local_settings-py-L120

@tirkarthi Yes, you are right. Thanks for pointing out this mistake. But even after fixing it, the issue is not resolved. I added this check to avoid attaching the same callback function twice.

Bowrna commented 1 year ago

@potiuk If someone could shed light on the point below, it would help me go and fix this issue:

The callback is executed once as part of the task instance callback, and another time as part of DagFileProcessorProcess.

Is it expected for the callback to be executed from both of these files?

I will go back and debug this further and share more details in this thread.

I will also check if success callback is getting executed via DagFileProcessorProcess too.

potiuk commented 1 year ago

Generally speaking - yes, the callback can be executed both by the task run (when the task CAN execute the callback) and by the DagFileProcessor (when the task callback cannot be executed in the task - for example when the task was forcefully killed).

That's the theory.

However - this is a distributed system - and there are different modes of execution of such distributed operations: at-most-once, at-least-once and exactly-once.

Explained for example here: https://medium.com/@madhur25/meaning-of-at-least-once-at-most-once-and-exactly-once-delivery-10e477fafe16#:~:text=As%20the%20name%20suggests%2C%20At,exception%2C%20the%20message%20is%20lost.

I believe this needs to be looked at a bit closer, as I think it is not explicitly specified - but it would be great to verify it and document at least how the callbacks are delivered. Exactly-once is surprisingly hard to achieve in distributed systems - and I think it's almost impossible when you consider all kinds of failure cases.

I believe what our callback mechanism tries to achieve is "at-least-once". I am not 100% sure if this is achieved in all kinds of situations, and I am not sure exactly what sequence of events leads to having duplicates, but generally speaking I think we cannot guarantee that the callback will be executed exactly once.

If this is easily reproducible, then it might be a bug, because duplicates under "at-least-once" should only occur when some unexpected events happen. If this is happening in "normal" circumstances - this is a bug. If it is triggered by some error scenarios - not so much, and likely we won't be able to do anything about it, because it would be too complex and costly to try to implement "exactly-once".

Bowrna commented 1 year ago

@potiuk I will check if this is getting executed twice in "normal" circumstances / complex error scenarios as you mentioned.

My understanding from reading the documentation was that the failure callback has to be executed in all cases.

Generally speaking - yes, the callback can be executed both by the task run (when the task CAN execute the callback) and by the DagFileProcessor (when the task callback cannot be executed in the task - for example when the task was forcefully killed).

When execution fails during the run, or the task is forcefully killed, I thought the failure callback for the task would be invoked, as there is no mention of this in the documentation. I can verify this part and document it.

potiuk commented 1 year ago

They are executed - but not in the task, because the task is not there in this case. Callbacks for tasks that were 'killed' are normally executed in the DagFileProcessor. But again - distribution happens, so it is very likely that there are cases where the callbacks are executed in both the task and the DFP, because the DFP 'thinks' the callback was not executed in the task. This is one of the cases where callbacks might be executed more than once. And if the DFP is killed after the callback is executed, so that it cannot record this, the callback can most likely be executed again. This is the 'at-least-once' semantics.

Bowrna commented 1 year ago
from airflow.exceptions import AirflowFailException
from airflow.operators.python import PythonOperator

# Inside the DAG definition:
def fail_case():
    raise AirflowFailException('Failure case test for cluster policy')

failure_test = PythonOperator(
    task_id='failure_case',
    python_callable=fail_case,
    # on_failure_callback=on_failure_callback,
)

This is how I have made the task fail: I raise an exception, the task fails, and the task callback is invoked. The DagFileProcessor callback is also invoked in this case.

And if the DFP is killed after the callback is executed, so that it cannot record this, the callback can most likely be executed again. This is the 'at-least-once' semantics.

I don't understand this point of yours. Could you help me understand it? Do you mean it can be executed more than twice, too?

potiuk commented 1 year ago

I don't understand this point of yours. Could you help me to understand it? Do you mean to tell it can be executed more than twice too?

Potentially yes. Not sure if in our case though.

Bowrna commented 1 year ago

I see. Then the callback function has to be written to handle this case, so that duplicate invocations are handled.

The problem with this double callback is that the end_date of the failed task is not reliable: we can't determine which is the right callback for the failure case, and so we can't determine the end_date of the failed task.
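
As an illustration, here is a minimal sketch of a deduplicating failure callback, assuming at-least-once delivery; the marker-file scheme and the dedup directory path are illustrative, not part of Airflow:

import os

def task_failure_alert(context):
    ti = context["task_instance"]
    dedup_dir = "/tmp/airflow_callback_dedup"  # illustrative location
    os.makedirs(dedup_dir, exist_ok=True)
    # Key on (dag_id, task_id, run_id); include try_number as well if retries are enabled.
    marker = os.path.join(dedup_dir, f"{ti.dag_id}__{ti.task_id}__{ti.run_id}.done")
    try:
        # Exclusive create: fails if another process already handled this failure event.
        with open(marker, "x"):
            pass
    except FileExistsError:
        return  # duplicate delivery of the same failure event -- skip
    # ... write the actual JSON event here ...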

potiuk commented 1 year ago

This is why we introduced the listener API - that should be one more reason to go to 2.5.0. It's really counter-productive to implement workarounds for this in the current version - it is much better to spend the effort on migrating and using the new API :). That's my honest advice.
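
A minimal sketch of that listener-based approach, assuming Airflow 2.5+; the plugin and file names are illustrative:

# plugins/listener.py
from airflow.listeners import hookimpl

@hookimpl
def on_task_instance_failed(previous_state, task_instance, session):
    # Called by Airflow when a task instance transitions to FAILED.
    print(f"Task failed: {task_instance.dag_id}.{task_instance.task_id} run_id={task_instance.run_id}")

# plugins/plugin.py
from airflow.plugins_manager import AirflowPlugin
import listener  # the module above; the plugins folder is on sys.path

class EventLoggerListenerPlugin(AirflowPlugin):
    name = "event_logger_listener_plugin"
    listeners = [listener]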

github-actions[bot] commented 8 months ago

This issue has been automatically marked as stale because it has been open for 365 days without any activity. There have been several Airflow releases since the last activity on this issue. We kindly ask you to recheck the report against the latest Airflow version and let us know if the issue is reproducible. The issue will be closed in the next 30 days if no further activity occurs from the issue author.

github-actions[bot] commented 7 months ago

This issue has been closed because it has not received a response from the issue author.