apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Many running task instances are cleared by the new scheduler when an old scheduler is terminated and its health check server is periodically requested #39088

Closed. tanvn closed this issue 3 months ago

tanvn commented 5 months ago

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

First confirmed with 2.5.0. Now confirmed that it also happens on 2.8.4.

What happened?

I am running Airflow 2.5.0 with the Kubernetes executor. Recently, I enabled the health check server of the scheduler and configured a blackbox exporter which sends a request to check whether the scheduler is healthy every 6-7 seconds. Normally, everything works fine. However, when a new deployment is rolled out, the old scheduler is terminated and a new one is created, and I found that many running task instances are cleared, i.e. the worker pods are terminated and the tasks are set up for retry (some of them are heavy tasks, so this is quite bad for us).

And if I disable the blackbox exporter (i.e. stop sending GET requests to the scheduler's health check server), the issue does not happen (no cleared task instances, only adopted ones). So I suspect there is something wrong with the logic that determines which task instances should be cleared instead of adopted.
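
For reference, the exporter does nothing more exotic than polling the scheduler's health endpoint. A minimal stand-in for that probe (the hostname below is a placeholder; the port and interval match my setup):

    import time
    import urllib.request

    HEALTH_URL = "http://airflow-scheduler:8974/health"  # placeholder hostname; port from my config

    while True:
        try:
            with urllib.request.urlopen(HEALTH_URL, timeout=5) as resp:
                print(resp.status)  # 200 while the scheduler's health check server answers
        except OSError as exc:
            print("health check failed:", exc)
        time.sleep(6)  # probed roughly every 6-7 seconds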

Log from the new scheduler

[2024-04-17T08:14:24.568+0000] {scheduler_job.py:1463} INFO - Reset the following 102 orphaned TaskInstances:

Log from a worker pod

[2024-04-17, 16:56:29 JST] {local_task_job.py:223} WARNING - State of this instance has been externally set to restarting. Terminating instance.
[2024-04-17, 16:56:29 JST] {process_utils.py:129} INFO - Sending Signals.SIGTERM to group 92. PIDs of all processes in the group: [350, 92]
[2024-04-17, 16:56:29 JST] {process_utils.py:84} INFO - Sending the signal Signals.SIGTERM to group 92
[2024-04-17, 16:56:29 JST] {taskinstance.py:1483} ERROR - Received SIGTERM. Terminating subprocesses.
[2024-04-17, 16:56:29 JST] {taskinstance.py:1772} ERROR - Task failed with exception
Traceback (most recent call last):
  ...
  File "/usr/local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 1485, in signal_handler
    raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal

What you think should happen instead?

I expect that the running task instances should be adopted correctly by the new scheduler so that the tasks can continue without being interrupted.

How to reproduce

As described above.

Operating System

CentOS 7.9

Versions of Apache Airflow Providers

Deployment

Official Apache Airflow Helm Chart

Deployment details

scheduler section configuration:

    enable_health_check: true
    scheduler_health_check_server_port: 8974
    job_heartbeat_sec: 45
    scheduler_heartbeat_sec: 30
    scheduler_health_check_threshold: 90

Anything else?

The issue does not happen every time a new deployment is rolled out; so far it has happened 3 out of 5 times (60%).

Are you willing to submit PR?

Code of Conduct

tanvn commented 5 months ago

@potiuk @ephraimbuddy Hi, do you have any idea about this?

tanvn commented 5 months ago

I also checked the log of the scheduler

[2024-04-17T08:14:19.661+0000] {kubernetes_executor.py:749} INFO - attempting to adopt pod pod-3dbaxxx
[2024-04-17T08:14:19.888+0000] {kubernetes_executor.py:220} INFO - Event: pod-3dbaxxx is Running
[2024-04-17T08:15:30.521+0000] {kubernetes_executor.py:213} INFO - Event: pod-3dbaxxx Succeeded
....

which makes me believe that the following code block was executed successfully https://github.com/apache/airflow/blob/2.5.0/airflow/executors/kubernetes_executor.py#L749-L765

and pod_id is removed from the dict via pod_ids.pop(pod_id), so it is strange that the task instance is still included in tis_to_flush later 🀔
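
For clarity, this is roughly how I read the adoption flow in that code block (a simplified paraphrase, not the actual source; the running-pod lookup and the relabel call are passed in as stand-ins):

    def try_adopt_task_instances_sketch(tis, running_worker_pods, relabel_pod):
        """Simplified paraphrase of the adoption flow; not the real Airflow source.

        tis: task instances exposing .key and .queued_by_job_id
        running_worker_pods: iterable of (pod, ti_key) pairs for worker pods still running
        relabel_pod: callable that re-labels a pod so the new scheduler owns it
        """
        # TIs that were never queued by a scheduler are reset straight away.
        tis_to_flush = [ti for ti in tis if not ti.queued_by_job_id]
        # Adoption candidates, keyed by TI key (this dict is pod_ids in 2.5.0).
        pod_ids = {ti.key: ti for ti in tis if ti.queued_by_job_id}

        for pod, ti_key in running_worker_pods:
            if ti_key in pod_ids:
                relabel_pod(pod)      # produces the "attempting to adopt pod ..." log line
                pod_ids.pop(ti_key)   # adopted, so it must NOT be reset

        # Whatever is left could not be adopted and is reset:
        # "Reset the following N orphaned TaskInstances"
        tis_to_flush.extend(pod_ids.values())
        return tis_to_flush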

tanvn commented 5 months ago

And @dstandish, it seems that this issue is related to the scheduler's health check server, so I wonder if you might have any idea on this πŸ™‡

dstandish commented 5 months ago

Hi @tanvn

It seems you've made some progress with your repro scenario. What I would recommend is to continue digging in: add more logging to try to see exactly which component is taking which action, and based on what information. Do you think you can try that? You can either run locally in a virtualenv based on airflow main, or you can check out a specific version and then create a patch which you can apply in your Dockerfile if running on Helm (or you can mount airflow into your containers). I'd be happy to help you with any dev setup stuff, but it seems like you will be able to identify the root cause here, so I want to encourage you to keep going at it.
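
For example, one thing worth logging is what the executor's try_adopt_task_instances returns (a hypothetical patch fragment; the surrounding variable names are illustrative, not the exact source):

    # Hypothetical patch fragment (variable names are illustrative, not the exact source):
    # inside the scheduler's adopt_or_reset_orphaned_tasks, right after the executor call.
    to_reset = executor.try_adopt_task_instances(candidate_tis)
    self.log.info(
        "adopt_or_reset: %d candidate task instances, %d will be reset: %s",
        len(candidate_tis),
        len(to_reset),
        [ti.key for ti in to_reset],
    )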

RNHTTR commented 5 months ago

@tanvn Can you check the audit logs for a task with the following log?

[2024-04-17, 16:56:29 JST] {local_task_job.py:223} WARNING - State of this instance has been externally set to restarting. Terminating instance.

It's suspicious that the scheduler restarting resulted in this behavior, given that it's not happening consistently.

tanvn commented 5 months ago

@dstandish Thanks! Yes, I believe I can reproduce this in my dev environment, and I want to build a Docker image from the source code. If there is a documentation guide for this, I would appreciate it if you could share it with me (cc @potiuk).

@RNHTTR As the task above has too many audit logs, I will give you an example from a different task (the issue is exactly the same). Here is its log:

[2024-04-17, 17:15:31 JST] {local_task_job.py:223} WARNING - State of this instance has been externally set to queued. Terminating instance.
[2024-04-17, 17:15:31 JST] {process_utils.py:129} INFO - Sending Signals.SIGTERM to group 92. PIDs of all processes in the group: [348, 92]
[2024-04-17, 17:15:31 JST] {process_utils.py:84} INFO - Sending the signal Signals.SIGTERM to group 92
[2024-04-17, 17:15:31 JST] {taskinstance.py:1483} ERROR - Received SIGTERM. Terminating subprocesses.
[2024-04-17, 17:15:31 JST] {taskinstance.py:1772} ERROR - Task failed with exception
Traceback (most recent call last):
...
  File "/usr/local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 1485, in signal_handler
    raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal

And below is the audit log for the task during that time (for the sake of security, I have replaced the real names with dummy ones)

2024-04-17, 17:15:31    my_create_task  failed  2024-04-17 07:10:00+00:00   my_beta_acc None
2024-04-17, 17:14:58    my_create_task  cli_task_run    None    airflow {"host_name": "my-task-hourly-hits-all-hourl-a715dd4673f247a28be4bda8bb5429af", "full_command": "['/usr/local/bin/airflow', 'tasks', 'run', 'my_beta_acc__hourly_hits_all_hourly_beta', 'my_create_task', 'scheduled__2024-04-17T07:10:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/my_dag.py']"}
2024-04-17, 17:12:31    my_create_task  cli_task_run    None    airflow {"host_name": "my-task-hourly-hits-all-hourl-28974e68c5484878b56ed2c9e993ccc4", "full_command": "['/usr/local/bin/airflow', 'tasks', 'run', 'my_beta_acc__hourly_hits_all_hourly_beta', 'my_create_task', 'scheduled__2024-04-17T07:10:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/my_dag.py']"}
2024-04-17, 17:12:31    my_create_task  running 2024-04-17 07:10:00+00:00   my_beta_acc None
2024-04-17, 17:10:44    my_create_task  cli_task_run    None    airflow {"host_name": "my-task-hourly-hits-all-hourl-28974e68c5484878b56ed2c9e993ccc4", "full_command": "['/usr/local/bin/airflow', 'tasks', 'run', 'my_beta_acc__hourly_hits_all_hourly_beta', 'my_create_task', 'scheduled__2024-04-17T07:10:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/my_dag.py']"}

tanvn commented 5 months ago

I managed to reproduce this issue in my development environment by creating a DAG with about 100 tasks (a rough sketch of the DAG is at the end of this comment). When there are about 15 running tasks, I run a helm command to upgrade the deployment; the old scheduler is then terminated and a new one is created and starts running. The log shows that

[2024-04-18T05:45:33.367+0000] {kubernetes_executor.py:749} INFO - attempting to adopt pod orphaned-test-ba20047562f24ec993a3816b7d17cfd7
...
[2024-04-18T05:45:33.570+0000] {kubernetes_executor.py:220} INFO - Event: orphaned-test-ba20047562f24ec993a3816b7d17cfd7 is Running

but later, the corresponding task is still reset:

[2024-04-18T05:45:38.658+0000] {scheduler_job.py:1463} INFO - Reset the following 8 orphaned TaskInstances:
....

And the task is terminated later

[2024-04-18, 14:45:58 JST] {local_task_job.py:223} WARNING - State of this instance has been externally set to queued. Terminating instance.
[2024-04-18, 14:45:58 JST] {process_utils.py:129} INFO - Sending Signals.SIGTERM to group 34. PIDs of all processes in the group: [34]
[2024-04-18, 14:45:58 JST] {process_utils.py:84} INFO - Sending the signal Signals.SIGTERM to group 34
[2024-04-18, 14:45:58 JST] {taskinstance.py:1483} ERROR - Received SIGTERM. Terminating subprocesses.
[2024-04-18, 14:45:58 JST] {taskinstance.py:1772} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/airflow/plugins/operators/my_dummy_operator.py", line 95, in execute
    time.sleep(300)
  File "/usr/local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 1485, in signal_handler
    raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal

The cause remains unclear, but I can confirm that the issue is reproducible (so maybe this is a bug).
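
For reference, the DAG I used was roughly of this shape (a sketch with illustrative names; the real DAG uses a custom operator that simply sleeps, replaced here with PythonOperator):

    import time
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python import PythonOperator

    def long_sleep():
        # Long enough that plenty of tasks are still running when the scheduler is rolled.
        time.sleep(300)

    with DAG(
        dag_id="orphaned_test",
        start_date=datetime(2024, 4, 1),
        schedule="@hourly",
        catchup=False,
    ):
        for i in range(100):
            PythonOperator(task_id=f"select_table_{i}", python_callable=long_sleep)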

tanvn commented 5 months ago

Just tested with Airflow 2.8.4 and confirmed that the issue still happens.

[2024-04-18T08:29:32.060+0000] {kubernetes_executor.py:661} INFO - attempting to adopt pod orphaned-test-select-table-67-1ljpj4u3

[2024-04-18T08:29:32.259+0000] {kubernetes_executor_utils.py:304} INFO - Event: orphaned-test-select-table-67-1ljpj4u3 is Running, annotations: <omitted>
...
[2024-04-18T08:29:36.977+0000] {scheduler_job_runner.py:1671} INFO - Reset the following 32 orphaned TaskInstances:
...

Log from the task instance:

[2024-04-18, 17:25:53 JST] {task_command.py:423} INFO - Running <TaskInstance: orphaned_test.select_table_67 scheduled__2024-04-18T06:30:00+00:00 [running]> on host orphaned-test-select-table-67-1ljpj4u3
...
[2024-04-18, 17:30:23 JST] {local_task_job_runner.py:302} WARNING - State of this instance has been externally set to scheduled. Terminating instance.
[2024-04-18, 17:30:23 JST] {process_utils.py:131} INFO - Sending 15 to group 12. PIDs of all processes in the group: [12]
[2024-04-18, 17:30:23 JST] {process_utils.py:86} INFO - Sending the signal 15 to group 12
[2024-04-18, 17:30:23 JST] {taskinstance.py:2483} ERROR - Received SIGTERM. Terminating subprocesses.
[2024-04-18, 17:30:23 JST] {taskinstance.py:2731} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 444, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 414, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
  File "/opt/airflow/plugins/operators/my_dummy_operator.py", line 95, in execute
    time.sleep(300)
  File "/usr/local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 2485, in signal_handler
    raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal

tanvn commented 5 months ago

@dstandish

~~I would like to build a Docker image from the source code (which I am going to add some modifications for testing) on my Macbook (Apple M2), is there any document guide for this? I would like to use CentOS 7 as a base image.~~

Never mind, I found a way πŸ˜„

RNHTTR commented 5 months ago

@tanvn Thanks for the extra details! I've assigned you to the Issue.

paramjeet01 commented 5 months ago

@tanvn @RNHTTR, could this issue be related to the one I raised: https://github.com/apache/airflow/issues/39096? We are running 8 schedulers and we see this issue happen often, where the task is killed by SIGTERM while the pod is still running. When the task is retried, we get an error log stating that the labels already exist, and we see the pod is still running.

paramjeet01 commented 5 months ago

@dstandish

~I would like to build a Docker image from the source code (which I am going to add some modifications for testing) on my Macbook (Apple M2), is there any document guide for this? I would like to use CentOS 7 as a base image.~

Never mind, I found a way πŸ˜„

How did you solve the issue, @tanvn?

tanvn commented 5 months ago

@paramjeet01 I use pip to install from the source code locally: https://github.com/apache/airflow/blob/4ae85d754e9f8a65d461e86eb6111d3b9974a065/INSTALL#L73

I do this inside a Docker image. The base image is https://hub.docker.com/r/centos/python-38-centos7 (or you can use CentOS 7 and then install the Python version you want on top of it).

There are a few prerequisite packages/libraries needed to install Airflow on CentOS, but it is not that difficult.

RNHTTR commented 5 months ago

@tanvn Can you share the full traceback?

tanvn commented 5 months ago

@RNHTTR I tested and reproduced the issue on Airflow 2.8.4; below are the error logs from the scheduler and one terminated pod.

[2024-04-23T15:56:15.805+0000] {task_context_logger.py:63} INFO - Task context logging is enabled
[2024-04-23T15:56:15.806+0000] {executor_loader.py:115} INFO - Loaded executor: KubernetesExecutor
/usr/local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:165 FutureWarning: The config section [kubernetes] has been renamed to [kubernetes_executor]. Please update your `conf.get*` call to use the new name
[2024-04-23T15:56:16.000+0000] {scheduler_job_runner.py:807} INFO - Starting the scheduler
[2024-04-23T15:56:16.002+0000] {scheduler_job_runner.py:814} INFO - Processing each file at most -1 times
[2024-04-23T15:56:16.004+0000] {kubernetes_executor.py:318} INFO - Start Kubernetes executor
[2024-04-23T15:56:16.091+0000] {kubernetes_executor_utils.py:157} INFO - Event: and now my watch begins starting at resource_version: 0
[2024-04-23T15:56:16.118+0000] {kubernetes_executor.py:239} INFO - Found 0 queued task instances
[2024-04-23T15:56:16.194+0000] {manager.py:169} INFO - Launched DagFileProcessorManager with pid: 34
[2024-04-23T15:56:16.196+0000] {scheduler_job_runner.py:1607} INFO - Adopting or resetting orphaned tasks for active dag runs
[2024-04-23T15:56:16.207+0000] {settings.py:60} INFO - Configured default timezone UTC
[2024-04-23T15:56:16.301+0000] {settings.py:541} INFO - Loaded airflow_local_settings from /opt/airflow/config/airflow_local_settings.py .
172.32.255.100 - - [23/Apr/2024 15:56:16] "GET /health HTTP/1.1" 200 -
[2024-04-23T15:56:17.186+0000] {kubernetes_executor.py:661} INFO - attempting to adopt pod test-orphaned-test-dag-select-25-0akkpbca
[2024-04-23T15:56:17.392+0000] {kubernetes_executor.py:661} INFO - attempting to adopt pod test-orphaned-test-dag-select-26-v3hdab2d
[2024-04-23T15:56:17.397+0000] {kubernetes_executor_utils.py:304} INFO - Event: test-orphaned-test-dag-select-25-0akkpbca is Running, annotations: <omitted>
[2024-04-23T15:56:17.590+0000] {kubernetes_executor_utils.py:304} INFO - Event: test-orphaned-test-dag-select-26-v3hdab2d is Running, annotations: <omitted>
[2024-04-23T15:56:17.591+0000] {kubernetes_executor.py:661} INFO - attempting to adopt pod test-orphaned-test-dag-select-27-3fgog69m
172.32.212.113 - - [23/Apr/2024 15:56:17] "GET /health HTTP/1.1" 200 -
[2024-04-23T15:56:17.785+0000] {kubernetes_executor.py:661} INFO - attempting to adopt pod test-orphaned-test-dag-select-28-12d0e5k3
[2024-04-23T15:56:17.791+0000] {kubernetes_executor_utils.py:304} INFO - Event: test-orphaned-test-dag-select-27-3fgog69m is Running, annotations: <omitted>
[2024-04-23T15:56:17.988+0000] {kubernetes_executor.py:661} INFO - attempting to adopt pod test-orphaned-test-dag-select-29-nyli4xns
[2024-04-23T15:56:17.992+0000] {kubernetes_executor_utils.py:304} INFO - Event: test-orphaned-test-dag-select-28-12d0e5k3 is Running, annotations: <omitted>
[2024-04-23T15:56:18.184+0000] {kubernetes_executor_utils.py:304} INFO - Event: test-orphaned-test-dag-select-29-nyli4xns is Running, annotations: <omitted>
[2024-04-23T15:56:18.193+0000] {kubernetes_executor.py:661} INFO - attempting to adopt pod test-orphaned-test-dag-select-3-rrdebytu
[2024-04-23T15:56:18.383+0000] {kubernetes_executor.py:661} INFO - attempting to adopt pod test-orphaned-test-dag-select-32-bce8olgk
[2024-04-23T15:56:18.385+0000] {kubernetes_executor_utils.py:304} INFO - Event: test-orphaned-test-dag-select-3-rrdebytu is Running, annotations: <omitted>
[2024-04-23T15:56:18.501+0000] {kubernetes_executor_utils.py:304} INFO - Event: test-orphaned-test-dag-select-32-bce8olgk is Running, annotations: <omitted>
[2024-04-23T15:56:18.589+0000] {kubernetes_executor.py:661} INFO - attempting to adopt pod test-orphaned-test-dag-select-56-uf9tmjkr
[2024-04-23T15:56:18.701+0000] {kubernetes_executor_utils.py:304} INFO - Event: test-orphaned-test-dag-select-56-uf9tmjkr is Running, annotations: <omitted>
[2024-04-23T15:56:18.797+0000] {kubernetes_executor.py:661} INFO - attempting to adopt pod test-orphaned-test-dag-select-57-sgtmtbw9
[2024-04-23T15:56:18.987+0000] {kubernetes_executor_utils.py:304} INFO - Event: test-orphaned-test-dag-select-57-sgtmtbw9 is Running, annotations: <omitted>
[2024-04-23T15:56:18.988+0000] {kubernetes_executor.py:661} INFO - attempting to adopt pod test-orphaned-test-dag-select-59-24yyhobe
[2024-04-23T15:56:19.102+0000] {kubernetes_executor.py:661} INFO - attempting to adopt pod test-orphaned-test-dag-select-6-aj680tdg
[2024-04-23T15:56:19.195+0000] {kubernetes_executor_utils.py:304} INFO - Event: test-orphaned-test-dag-select-59-24yyhobe is Running, annotations: <omitted>
[2024-04-23T15:56:19.284+0000] {kubernetes_executor.py:661} INFO - attempting to adopt pod test-orphaned-test-dag-select-60-3vx8m1ak
[2024-04-23T15:56:19.297+0000] {kubernetes_executor_utils.py:304} INFO - Event: test-orphaned-test-dag-select-6-aj680tdg is Running, annotations: <omitted>
[2024-04-23T15:56:19.488+0000] {kubernetes_executor.py:661} INFO - attempting to adopt pod test-orphaned-test-dag-select-61-tccea81c
[2024-04-23T15:56:19.488+0000] {kubernetes_executor_utils.py:304} INFO - Event: test-orphaned-test-dag-select-60-3vx8m1ak is Running, annotations: <omitted>
[2024-04-23T15:56:19.683+0000] {kubernetes_executor_utils.py:304} INFO - Event: test-orphaned-test-dag-select-61-tccea81c is Running, annotations: <omitted>
[2024-04-23T15:56:19.685+0000] {kubernetes_executor.py:661} INFO - attempting to adopt pod test-orphaned-test-dag-select-62-wm5f7it8
[2024-04-23T15:56:19.884+0000] {kubernetes_executor.py:661} INFO - attempting to adopt pod test-orphaned-test-dag-select-63-tan8e9mj
[2024-04-23T15:56:19.886+0000] {kubernetes_executor_utils.py:304} INFO - Event: test-orphaned-test-dag-select-62-wm5f7it8 is Running, annotations: <omitted>
[2024-04-23T15:56:20.082+0000] {kubernetes_executor.py:661} INFO - attempting to adopt pod test-orphaned-test-dag-select-65-a8j9q7d1
[2024-04-23T15:56:20.096+0000] {kubernetes_executor_utils.py:304} INFO - Event: test-orphaned-test-dag-select-63-tan8e9mj is Running, annotations: <omitted>
[2024-04-23T15:56:20.199+0000] {kubernetes_executor.py:661} INFO - attempting to adopt pod test-orphaned-test-dag-select-70-no28k3w3
[2024-04-23T15:56:20.382+0000] {kubernetes_executor_utils.py:304} INFO - Event: test-orphaned-test-dag-select-65-a8j9q7d1 is Running, annotations: <omitted>
[2024-04-23T15:56:20.387+0000] {kubernetes_executor.py:661} INFO - attempting to adopt pod test-orphaned-test-dag-select-76-44539ook
[2024-04-23T15:56:20.485+0000] {kubernetes_executor_utils.py:304} INFO - Event: test-orphaned-test-dag-select-70-no28k3w3 is Running, annotations: <omitted>
[2024-04-23T15:56:20.588+0000] {kubernetes_executor_utils.py:304} INFO - Event: test-orphaned-test-dag-select-76-44539ook is Running, annotations: <omitted>
[2024-04-23T15:56:20.588+0000] {kubernetes_executor.py:661} INFO - attempting to adopt pod test-orphaned-test-dag-select-77-zq4wpeqz
[2024-04-23T15:56:20.787+0000] {kubernetes_executor_utils.py:304} INFO - Event: test-orphaned-test-dag-select-77-zq4wpeqz is Running, annotations: <omitted>
[2024-04-23T15:56:20.887+0000] {kubernetes_executor.py:661} INFO - attempting to adopt pod test-orphaned-test-dag-select-78-90a0rqll
[2024-04-23T15:56:20.999+0000] {kubernetes_executor.py:661} INFO - attempting to adopt pod test-orphaned-test-dag-select-79-mbefhn42
[2024-04-23T15:56:20.999+0000] {kubernetes_executor_utils.py:304} INFO - Event: test-orphaned-test-dag-select-78-90a0rqll is Running, annotations: <omitted>
[2024-04-23T15:56:21.101+0000] {kubernetes_executor_utils.py:304} INFO - Event: test-orphaned-test-dag-select-79-mbefhn42 is Running, annotations: <omitted>
[2024-04-23T15:56:21.105+0000] {kubernetes_executor.py:661} INFO - attempting to adopt pod test-orphaned-test-dag-select-8-zs4sulzt
[2024-04-23T15:56:21.301+0000] {kubernetes_executor.py:661} INFO - attempting to adopt pod test-orphaned-test-dag-select-80-ayuh71yt
[2024-04-23T15:56:21.301+0000] {kubernetes_executor_utils.py:289} INFO - Event: test-orphaned-test-dag-select-8-zs4sulzt Succeeded, annotations: <omitted>
[2024-04-23T15:56:21.404+0000] {kubernetes_executor.py:661} INFO - attempting to adopt pod test-orphaned-test-dag-select-81-9v51k09i
[2024-04-23T15:56:21.496+0000] {kubernetes_executor_utils.py:304} INFO - Event: test-orphaned-test-dag-select-80-ayuh71yt is Running, annotations: <omitted>
[2024-04-23T15:56:21.504+0000] {kubernetes_executor.py:661} INFO - attempting to adopt pod test-orphaned-test-dag-select-82-5u370a8f
[2024-04-23T15:56:21.596+0000] {kubernetes_executor_utils.py:304} INFO - Event: test-orphaned-test-dag-select-81-9v51k09i is Running, annotations: <omitted>
[2024-04-23T15:56:21.701+0000] {kubernetes_executor.py:661} INFO - attempting to adopt pod test-orphaned-test-dag-select-84-j5i87oo2
[2024-04-23T15:56:21.704+0000] {kubernetes_executor_utils.py:304} INFO - Event: test-orphaned-test-dag-select-82-5u370a8f is Running, annotations: <omitted>
[2024-04-23T15:56:21.799+0000] {kubernetes_executor.py:661} INFO - attempting to adopt pod test-orphaned-test-dag-select-86-jgn2e9uu
[2024-04-23T15:56:21.800+0000] {kubernetes_executor_utils.py:304} INFO - Event: test-orphaned-test-dag-select-84-j5i87oo2 is Running, annotations: <omitted>
[2024-04-23T15:56:21.900+0000] {kubernetes_executor_utils.py:304} INFO - Event: test-orphaned-test-dag-select-86-jgn2e9uu is Running, annotations: <omitted>
[2024-04-23T15:56:21.904+0000] {kubernetes_executor.py:661} INFO - attempting to adopt pod test-orphaned-test-dag-select-87-n5lbhsl6
[2024-04-23T15:56:21.999+0000] {kubernetes_executor_utils.py:304} INFO - Event: test-orphaned-test-dag-select-87-n5lbhsl6 is Running, annotations: <omitted>
[2024-04-23T15:56:22.002+0000] {kubernetes_executor.py:661} INFO - attempting to adopt pod test-orphaned-test-dag-select-88-5uvrywjh
[2024-04-23T15:56:22.194+0000] {kubernetes_executor.py:661} INFO - attempting to adopt pod test-orphaned-test-dag-select-89-uo51ntoz
[2024-04-23T15:56:22.199+0000] {kubernetes_executor_utils.py:304} INFO - Event: test-orphaned-test-dag-select-88-5uvrywjh is Running, annotations: <omitted>
[2024-04-23T15:56:22.300+0000] {kubernetes_executor_utils.py:289} INFO - Event: test-orphaned-test-dag-select-89-uo51ntoz Succeeded, annotations: <omitted>
[2024-04-23T15:56:22.383+0000] {kubernetes_executor.py:702} INFO - Attempting to adopt pod test-orphaned-test-dag-select-75-2kdv9lu0
[2024-04-23T15:56:22.397+0000] {kubernetes_executor_utils.py:289} INFO - Event: test-orphaned-test-dag-select-78-90a0rqll Succeeded, annotations: <omitted>
[2024-04-23T15:56:22.501+0000] {kubernetes_executor.py:702} INFO - Attempting to adopt pod test-orphaned-test-dag-select-83-8vjoq1y7
[2024-04-23T15:56:22.600+0000] {kubernetes_executor_utils.py:289} INFO - Event: test-orphaned-test-dag-select-75-2kdv9lu0 Succeeded, annotations: <omitted>
[2024-04-23T15:56:22.697+0000] {kubernetes_executor_utils.py:289} INFO - Event: test-orphaned-test-dag-select-83-8vjoq1y7 Succeeded, annotations: <omitted>
[2024-04-23T15:56:23.319+0000] {scheduler_job_runner.py:1671} INFO - Reset the following 27 orphaned TaskInstances:
    <TaskInstance: test__orphaned_test_dag.select_25 scheduled__2024-04-23T14:30:00+00:00 [running]>
    <TaskInstance: test__orphaned_test_dag.select_26 scheduled__2024-04-23T14:30:00+00:00 [running]>
    <TaskInstance: test__orphaned_test_dag.select_27 scheduled__2024-04-23T14:30:00+00:00 [running]>
    <TaskInstance: test__orphaned_test_dag.select_28 scheduled__2024-04-23T14:30:00+00:00 [running]>
    <TaskInstance: test__orphaned_test_dag.select_29 scheduled__2024-04-23T14:30:00+00:00 [running]>
    <TaskInstance: test__orphaned_test_dag.select_3 scheduled__2024-04-23T14:30:00+00:00 [running]>
    <TaskInstance: test__orphaned_test_dag.select_32 scheduled__2024-04-23T14:30:00+00:00 [running]>
    <TaskInstance: test__orphaned_test_dag.select_56 scheduled__2024-04-23T14:30:00+00:00 [running]>
    <TaskInstance: test__orphaned_test_dag.select_57 scheduled__2024-04-23T14:30:00+00:00 [running]>
    <TaskInstance: test__orphaned_test_dag.select_59 scheduled__2024-04-23T14:30:00+00:00 [running]>
    <TaskInstance: test__orphaned_test_dag.select_6 scheduled__2024-04-23T14:30:00+00:00 [running]>
    <TaskInstance: test__orphaned_test_dag.select_60 scheduled__2024-04-23T14:30:00+00:00 [running]>
    <TaskInstance: test__orphaned_test_dag.select_61 scheduled__2024-04-23T14:30:00+00:00 [running]>
    <TaskInstance: test__orphaned_test_dag.select_62 scheduled__2024-04-23T14:30:00+00:00 [running]>
    <TaskInstance: test__orphaned_test_dag.select_63 scheduled__2024-04-23T14:30:00+00:00 [running]>
    <TaskInstance: test__orphaned_test_dag.select_65 scheduled__2024-04-23T14:30:00+00:00 [running]>
    <TaskInstance: test__orphaned_test_dag.select_70 scheduled__2024-04-23T14:30:00+00:00 [running]>
    <TaskInstance: test__orphaned_test_dag.select_76 scheduled__2024-04-23T14:30:00+00:00 [running]>
    <TaskInstance: test__orphaned_test_dag.select_77 scheduled__2024-04-23T14:30:00+00:00 [running]>
    <TaskInstance: test__orphaned_test_dag.select_79 scheduled__2024-04-23T14:30:00+00:00 [running]>
    <TaskInstance: test__orphaned_test_dag.select_80 scheduled__2024-04-23T14:30:00+00:00 [running]>
    <TaskInstance: test__orphaned_test_dag.select_81 scheduled__2024-04-23T14:30:00+00:00 [running]>
    <TaskInstance: test__orphaned_test_dag.select_82 scheduled__2024-04-23T14:30:00+00:00 [running]>
    <TaskInstance: test__orphaned_test_dag.select_84 scheduled__2024-04-23T14:30:00+00:00 [running]>
    <TaskInstance: test__orphaned_test_dag.select_86 scheduled__2024-04-23T14:30:00+00:00 [running]>
    <TaskInstance: test__orphaned_test_dag.select_87 scheduled__2024-04-23T14:30:00+00:00 [running]>
    <TaskInstance: test__orphaned_test_dag.select_88 scheduled__2024-04-23T14:30:00+00:00 [running]>

Error log of the test__orphaned_test_dag.select_26 task instance worker pod:

[2024-04-24, 00:54:14 JST] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: test__orphaned_test_dag.select_26 scheduled__2024-04-23T14:30:00+00:00 [queued]>
[2024-04-24, 00:54:14 JST] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: test__orphaned_test_dag.select_26 scheduled__2024-04-23T14:30:00+00:00 [queued]>
[2024-04-24, 00:54:14 JST] {taskinstance.py:2193} INFO - Starting attempt 1 of 4
[2024-04-24, 00:54:14 JST] {taskinstance.py:2217} INFO - Executing <Task(MyDummyOperator): select_26> on 2024-04-23 14:30:00+00:00
[2024-04-24, 00:54:14 JST] {standard_task_runner.py:60} INFO - Started process 12 to run task
[2024-04-24, 00:54:14 JST] {standard_task_runner.py:87} INFO - Running: ['airflow', 'tasks', 'run', 'test__orphaned_test_dag', 'select_26', 'scheduled__2024-04-23T14:30:00+00:00', '--job-id', '5713', '--raw', '--subdir', 'DAGS_FOLDER/my_dag.py', '--cfg-path', '/tmp/tmp5mif6w64']
[2024-04-24, 00:54:14 JST] {standard_task_runner.py:88} INFO - Job 5713: Subtask select_26
[2024-04-24, 00:54:14 JST] {task_command.py:423} INFO - Running <TaskInstance: test__orphaned_test_dag.select_26 scheduled__2024-04-23T14:30:00+00:00 [running]> on host test-orphaned-test-dag-select-26-v3hdab2d
[2024-04-24, 00:54:14 JST] {logging_mixin.py:188} WARNING - /usr/local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/template_rendering.py:46 AirflowProviderDeprecationWarning: This function is deprecated. Please use `create_unique_id`.
[2024-04-24, 00:54:14 JST] {logging_mixin.py:188} WARNING - /usr/local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py:145 AirflowProviderDeprecationWarning: This function is deprecated. Please use `add_unique_suffix`.
[2024-04-24, 00:54:14 JST] {pod_generator.py:555} WARNING - Model file /opt/airflow/pod_templates/pod_template_file.yaml does not exist
[2024-04-24, 00:54:14 JST] {taskinstance.py:2513} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='test' AIRFLOW_CTX_DAG_ID='test__orphaned_test_dag' AIRFLOW_CTX_TASK_ID='select_26' AIRFLOW_CTX_EXECUTION_DATE='2024-04-23T14:30:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-04-23T14:30:00+00:00'
[2024-04-24, 00:54:14 JST] {my_dummy_operator.py:142} INFO - Task started. Sleeping for 5 minutes.
[2024-04-24, 00:57:14 JST] {local_task_job_runner.py:302} WARNING - State of this instance has been externally set to scheduled. Terminating instance.
[2024-04-24, 00:57:14 JST] {process_utils.py:131} INFO - Sending 15 to group 12. PIDs of all processes in the group: [12]
[2024-04-24, 00:57:14 JST] {process_utils.py:86} INFO - Sending the signal 15 to group 12
[2024-04-24, 00:57:14 JST] {taskinstance.py:2483} ERROR - Received SIGTERM. Terminating subprocesses.
[2024-04-24, 00:57:14 JST] {taskinstance.py:2731} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 444, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 414, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
  File "/opt/airflow/plugins/operators/my_dummy_operator.py", line 143, in execute
    time.sleep(300)
  File "/usr/local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 2485, in signal_handler
    raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal
[2024-04-24, 00:57:14 JST] {taskinstance.py:1149} INFO - Marking task as UP_FOR_RETRY. dag_id=test__orphaned_test_dag, task_id=select_26, execution_date=20240423T143000, start_date=20240423T155414, end_date=20240423T155714
[2024-04-24, 00:57:16 JST] {standard_task_runner.py:107} ERROR - Failed to execute job 5713 for task select_26 (Task received SIGTERM signal; 12)
[2024-04-24, 00:57:16 JST] {process_utils.py:79} INFO - Process psutil.Process(pid=12, status='terminated', exitcode=1, started='15:54:14') (12) terminated with exit code 1

Logs from the scheduler containing the pod name test-orphaned-test-dag-select-26-v3hdab2d:

[2024-04-23T15:56:17.392+0000] {kubernetes_executor.py:661} INFO - attempting to adopt pod test-orphaned-test-dag-select-26-v3hdab2d
[2024-04-23T15:56:17.590+0000] {kubernetes_executor_utils.py:304} INFO - Event: test-orphaned-test-dag-select-26-v3hdab2d is Running, annotations: <omitted>
[2024-04-23T15:57:18.590+0000] {kubernetes_executor_utils.py:289} INFO - Event: test-orphaned-test-dag-select-26-v3hdab2d Succeeded, annotations: <omitted>
[2024-04-23T15:57:24.684+0000] {kubernetes_executor.py:401} INFO - Changing state of (TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_26', run_id='scheduled__2024-04-23T14:30:00+00:00', try_number=1, map_index=-1), None, 'test-orphaned-test-dag-select-26-v3hdab2d', 'it-dev', '8903379844') to None
[2024-04-23T15:57:24.889+0000] {kubernetes_executor_utils.py:284} INFO - Skipping event for Succeeded pod test-orphaned-test-dag-select-26-v3hdab2d - event for this pod already sent to executor
[2024-04-23T16:00:09.891+0000] {kubernetes_executor_utils.py:284} INFO - Skipping event for Succeeded pod test-orphaned-test-dag-select-26-v3hdab2d - event for this pod already sent to executor
[2024-04-23T16:00:09.994+0000] {kubernetes_executor_utils.py:284} INFO - Skipping event for Succeeded pod test-orphaned-test-dag-select-26-v3hdab2d - event for this pod already sent to executor

tanvn commented 4 months ago

I think I found the cause of this issue. The logic of adopt_or_reset_orphaned_tasks is called twice in a very short time because of the use of run_with_db_retries: https://github.com/apache/airflow/blob/2.8.4/airflow/jobs/scheduler_job_runner.py#L1610

When the first attempt tried to update the task instances in my MySQL database, a MySQLdb.OperationalError occurred, and the second attempt was carried out almost immediately afterwards (0.4467290363932428 seconds later, as shown in the log).

https://github.com/apache/airflow/blob/2.8.4/airflow/jobs/scheduler_job_runner.py#L1651-L1679

In the second attempt, the pods have already been adopted, so the corresponding task instances are not removed from tis_to_flush_by_key and end up being added to tis_to_flush by tis_to_flush.extend(tis_to_flush_by_key.values()) (see the sketch below): https://github.com/apache/airflow/blob/2.8.4/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py#L553-L575
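
To make the failure mode concrete, here is a small self-contained sketch (toy names, not the Airflow source) of why re-running the adoption logic after the pods were already relabelled ends up resetting them:

    # Toy model of the double run; the names are illustrative, not the Airflow source.
    class Pod:
        def __init__(self, ti_key, owner):
            self.ti_key = ti_key
            self.owner = owner  # the scheduler job id carried as a label on the pod

    def adopt_or_reset(tis_by_key, pods, old_job_id, new_job_id):
        """One attempt: adopt pods still labelled with the old scheduler, reset the rest."""
        leftovers = dict(tis_by_key)
        for pod in pods:
            if pod.owner == old_job_id and pod.ti_key in leftovers:
                pod.owner = new_job_id      # relabel the pod -> adopted
                leftovers.pop(pod.ti_key)   # adopted TIs must not be reset
        return list(leftovers)              # these get "Reset ... orphaned TaskInstances"

    tis = {"select_26": object()}
    pods = [Pod("select_26", owner="old-scheduler-job")]

    # Attempt 1: the pod is adopted and nothing would be reset, but the DB flush then fails.
    print(adopt_or_reset(tis, pods, "old-scheduler-job", "new-scheduler-job"))  # []

    # Attempt 2 (the run_with_db_retries retry): the pod is already labelled with the new
    # job id, so it no longer matches; the TI stays in the leftovers and gets reset.
    print(adopt_or_reset(tis, pods, "old-scheduler-job", "new-scheduler-job"))  # ['select_26']

As far as I can tell, the pod relabelling (a Kubernetes API call) survives the failed database transaction while the queued_by_job_id update is rolled back, which is why the second attempt no longer finds those pods under the old scheduler's label.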

Here are the logs:

[2024-05-03T17:18:59.326+0000] {scheduler_job_runner.py:1617} INFO - Running SchedulerJob.adopt_or_reset_orphaned_tasks with retries. Try 1 of 3
...
[2024-05-03T17:19:07.223+0000] {before_sleep.py:65} INFO - Retrying <unknown> in 0.4467290363932428 seconds as it raised OperationalError: (MySQLdb.OperationalError) (2013, 'Lost connection to MySQL server during query')
[SQL: UPDATE task_instance SET queued_by_job_id=%s, updated_at=%s WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.run_id = %s AND task_instance.map_index = %s]
[parameters: ((7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221361), 'test__orphaned_test_dag', 'select_next_17', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221383), 'test__orphaned_test_dag', 'select_next_18', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221391), 'test__orphaned_test_dag', 'select_next_20', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221398), 'test__orphaned_test_dag', 'select_next_22', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221405), 'test__orphaned_test_dag', 'select_next_23', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221411), 'test__orphaned_test_dag', 'select_next_24', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221418), 'test__orphaned_test_dag', 'select_next_28', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221424), 'test__orphaned_test_dag', 'select_next_29', 'scheduled__2024-04-24T02:30:00+00:00', -1)  ... displaying 10 of 31 total bound parameter sets ...  (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221560), 'test__orphaned_test_dag', 'select_next_75', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221566), 'test__orphaned_test_dag', 'select_next_76', 'scheduled__2024-04-24T02:30:00+00:00', -1))]
(Background on this error at: https://sqlalche.me/e/14/e3q8).
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1890, in _execute_context
    self.dialect.do_executemany(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/mysql/mysqldb.py", line 183, in do_executemany
    rowcount = cursor.executemany(statement, parameters)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 250, in executemany
    self.rowcount = sum(self.execute(query, arg) for arg in args)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 250, in <genexpr>
    self.rowcount = sum(self.execute(query, arg) for arg in args)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 179, in execute
    res = self._query(mogrified_query)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 330, in _query
    db.query(q)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/connections.py", line 261, in query
    _mysql.connection.query(self, query)
MySQLdb.OperationalError: (2013, 'Lost connection to MySQL server during query')

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/airflow_src/airflow/jobs/scheduler_job_runner.py", line 1684, in adopt_or_reset_orphaned_tasks
    session.flush()
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
    self._flush(objects)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
    with util.safe_reraise():
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
    flush_context.execute()
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
    rec.execute(self)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
    util.preloaded.orm_persistence.save_obj(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 237, in save_obj
    _emit_update_statements(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 1001, in _emit_update_statements
    c = connection._execute_20(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
    ret = self._execute_context(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
    util.raise_(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1890, in _execute_context
    self.dialect.do_executemany(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/mysql/mysqldb.py", line 183, in do_executemany
    rowcount = cursor.executemany(statement, parameters)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 250, in executemany
    self.rowcount = sum(self.execute(query, arg) for arg in args)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 250, in <genexpr>
    self.rowcount = sum(self.execute(query, arg) for arg in args)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 179, in execute
    res = self._query(mogrified_query)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 330, in _query
    db.query(q)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/connections.py", line 261, in query
    _mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb.OperationalError) (2013, 'Lost connection to MySQL server during query')
[SQL: UPDATE task_instance SET queued_by_job_id=%s, updated_at=%s WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.run_id = %s AND task_instance.map_index = %s]
[parameters: ((7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221361), 'test__orphaned_test_dag', 'select_next_17', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221383), 'test__orphaned_test_dag', 'select_next_18', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221391), 'test__orphaned_test_dag', 'select_next_20', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221398), 'test__orphaned_test_dag', 'select_next_22', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221405), 'test__orphaned_test_dag', 'select_next_23', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221411), 'test__orphaned_test_dag', 'select_next_24', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221418), 'test__orphaned_test_dag', 'select_next_28', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221424), 'test__orphaned_test_dag', 'select_next_29', 'scheduled__2024-04-24T02:30:00+00:00', -1)  ... displaying 10 of 31 total bound parameter sets ...  (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221560), 'test__orphaned_test_dag', 'select_next_75', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221566), 'test__orphaned_test_dag', 'select_next_76', 'scheduled__2024-04-24T02:30:00+00:00', -1))]
(Background on this error at: https://sqlalche.me/e/14/e3q8)
[2024-05-03T17:19:07.727+0000] {scheduler_job_runner.py:1617} INFO - Running SchedulerJob.adopt_or_reset_orphaned_tasks with retries. Try 2 of 3
...

tanvn commented 4 months ago

@RNHTTR Hi, I created a PR to fix this issue: https://github.com/apache/airflow/pull/39406. PTAL at your convenience.