Closed luqic closed 3 years ago
Thanks for opening your first issue here! Be sure to follow the issue template!
We're also impacted by this bug. We're running hundreds of tasks every hour and every day we're ending up with multiple task instances stuck in the queued state. We currently have to clear out the state of all those queued tasks, so that they are picked up again.
Apache Airflow version: 2.0.2
Kubernetes version: Server Version: version.Info{Major:"1", Minor:"20+", GitVersion:"v1.20.4-eks-6b7464", GitCommit:"6b746440c04cb81db4426842b4ae65c3f7035e53", GitTreeState:"clean", BuildDate:"2021-03-19T19:33:03Z", GoVersion:"go1.15.8", Compiler:"gc", Platform:"linux/amd64"}
We noticed this issue with Airflow 2.1.2. Job went from queued to failed without retry, looking at the code I am not sure how to fix it. It is clear that in scheduler_job.py
on line 1238 we see the relevant logs.
~Maybe there should be logic here to check if the task needs to be retried and change the state to retried if needed? That logic is currently completely circumvented by just setting the state from the scheduler.~
Looking at the code again, a TaskCallbackRequest
event is sent to the processor_agent, this will eventually be processed by the function execute_callbacks
, that will execute the task instance method handle_failure_with_callback
, this one should set the state of the task instance to Up for retry
, however this does not happen for some reason. In theory the dagbag could have not contained the dag or task when it was processing the message. But that seems very unlikely
The relevant logs (the dag and task name are erased because they might contain sensitive information):
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| @timestamp | log |
|-------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 2021-08-06 17:27:30.210 | [2021-08-06 17:27:30,209] {scheduler_job.py:1254} ERROR - Executor reports task instance <TaskInstance: xxxx 1990-06-10 00:00:00+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally? |
| 2021-08-06 17:27:30.205 | [2021-08-06 17:27:30,204] {scheduler_job.py:1218} INFO - Executor reports execution of xxxx execution_date=1990-06-10 00:00:00+00:00 exited with status failed for try_number 5 |
| 2021-08-06 17:27:30.204 | [2021-08-06 17:27:30,204] {kubernetes_executor.py:546} INFO - Changing state of (TaskInstanceKey(dag_id='xxxx', task_id='xxxx', execution_date=datetime.datetime(1990, 6, 10, 0, 0, tzinfo=tzlocal()), try_number=5), 'failed', 'xxxx, 'dev', '97113730') to failed |
| 2021-08-06 17:27:30.203 | [2021-08-06 17:27:30,202] {kubernetes_executor.py:368} INFO - Attempting to finish pod; pod_id: xxxx; state: failed; annotations: {'dag_id': 'xxxx', 'task_id': 'xxxx', 'execution_date': '1990-06-10T00:00:00+00:00', 'try_number': '5'} |
| 2021-08-06 17:22:23.371 | [2021-08-06 17:22:23,371] {scheduler_job.py:1245} INFO - Setting external_id for <TaskInstance: xxxx 1990-06-10 00:00:00+00:00 [queued]> to 1606 |
| 2021-08-06 17:22:23.367 | [2021-08-06 17:22:23,367] {scheduler_job.py:1218} INFO - Executor reports execution of xxxx execution_date=1990-06-10 00:00:00+00:00 exited with status queued for try_number 5 |
2021-08-06 17:27:30.205 | [2021-08-06 17:27:30,204] {scheduler_job.py:1218} INFO - Executor reports execution of xxxxr execution_date=1990-06-10 00:00:00+00:00 exited with status failed for try_number 5
Kubernetes version (EKS): Server Version: version.Info{Major:"1", Minor:"21+", GitVersion:"v1.21.2-eks-0389ca3", GitCommit:"8a4e27b9d88142bbdd21b997b532eb6d493df6d2", GitTreeState:"clean", BuildDate:"2021-07-31T01:34:46Z", GoVersion:"go1.16.5", Compiler:"gc", Platform:"linux/amd64"}
This PR https://github.com/apache/airflow/pull/15929 fixed the issue of not having to clear the task before it can be rerun again and that is a major issue otherwise the task is stuck as explained by @mmazek above
@stijndehaes, can you check the log at logs/scheduler/{CURRENT_DATE}/{DAGFILENAME.py.log}
when this happens?
@ashb @jhtimmins, I'm now thinking that this change https://github.com/apache/airflow/pull/15929 should not be released yet. Though it clears tasks from being stuck in the queued/up_for_retry state, it sets them to failed state without checking if they have retries.
I'm wondering if there's a better way to do it?
@ephraimbuddy I have found a way to consistently trigger the issue using the dag attached below. You first have to just let it run. It will nicely retry and be set to failed after trying twice. If you clear the task after that it will only try once instead of two times. Not 100% sure if it's the same issue though, but this one is also unexpected and at least is reproducable.
import importlib
from airflow import DAG
from datetime import datetime, timedelta
from kubernetes.client import models as k8s
from airflow.operators.python import PythonOperator
utils = importlib.import_module("sample-python.utils")
default_args = {
"start_date": datetime.now() - timedelta(days=2),
"retries": 2,
"retry_delay": timedelta(seconds=1),
}
sample_python_dag = DAG(
"retry-failure",
default_args=default_args,
schedule_interval="@daily",
)
def my_func(ds, **kwargs):
return ''
PythonOperator(
dag=sample_python_dag,
task_id="task",
python_callable=my_func,
executor_config={
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="base",
command=["FailEarly"],
),
],
)
),
},
)
The log of the scheduler is the following pattern repeated:
[2021-08-06 17:29:52,318] {logging_mixin.py:104} INFO - [2021-08-06 17:29:52,318] {dagbag.py:496} INFO - Filling up the DagBag from /xxx/dag.py
[2021-08-06 17:29:52,465] {logging_mixin.py:104} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py:26 DeprecationWarning: This module is deprecated. Please use `kubernetes.client.models.V1Volume`.
[2021-08-06 17:29:52,467] {logging_mixin.py:104} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py:27 DeprecationWarning: This module is deprecated. Please use `kubernetes.client.models.V1VolumeMount`.
[2021-08-06 17:29:52,501] {scheduler_job.py:642} INFO - DAG(s) dict_keys(['xxxx']) retrieved from /xxx/dag.py
[2021-08-06 17:29:52,551] {logging_mixin.py:104} INFO - [2021-08-06 17:29:52,551] {dag.py:1833} INFO - Sync 1 DAGs
[2021-08-06 17:29:52,569] {logging_mixin.py:104} INFO - [2021-08-06 17:29:52,569] {dag.py:2306} INFO - Setting next_dagrun for xxx to None
[2021-08-06 17:29:52,581] {scheduler_job.py:189} INFO - Processing /xxxx/dag.py took 0.267 seconds
[2021-08-06 17:30:03,012] {scheduler_job.py:181} INFO - Started process (PID=1107) to work on /xxx/dag.py
[2021-08-06 17:30:03,014] {scheduler_job.py:632} INFO - Processing file /xxxx/dag.py for tasks to queue
@ephraimbuddy So the metadata database is seeing the task as failed (as was reported by the executor) so that's what's showing up in the UI, but the scheduler still thinks it's queued, so it never attempts to retry? Am I understanding the behavior correctly?
If so, making sure that #15929 includes the task retrying if it has retries left will be key. Otherwise will the behavior difference even be noticeable to the user?
@ephraimbuddy So the metadata database is seeing the task as failed (as was reported by the executor) so that's what's showing up in the UI, but the scheduler still thinks it's queued, so it never attempts to retry? Am I understanding the behavior correctly?
If so, making sure that #15929 includes the task retrying if it has retries left will be key. Otherwise, will the behavior difference even be noticeable to the user?
I think retrying is a lesser evil to getting stuck in queued. That was why I added #15929 which will be released in 2.1.3
The problem is when the executor reports that this task has failed and the scheduler sees it as queued, without #15929 it gets stuck in queued(even in the UI) and at times in up_for_retry(if it has retry) but never run again. It's also failed in some cases as @luqic said. But if it's stuck in queued
, the task has to be cleared as @mmazek said above before it'd run again. See also https://github.com/apache/airflow/issues/13542.
So without #15929, the task state would be set inup_for_retry
at times as I found out but won't be run again. I still check for other possible solutions and your suggestion is worth trying
I can reproduce this issue like this:
Use this dag on 2.1.1:
from datetime import timedelta
from kubernetes.client import models as k8s
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
with DAG(
dag_id="pending",
schedule_interval=None,
start_date=days_ago(2),
) as dag:
BashOperator(
task_id="forever_pending",
bash_command="date; sleep 30; date",
retries=3,
retry_delay=timedelta(seconds=30),
executor_config={
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="base",
volume_mounts=[
k8s.V1VolumeMount(mount_path="/foo/", name="vol")
],)],
volumes=[
k8s.V1Volume(
name="vol",
persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
claim_name="missing"
),)],)),},)
And here is the scheduler log from around the failure
[2021-09-15 17:48:56,352] {scheduler_job.py:873} WARNING - Set 1 task instances to state=failed as their associated DagRun was not in RUNNING state
2021-09-15T17:48:56.134716Z info watchFileEvents: "/etc/certs": MODIFY|ATTRIB
2021-09-15T17:48:56.134808Z info watchFileEvents: "/etc/certs/..2021_09_06_06_43_21.729675760": MODIFY|ATTRIB
[2021-09-15 17:48:47,821] {dagrun.py:429} ERROR - Marking run <DagRun pending @ 2021-09-15 17:43:28.990599+00:00: manual__2021-09-15T17:43:28.990599+00:00, externally triggered: True> failed
[2021-09-15 17:48:47,769] {scheduler_job.py:1258} ERROR - Executor reports task instance <TaskInstance: pending.forever_pending 2021-09-15 17:43:28.990599+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
[2021-09-15 17:48:47,769] {scheduler_job.py:1265} INFO - Setting task instance <TaskInstance: pending.forever_pending 2021-09-15 17:43:28.990599+00:00 [queued]> state to failed as reported by executor
[2021-09-15 17:48:47,761] {kubernetes_executor.py:549} INFO - Changing state of (TaskInstanceKey(dag_id='pending', task_id='forever_pending', execution_date=datetime.datetime(2021, 9, 15, 17, 43, 28, 990599, tzinfo=tzlocal()), try_number=1), 'failed', 'pendingforeverpending.cc4a625ffe0d4da88709098daba98d87', 'astronomer-magnificent-aurora-4284', '1751732637') to failed
[2021-09-15 17:48:47,761] {scheduler_job.py:1229} INFO - Executor reports execution of pending.forever_pending execution_date=2021-09-15 17:43:28.990599+00:00 exited with status failed for try_number 1
[2021-09-15 17:48:47,759] {kubernetes_executor.py:372} INFO - Attempting to finish pod; pod_id: pendingforeverpending.cc4a625ffe0d4da88709098daba98d87; state: failed; annotations: {'dag_id': 'pending', 'task_id': 'forever_pending', 'execution_date': '2021-09-15T17:43:28.990599+00:00', 'try_number': '1'}
[2021-09-15 17:48:46,695] {kubernetes_executor.py:149} INFO - Event: pendingforeverpending.cc4a625ffe0d4da88709098daba98d87 had an event of type DELETED
[2021-09-15 17:48:46,695] {kubernetes_executor.py:200} INFO - Event: Failed to start pod pendingforeverpending.cc4a625ffe0d4da88709098daba98d87
[2021-09-15 17:48:46,692] {kubernetes_executor.py:149} INFO - Event: pendingforeverpending.cc4a625ffe0d4da88709098daba98d87 had an event of type MODIFIED
[2021-09-15 17:48:46,692] {kubernetes_executor.py:203} INFO - Event: pendingforeverpending.cc4a625ffe0d4da88709098daba98d87 Pending
[2021-09-15 17:48:46,676] {kubernetes_executor.py:625} ERROR - Pod "pendingforeverpending.cc4a625ffe0d4da88709098daba98d87" has been pending for longer than 300 seconds.It will be deleted and set to failed.
2021-09-15T17:47:50.966665Z info watchFileEvents: notifying
2021-09-15T17:47:47.079744Z info watchFileEvents: notifying
2021-09-15T17:47:40.966397Z info watchFileEvents: "/etc/certs": MODIFY|ATTRIB
2021-09-15T17:47:40.966527Z info watchFileEvents: "/etc/certs/..2021_09_06_06_43_21.426627327": MODIFY|ATTRIB
2021-09-15T17:47:37.079501Z info watchFileEvents: "/etc/certs": MODIFY|ATTRIB
2021-09-15T17:47:37.079624Z info watchFileEvents: "/etc/certs/..2021_09_06_06_43_21.729675760": MODIFY|ATTRIB
[2021-09-15 17:47:07,909] {scheduler_job.py:1841} INFO - Resetting orphaned tasks for active dag runs
[2021-09-15 17:47:00,347] {scheduler_job.py:1841} INFO - Resetting orphaned tasks for active dag runs
2021-09-15T17:46:35.978572Z info watchFileEvents: notifying
2021-09-15T17:46:25.978277Z info watchFileEvents: "/etc/certs": MODIFY|ATTRIB
2021-09-15T17:46:25.978421Z info watchFileEvents: "/etc/certs/..2021_09_06_06_43_21.426627327": MODIFY|ATTRIB
2021-09-15T17:46:21.074893Z info watchFileEvents: notifying
2021-09-15T17:46:11.074610Z info watchFileEvents: "/etc/certs": MODIFY|ATTRIB
2021-09-15T17:46:11.074754Z info watchFileEvents: "/etc/certs/..2021_09_06_06_43_21.729675760": MODIFY|ATTRIB
2021-09-15T17:45:11.006936Z info watchFileEvents: notifying
2021-09-15T17:45:01.006688Z info watchFileEvents: "/etc/certs": MODIFY|ATTRIB
2021-09-15T17:45:01.006777Z info watchFileEvents: "/etc/certs/..2021_09_06_06_43_21.426627327": MODIFY|ATTRIB
2021-09-15T17:45:01.006787Z info watchFileEvents: "/etc/certs/..2021_09_06_06_43_21.426627327": MODIFY|ATTRIB
Apache Airflow version: 2.0.2
Kubernetes version:
What happened:
After the worker pod for the task failed to start, the task is marked as failed with the error message
Executor reports task instance <TaskInstance: datalake_db_cdc_data_integrity.check_integrity_core_prod_my_industries 2021-06-14 00:00:00+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
. The task should have been reattempted as it still has retries left.What you expected to happen:
The task status should have been set as
up_for_retry
instead of failing immediately.Anything else we need to know:
This error has occurred 6 times over the past 2 months, and to seemingly random tasks in different DAGs. We run 60 DAGs with 50-100 tasks each every 30 minutes. The affected tasks are a mix of PythonOperator and SparkSubmitOperator. The first time we saw it was in mid Apr, and we were on Airflow version 2.0.1. We upgraded to Airflow version 2.0.2 in early May, and the error has occurred 3 more times since then.
Also, the issue where the worker pod cannot start is a common error that we frequently encounter, but in most cases these tasks are correctly marked as
up_for_retry
and reattempted.This is currently not a big issue for us since it's so rare, but we have to manually clear the tasks that failed to get them to rerun because the tasks are not retrying. They have all succeeded on the first try after clearing.
Also, I'm not sure if this issue is related to #10790 or #16285, so I just created a new one. It's not quite the same as #10790 because the tasks affected are not ExternalTaskSensors, and also #16285 because the offending lines pointed out there are not in 2.0.2.
Thanks!