apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Tasks intermittently gets terminated with SIGTERM on kubernetes executor #18041

Closed Nimesh-K-Makwana closed 2 years ago

Nimesh-K-Makwana commented 3 years ago

Apache Airflow version

2.1.3 (latest released)

Operating System

Linux

Versions of Apache Airflow Providers

No response

Deployment

Other

Deployment details

Have tried the env variables given in issue #14672:
AIRFLOW__CORE__KILLED_TASK_CLEANUP_TIME: "604800"
AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION: "False"
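
As a sanity check (a sketch, not part of the original report), one way to confirm these overrides actually reach the Airflow processes is to run something like this inside a worker pod or a one-off task:

import os

# Print the overrides the reporter mentions; "<not set>" means the override
# never made it into this process's environment.
for key in (
    "AIRFLOW__CORE__KILLED_TASK_CLEANUP_TIME",
    "AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION",
):
    print(key, "=", os.environ.get(key, "<not set>"))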

What happened

[2021-09-04 10:28:50,536] {local_task_job.py:80} ERROR - Received SIGTERM. Terminating subprocesses
[2021-09-04 10:28:50,536] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 33
[2021-09-04 10:28:50,537] {taskinstance.py:1235} ERROR - Received SIGTERM. Terminating subprocesses.
[2021-09-04 10:28:52,568] {taskinstance.py:1462} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1164, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1282, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1307, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 150, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 161, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/opt/airflow/dags/repo/dags/elastit_schedular/waiting_task_processor.py", line 59, in trigger_task
    time.sleep(1)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1237, in signal_handler
    raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal

What you expected to happen

The DAG should execute successfully without any SIGTERM signal.

How to reproduce

No response

Anything else

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 3 years ago

Thanks for opening your first issue here! Be sure to follow the issue template!

ephraimbuddy commented 3 years ago

Please provide the full task logs and also the scheduler logs. What you added above is only the part where the task failed, but I believe something must have caused it to receive the SIGTERM.

Nimesh-K-Makwana commented 3 years ago

Tasks seem to fail while waiting or when they are long-running. Here are the latest logs for a task that failed due to SIGTERM:

Task logs:

[2021-09-06 07:20:33,448] {taskinstance.py:1107} INFO - Executing <Task(PythonOperator): Wait_for_creation> on 2021-09-06T07:10:00+00:00
[2021-09-06 07:20:33,451] {standard_task_runner.py:52} INFO - Started process 42 to run task
[2021-09-06 07:20:33,453] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'backup_lms_rolling', 'Wait_for_creation', '2021-09-06T07:10:00+00:00', '--job-id', '92518', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/backup/backup_lms_rolling.py', '--cfg-path', '/tmp/tmpmtkv677x', '--error-file', '/tmp/tmp3ha9ijqx']
[2021-09-06 07:20:33,454] {standard_task_runner.py:77} INFO - Job 92518: Subtask Wait_for_creation
[2021-09-06 07:20:33,598] {logging_mixin.py:104} INFO - Running <TaskInstance: backup_lms_rolling.Wait_for_creation 2021-09-06T07:10:00+00:00 [running]> on host backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08
[2021-09-06 07:20:33,858] {taskinstance.py:1300} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=Incred
AIRFLOW_CTX_DAG_ID=backup_lms_rolling
AIRFLOW_CTX_TASK_ID=Wait_for_creation
AIRFLOW_CTX_EXECUTION_DATE=2021-09-06T07:10:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2021-09-06T07:10:00+00:00
[2021-09-06 07:20:33,872] {credentials.py:1100} INFO - Found credentials in environment variables.
[2021-09-06 07:24:00,469] {local_task_job.py:76} ERROR - Received SIGTERM. Terminating subprocesses
[2021-09-06 07:24:00,470] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 42
[2021-09-06 07:24:00,470] {taskinstance.py:1284} ERROR - Received SIGTERM. Terminating subprocesses.
[2021-09-06 07:24:00,555] {taskinstance.py:1501} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1157, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1331, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1361, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 150, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 161, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/opt/airflow/dags/repo/dags/backup/backup_lms_rolling.py", line 103, in wait_for_creation
    time.sleep(30)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1286, in signal_handler
    raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal
[2021-09-06 07:24:00,556] {taskinstance.py:1544} INFO - Marking task as UP_FOR_RETRY. dag_id=backup_lms_rolling, task_id=Wait_for_creation, execution_date=20210906T071000, start_date=20210906T072033, end_date=20210906T072400
[2021-09-06 07:24:00,682] {process_utils.py:66} INFO - Process psutil.Process(pid=42, status='terminated', exitcode=1, started='07:20:32') (42) terminated with exit code 1

Scheduler logs:

[2021-09-06 07:20:20,831] {kubernetes_executor.py:147} INFO - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e had an event of type ADDED
[2021-09-06 07:20:20,831] {kubernetes_executor.py:200} INFO - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e Pending
[2021-09-06 07:20:20,844] {kubernetes_executor.py:147} INFO - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e had an event of type MODIFIED
[2021-09-06 07:20:20,844] {kubernetes_executor.py:200} INFO - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e Pending
[2021-09-06 07:20:20,983] {kubernetes_executor.py:147} INFO - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e had an event of type MODIFIED
[2021-09-06 07:20:20,983] {kubernetes_executor.py:200} INFO - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e Pending
[2021-09-06 07:20:24,181] {kubernetes_executor.py:147} INFO - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e had an event of type MODIFIED
[2021-09-06 07:20:24,181] {kubernetes_executor.py:200} INFO - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e Pending
[2021-09-06 07:20:28,163] {kubernetes_executor.py:147} INFO - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e had an event of type MODIFIED
[2021-09-06 07:20:28,163] {kubernetes_executor.py:200} INFO - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e Pending
[2021-09-06 07:20:29,242] {kubernetes_executor.py:147} INFO - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e had an event of type MODIFIED
[2021-09-06 07:20:29,242] {kubernetes_executor.py:208} INFO - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e is Running
[2021-09-06 07:24:00,461] {kubernetes_executor.py:147} INFO - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e had an event of type MODIFIED
[2021-09-06 07:24:00,461] {kubernetes_executor.py:208} INFO - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e is Running
[2021-09-06 07:24:03,089] {kubernetes_executor.py:147} INFO - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e had an event of type MODIFIED
[2021-09-06 07:24:03,089] {kubernetes_executor.py:202} ERROR - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e Failed
[2021-09-06 07:24:04,217] {kubernetes_executor.py:368} INFO - Attempting to finish pod; pod_id: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e; state: failed; annotations: {'dag_id': 'backup_lms_rolling', 'task_id': 'Wait_for_creation', 'execution_date': '2021-09-06T07:10:00+00:00', 'try_number': '1'}
[2021-09-06 07:24:04,218] {kubernetes_executor.py:546} INFO - Changing state of (TaskInstanceKey(dag_id='backup_lms_rolling', task_id='Wait_for_creation', execution_date=datetime.datetime(2021, 9, 6, 7, 10, tzinfo=tzlocal()), try_number=1), 'failed', 'backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e', 'airflow-2x', '107854273') to failed
[2021-09-06 07:24:10,688] {kubernetes_executor.py:147} INFO - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e had an event of type MODIFIED
[2021-09-06 07:24:10,689] {kubernetes_executor.py:202} ERROR - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e Failed
[2021-09-06 07:24:10,743] {kubernetes_executor.py:147} INFO - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e had an event of type DELETED
[2021-09-06 07:24:10,744] {kubernetes_executor.py:202} ERROR - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e Failed
[2021-09-06 07:24:11,091] {kubernetes_executor.py:368} INFO - Attempting to finish pod; pod_id: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e; state: failed; annotations: {'dag_id': 'backup_lms_rolling', 'task_id': 'Wait_for_creation', 'execution_date': '2021-09-06T07:10:00+00:00', 'try_number': '1'}
[2021-09-06 07:24:11,091] {kubernetes_executor.py:368} INFO - Attempting to finish pod; pod_id: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e; state: failed; annotations: {'dag_id': 'backup_lms_rolling', 'task_id': 'Wait_for_creation', 'execution_date': '2021-09-06T07:10:00+00:00', 'try_number': '1'}
[2021-09-06 07:24:11,092] {kubernetes_executor.py:546} INFO - Changing state of (TaskInstanceKey(dag_id='backup_lms_rolling', task_id='Wait_for_creation', execution_date=datetime.datetime(2021, 9, 6, 7, 10, tzinfo=tzlocal()), try_number=1), 'failed', 'backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e', 'airflow-2x', '107854343') to failed
[2021-09-06 07:24:11,092] {kubernetes_executor.py:546} INFO - Changing state of (TaskInstanceKey(dag_id='backup_lms_rolling', task_id='Wait_for_creation', execution_date=datetime.datetime(2021, 9, 6, 7, 10, tzinfo=tzlocal()), try_number=1), 'failed', 'backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e', 'airflow-2x', '107854344') to failed
ephraimbuddy commented 3 years ago

For the task to receive SIGTERM, something must be killing your pods. The task runner receives SIGTERM when its pod is deleted. Can you check whether something else is deleting your pods?

Nimesh-K-Makwana commented 3 years ago

True, tasks get SIGTERM when their pod is deleted. I have tried to find the cause of the pod deletions but could not find any reason for it yet. It happens just randomly.

easontm commented 3 years ago

Is your DAG paused?

Nimesh-K-Makwana commented 3 years ago

DAG is in ON state.

ephraimbuddy commented 3 years ago

@Nimesh-K-Makwana can you set delete_worker_pods to False and observe whether the issue persists? Also describe the pod to see if everything is correct.

laserpedro commented 3 years ago

Hello,

I am facing the same issue: Airflow 2.1.3 (tested also with 2.1.2, 2.1.1), backend: PostgreSQL, executor: LocalExecutor, unixname: airflow, task default user (run_as_user): airflow.

I have modified the variables killed_task_cleanup_time and schedule_after_task_execution to 100000 and False respectively. I have also installed Airflow as a non-root user and set the default run_as_user to airflow. I have also tried removing the task instances and DAG runs to start from scratch (no rows in the DB).
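
As a sanity check (just a sketch, not part of the original report), the effective values of the two settings mentioned above can be printed from a Python shell on the scheduler host to confirm the overrides were actually picked up:

from airflow.configuration import conf

# Values expected here given the overrides described above: 100000 and False.
print(conf.getint("core", "killed_task_cleanup_time"))
print(conf.getboolean("scheduler", "schedule_after_task_execution"))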

My tasks are getting constantly killed in backfill mode with the traceback:

[2021-09-07 10:21:20,185] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 73011

Honestly, I am a bit discouraged at this point. Could you help me please? Thanks.

tasks logs:

Reading local file: /home/airflow/airflow/logs/import_forex_unwind_trades/detect_book_referential/2021-08-05T00:00:00+00:00/28.log
[2021-09-07 11:18:19,826] {taskinstance.py:896} INFO - Dependencies all met for <TaskInstance: import_forex_unwind_trades.detect_book_referential 2021-08-05T00:00:00+00:00 [queued]>
[2021-09-07 11:18:21,644] {taskinstance.py:896} INFO - Dependencies all met for <TaskInstance: import_forex_unwind_trades.detect_book_referential 2021-08-05T00:00:00+00:00 [queued]>
[2021-09-07 11:18:21,644] {taskinstance.py:1087} INFO - 
--------------------------------------------------------------------------------
[2021-09-07 11:18:21,644] {taskinstance.py:1088} INFO - Starting attempt 28 of 28
[2021-09-07 11:18:21,644] {taskinstance.py:1089} INFO - 
--------------------------------------------------------------------------------
[2021-09-07 11:18:21,661] {taskinstance.py:1107} INFO - Executing <Task(HttpSensor): detect_book_referential> on 2021-08-05T00:00:00+00:00
[2021-09-07 11:18:21,662] {base_task_runner.py:133} INFO - Running on host: gvasrv-airflow
[2021-09-07 11:18:21,662] {base_task_runner.py:134} INFO - Running: ['airflow', 'tasks', 'run', 'import_forex_unwind_trades', 'detect_book_referential', '2021-08-05T00:00:00+00:00', '--job-id', '723490', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/import_fx_unwind_trades.py', '--cfg-path', '/tmp/tmpe43x01zl', '--error-file', '/tmp/tmpohmu9qb7']
[2021-09-07 11:18:24,493] {base_task_runner.py:118} INFO - Job 723490: Subtask detect_book_referential [2021-09-07 11:18:24,492] {dagbag.py:496} INFO - Filling up the DagBag from /home/airflow/airflow/dags/import_fx_unwind_trades.py
[2021-09-07 11:18:26,045] {base_task_runner.py:118} INFO - Job 723490: Subtask detect_book_referential /home/airflow/anaconda3/envs/airflow_dev_py37/lib/python3.7/site-packages/airflow/providers/http/sensors/http.py:26 DeprecationWarning: This decorator is deprecated.
[2021-09-07 11:18:26,045] {base_task_runner.py:118} INFO - Job 723490: Subtask detect_book_referential 
[2021-09-07 11:18:26,045] {base_task_runner.py:118} INFO - Job 723490: Subtask detect_book_referential In previous versions, all subclasses of BaseOperator must use apply_default decorator for the`default_args` feature to work properly.
[2021-09-07 11:18:26,045] {base_task_runner.py:118} INFO - Job 723490: Subtask detect_book_referential 
[2021-09-07 11:18:26,045] {base_task_runner.py:118} INFO - Job 723490: Subtask detect_book_referential In current version, it is optional. The decorator is applied automatically using the metaclass.
[2021-09-07 11:18:26,045] {base_task_runner.py:118} INFO - Job 723490: Subtask detect_book_referential 
[2021-09-07 11:18:32,281] {base_task_runner.py:118} INFO - Job 723490: Subtask detect_book_referential [2021-09-07 11:18:32,280] {base.py:78} INFO - Using connection to: id: alpflow_symph_conn. Host: https://as.symphony.com/integration/v1/whi/simpleWebHookIntegration/, Port: None, Schema: None, Login: None, Password: None, extra: {'webhook_token': }
[2021-09-07 11:18:33,857] {local_task_job.py:194} WARNING - Recorded pid 60749 does not match the current pid 98677
[2021-09-07 11:18:33,863] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 98677
[2021-09-07 11:18:33,870] {process_utils.py:66} INFO - Process psutil.Process(pid=98677, status='terminated', exitcode=<Negsignal.SIGTERM: -15>, started='11:18:21') (98677) terminated with exit code Negsignal.SIGTERM

scheduler log:

[2021-09-07 11:33:13,489] {dagrun.py:429} ERROR - Marking run <DagRun import_forex_unwind_trades @ 2021-08-11 00:00:00+00:00: backfill__2021-08-11T00:00:00+00:00, externally triggered: False> failed
[2021-09-07 11:33:13,527] {backfill_job.py:388} INFO - [backfill progress] | finished run 7 of 16 | tasks waiting: 0 | succeeded: 0 | running: 9 | failed: 39 | skipped: 0 | deadlocked: 0 | not ready: 0
[2021-09-07 11:33:14,819] {local_executor.py:124} ERROR - Failed to execute task PID of job runner does not match.

I also face the issue described in #17507.

Tks,

Pierre

laserpedro commented 3 years ago

Can be related to : https://github.com/apache/airflow/commit/ed99eaafc479aedbbe2d618da878195a132abb1a

laserpedro commented 3 years ago

So I tested the code that generated the issue described above with the change from that commit, and it did not solve the issue: the first backfill worked, but all the following ones did not; I still get the same error.

ephraimbuddy commented 3 years ago

@laserpedro, I'm not able to reproduce your case with backfill. How long does your task run? I would also appreciate it if you could provide a DAG to reproduce this. Also, it looks to me like you didn't set up Airflow properly to use run_as_user. Can you take a look at the documentation: http://airflow.apache.org/docs/apache-airflow/stable/security/workload.html#impersonation

laserpedro commented 3 years ago

Hello @ephraimbuddy,

Thank you for your answer!

I thought the same as you, so I did a full reinstall today (new machine, new database, new environment, new user). I have an airflow user with sudo privileges that runs the scheduler and the webserver.

run_as_user is left empty in the airflow.cfg file now.

First question: in terms of users, does that seem correct to you?

I ported one of the DAGs that generated those errors in backfill mode and I am still getting them in an erratic way: it is not the same tasks that fail, whatever the number of DAG runs launched in backfill (30, 16, 10).

For a classic backfill (16 DAG runs in parallel for me), finishing them all takes 7-10 minutes: nothing heavy is done except inserting 40k rows into a PostgreSQL DB at the end (so fairly small actually). However, execution time does seem to be a factor here, since backfills with very few tasks work fine.

For the code sample, I will provide you with something ASAP to reproduce the problem (sent by mail :))

Tks,

Pierre

ephraimbuddy commented 3 years ago

@laserpedro Yes, that makes sense for a user setup.

ephraimbuddy commented 3 years ago

@laserpedro It will be very helpful if you can provide a simple dag to reproduce this behaviour.

laserpedro commented 3 years ago

Hello @ephraimbuddy,

Since I had to focus on getting my Airflow session working, I made the modifications below and it seems to be working properly now:

With this new setup my Airflow session has been working correctly for 2 days now.

ephraimbuddy commented 3 years ago

Thanks @laserpedro. We want to fix this and would appreciate it if you can make a simple dag to reproduce it

felipeangelimvieira commented 3 years ago

Hello @ephraimbuddy,

We are facing the same issue... I'm using the official Airflow Helm chart on Azure Kubernetes Service, with LocalExecutor. Different values for AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION give different error messages...

Here is an example DAG to reproduce it:

from airflow import DAG
from airflow.operators.python import PythonOperator

import time
from datetime import datetime

dag = DAG('dag_test',
          description='test',
          schedule_interval=None,
          start_date=datetime(2021, 4, 1),
          max_active_runs=1,
          concurrency=40,
          catchup=False)

def my_sleeping_function(t):
    time.sleep(t)

tasks = []
for i in range(400):
    task = PythonOperator(task_id='sleep_for_' + str(i),
                          python_callable=my_sleeping_function,
                          op_kwargs={'t': 60},
                          dag=dag)

With AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION=true

*** Reading local file: /opt/airflow/logs/airflow2/dag_test/sleep_for_0/2021-09-15T11:52:24.627715+00:00/1.log
[2021-09-15 11:54:59,531] {taskinstance.py:896} INFO - Dependencies all met for <TaskInstance: dag_test.sleep_for_0 2021-09-15T11:52:24.627715+00:00 [queued]>
[2021-09-15 11:54:59,583] {taskinstance.py:896} INFO - Dependencies all met for <TaskInstance: dag_test.sleep_for_0 2021-09-15T11:52:24.627715+00:00 [queued]>
[2021-09-15 11:54:59,583] {taskinstance.py:1087} INFO - 
--------------------------------------------------------------------------------
[2021-09-15 11:54:59,583] {taskinstance.py:1088} INFO - Starting attempt 1 of 1
[2021-09-15 11:54:59,583] {taskinstance.py:1089} INFO - 
--------------------------------------------------------------------------------
[2021-09-15 11:55:04,450] {taskinstance.py:1107} INFO - Executing <Task(PythonOperator): sleep_for_0> on 2021-09-15T11:52:24.627715+00:00
[2021-09-15 11:55:04,454] {standard_task_runner.py:52} INFO - Started process 1657 to run task
[2021-09-15 11:55:04,458] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'dag_test', 'sleep_for_0', '2021-09-15T11:52:24.627715+00:00', '--job-id', '33893', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/other/dag_test_bug.py', '--cfg-path', '/tmp/tmpcwh5nv0m', '--error-file', '/tmp/tmp8t0t_nvu']
[2021-09-15 11:55:04,459] {standard_task_runner.py:77} INFO - Job 33893: Subtask sleep_for_0
[2021-09-15 11:55:29,043] {logging_mixin.py:104} INFO - Running <TaskInstance: dag_test.sleep_for_0 2021-09-15T11:52:24.627715+00:00 [running]> on host airflow-scheduler-0.airflow-scheduler.airflow.svc.cluster.local
[2021-09-15 11:56:02,766] {taskinstance.py:1302} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=dag_test
AIRFLOW_CTX_TASK_ID=sleep_for_0
AIRFLOW_CTX_EXECUTION_DATE=2021-09-15T11:52:24.627715+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-09-15T11:52:24.627715+00:00
[2021-09-15 11:57:00,870] {local_task_job.py:77} ERROR - Received SIGTERM. Terminating subprocesses
[2021-09-15 11:57:00,916] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 1657
[2021-09-15 11:57:00,916] {taskinstance.py:1284} ERROR - Received SIGTERM. Terminating subprocesses.

With AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION=false

EDIT: it seems that the error message below appears when I relaunch the task after its failure. It may not be related to SCHEDULE_AFTER_TASK_EXECUTION config

*** Reading local file: /opt/airflow/logs/airflow2/dag_test/sleep_for_0/2021-09-15T11:52:24.627715+00:00/2.log
[2021-09-15 11:59:52,836] {taskinstance.py:896} INFO - Dependencies all met for <TaskInstance: dag_test.sleep_for_0 2021-09-15T11:52:24.627715+00:00 [queued]>
[2021-09-15 11:59:57,228] {taskinstance.py:896} INFO - Dependencies all met for <TaskInstance: dag_test.sleep_for_0 2021-09-15T11:52:24.627715+00:00 [queued]>
[2021-09-15 11:59:57,228] {taskinstance.py:1087} INFO - 
--------------------------------------------------------------------------------
[2021-09-15 11:59:57,228] {taskinstance.py:1088} INFO - Starting attempt 2 of 2
[2021-09-15 11:59:57,228] {taskinstance.py:1089} INFO - 
--------------------------------------------------------------------------------
[2021-09-15 11:59:57,253] {taskinstance.py:1107} INFO - Executing <Task(PythonOperator): sleep_for_0> on 2021-09-15T11:52:24.627715+00:00
[2021-09-15 11:59:57,256] {standard_task_runner.py:52} INFO - Started process 814 to run task
[2021-09-15 11:59:57,258] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'dag_test', 'sleep_for_0', '2021-09-15T11:52:24.627715+00:00', '--job-id', '33947', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/other/dag_test_bug.py', '--cfg-path', '/tmp/tmpe7l0bnkn', '--error-file', '/tmp/tmppppc4ghm']
[2021-09-15 11:59:57,260] {standard_task_runner.py:77} INFO - Job 33947: Subtask sleep_for_0
[2021-09-15 12:00:18,910] {logging_mixin.py:104} INFO - Running <TaskInstance: dag_test.sleep_for_0 2021-09-15T11:52:24.627715+00:00 [running]> on host airflow-scheduler-0.airflow-scheduler.airflow.svc.cluster.local
[2021-09-15 12:00:27,276] {local_task_job.py:194} WARNING - Recorded pid 1657 does not match the current pid 814
[2021-09-15 12:00:27,283] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 814
[2021-09-15 12:00:27,303] {process_utils.py:66} INFO - Process psutil.Process(pid=814, status='terminated', exitcode=<Negsignal.SIGTERM: -15>, started='11:59:57') (814) terminated with exit code Negsignal.SIGTERM

Some tasks succeed and others are randomly killed.

ephraimbuddy commented 3 years ago

Thanks, @felipeangelimvieira for coming through with the dag!

felipeangelimvieira commented 3 years ago

So I found out that my metadata database was using 100% CPU while running DAGs with multiple tasks in parallel, such as the example above. I'm using Azure PostgreSQL and the official Airflow Helm chart with PgBouncer enabled.

Scaling up the database may solve the issue, but instead I increased AIRFLOW__SCHEDULER__JOB_HEARTBEAT_SEC and the problem was solved (the default is 5 seconds).

While I'm not sure, it's possible that the heartbeat method of BaseJob is what is overloading the database. When the database is running out of CPU, the default heartbeat takes longer than heartrate * 2.1 (2.1 is the default grace_multiplier in the is_alive method of BaseJob) and the scheduler kills the tasks.
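
For illustration, here is a rough sketch of the liveness check being described (simplified, not Airflow's actual BaseJob code):

from datetime import datetime, timedelta

def job_is_alive(latest_heartbeat: datetime, heartrate_sec: float, grace_multiplier: float = 2.1) -> bool:
    # A job is considered alive only if its last recorded heartbeat is recent enough.
    return datetime.utcnow() - latest_heartbeat < timedelta(seconds=heartrate_sec * grace_multiplier)

# With the default 5 s heartrate, a heartbeat older than ~10.5 s marks the job as dead,
# which is when the scheduler ends up killing the still-running task.
print(job_is_alive(datetime.utcnow() - timedelta(seconds=12), heartrate_sec=5))   # False
print(job_is_alive(datetime.utcnow() - timedelta(seconds=12), heartrate_sec=20))  # True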

laserpedro commented 3 years ago

@felipeangelimvieira have you noticed any pattern? Like the error happening when your task execution time > heartbeat rate? What value in seconds have you put in your config? Something much larger than the average task execution time? Thanks!

felipeangelimvieira commented 3 years ago

@laserpedro unfortunately no patterns... it worked with the example DAG once. I have no idea how to solve it, although the database running out of CPU seems to play a role. Could you verify whether that is also the case for your database?

I've tried different configurations (LocalExecutor, CeleryExecutor), and the problem keeps appearing randomly with those dags with many tasks in parallel.

morhook commented 3 years ago

Same thing for me. I suspect my database might be getting slower also.

denysivanov commented 3 years ago

I see this issue and it correlates totally with CPU spikes on my PostgreSQL DB.

felipeangelimvieira commented 3 years ago

Just updated to 2.1.4 and the error message has changed. I'm seeing fewer DAGs receiving the SIGTERM signal, although this error still appears. Now some tasks are being randomly killed and marked as failed before execution.
The logs of the tasks that failed before execution follow this pattern:

*** Log file does not exist: /opt/airflow/logs/airflow2/dag_test/sleep_for_15/2021-09-20T17:02:17.954197+00:00/1.log
*** Fetching from: http://:8793/log/dag_test/sleep_for_15/2021-09-20T17:02:17.954197+00:00/1.log
*** Failed to fetch log file from worker. Request URL missing either an 'http://' or 'https://' protocol.

I've also verified some succeeded tasks marked as failed.

In addition, I was able to reproduce the SIGTERM issue locally with docker-compose by limiting the CPU usage of the PostgreSQL container. Indeed, the metadata database may cause the SIGTERM error. However, I wasn't expecting high CPU usage in the PostgreSQL database while running with the official Helm chart since it has PgBouncer.

I can share the docker-compose file if you find it helpful.

laserpedro commented 2 years ago

The SIGTERM issue came back for me in 2.0.2, so yes, I really think it is backend related. I will have to log the CPU usage to see what's going on when all my DAGs are triggered at 00:00:00.

From there, I don't know what the solution could be... some fine-tuning of the PostgreSQL server maybe... I am currently using the standard PostgreSQL config.

@felipeangelimvieira : interesting stuff found on stackoverflow: https://stackoverflow.com/questions/42419834/airbnb-airflow-using-all-system-resources

stephenonethree commented 2 years ago

+1, had this happen to a lot of subclassed BaseSensorOperators in 2.1.2 with errors like the below. I was running a lot of them in parallel when it happened - possibly more than my system could handle, though I had been hoping for a more graceful handling of the situation.

[2021-09-28 20:38:07,279] {local_task_job.py:194} WARNING - Recorded pid 19114 does not match the current pid 14691
[2021-09-28 20:38:07,285] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 14691
[2021-09-28 20:38:07,348] {process_utils.py:66} INFO - Process psutil.Process(pid=14691, status='terminated', exitcode=<Negsignal.SIGTERM: -15>, started='20:37:57') (14691) terminated with exit code Negsignal.SIGTERM

laserpedro commented 2 years ago

@stephenonethree what is your timeout on those BaseSensors? What is your scheduler heartbeat interval? Are you running on LocalExecutor?

stephenonethree commented 2 years ago

@laserpedro here are some details:

timeout=518400
mode='reschedule'
poke_interval=30

I am running on Cloud Composer version composer-1.17.1-airflow-2.1.2, so the executor is CeleryExecutor. On the hardware monitoring page things look good: in the Kubernetes cluster one node is at 95% CPU and the others are at 50%. I have one scheduler. The database resource usage is low.

stephenonethree commented 2 years ago

Good news, I have created simple reproducer code, which I have tested quite thoroughly. This code reproduces the bug 100% of the time. It's a very simple reproducer, which hopefully will help with fixing it. I would be very grateful if somebody could look at this.

GENERAL NOTES:

  1. The DAG uses a custom sensor operator which pulls XCom values and Airflow Variables several times per poke. I am not sure, but I think the bug may be related to instability caused by the network traffic from commands such as these (or any usage of the network); a minimal sketch of such a sensor follows this list.
  2. The DAG runs 45 sensors in parallel indefinitely. I don't think it is a sensor-specific issue because I have seen the same errors with ordinary tasks, though less frequently. Also, using sensors helps test for this bug because they repeat indefinitely until they error. (I am not sure if the bug requires so many sensors, but I do have a job with 45 sensors, which is where I encountered this bug.)
  3. Things sometimes start OK for the first few minutes, but you should always see the bug within 15 minutes, usually less. I saw it take 14 minutes once, right after the scheduler was restarted (maybe the scheduler is more reliable after it has been restarted).
  4. In my tests I sometimes saw tasks making unexpected status transitions in the UI, for example moving from "scheduled" to "no_status". This might be related.
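
For illustration, here is a minimal sketch of a sensor along the lines described in item 1 (this is not the attached reproducer code; the upstream task id "produce_flag" and XCom key "ready" are made-up placeholders, while "env_vars" matches the Variable mentioned in the setup steps below):

from airflow.models import Variable
from airflow.sensors.base import BaseSensorOperator


class XComAndVariableSensor(BaseSensorOperator):
    """Pokes until an upstream task has pushed a truthy 'ready' XCom value."""

    def poke(self, context) -> bool:
        # Pull a JSON Airflow Variable on every poke (network/DB round trip).
        env_vars = Variable.get("env_vars", deserialize_json=True, default_var={})
        # Pull an XCom value on every poke; task id and key are hypothetical.
        ready_flag = context["ti"].xcom_pull(task_ids="produce_flag", key="ready")
        self.log.info("env_vars keys=%s, ready=%s", list(env_vars), ready_flag)
        return bool(ready_flag)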

ERROR MESSAGES (there are two different types of error)

  1. When poke_interval is 30, you will get the "Recorded pid X does not match the current pid Y" error in the logs. If you turn on error emails, they will say "Detected as zombie" and sometimes you'll get multiple such emails for a single sensor.
  2. If you drop poke_interval to 5, you will still get the previous error, but sometimes instead the task will error without an error message, and the error email will say, "Exception: Executor reports task instance finished (success) although the task says its queued. (Info: None) Was the task killed externally?" Sometimes instead of "(success)" the email will say "(failed)"

STEPS TO SETUP TEST ENVIRONMENT

So far I have only tested this on Cloud Composer, with the following configuration:

  1. Create an environment with version composer-1.17.1-airflow-2.1.2. This is their latest version.
  2. No environment variable overrides
  3. 3 worker nodes, n1-standard-2, 100GB disk
  4. Webserver machine type: composer-n1-webserver-2 (default)
  5. Cloud SQL machine type: db-n1-standard-2 (default)
  6. For Airflow configuration, my only overrides are hopefully unrelated (most of the smtp variables, email.email_backend, secrets.backend, webserver.navbar_color, webserver.dag_default_view).
  7. Increase the number of schedulers to 2 (This may not be required, but I only tested with 2 schedulers.)
  8. Create an Airflow variable in JSON format named "env_vars." This is just for the sake of the test.

I made other changes to things that I don't think are related, for example I use a custom GCP service account. I can share further details if you can't reproduce yourself.

My code is attached to this comment.

That's it! I really hope this helps get to the bottom of this!

github_issue_18041.zip

stephenonethree commented 2 years ago

I discovered that this issue may be related to https://github.com/apache/airflow/issues/17507

laserpedro commented 2 years ago

In my case I see some patterns in the failures:

Case 1: a task in the DAG takes some time to finish (because it is doing some computation or inserting a large amount of data into a DB) and the execution time is >= the heartbeat interval. After incorporating the patch that was supposed to fix this issue I was still getting this error. The CPU usage was low both on the scheduler and on Postgres, therefore it was not resource related... After checking I found this on Stack Overflow and adjusted my config so that now:

scheduler_heartbeat_sec = 200

scheduler_health_check_threshold = 600

I have relaunched the DAGs that took long to process (by long I mean execution time > heartbeat interval) and so far I have not received any SIGTERM signal.

Case 2: a class inheriting from BaseOperator (a sensor) was hammering the scheduler by using a poke_interval < 1 min, which is not recommended at all by the official documentation when running in poke mode.

By fixing the interval on the sensors, modifying the config, and incorporating the fix, I finally seem to have something that looks stable on Airflow > 2.0.0.

I wish I could give a more technical solution on this ...

benvit92 commented 2 years ago

Hello all,

not sure if it is helpful but I can tell the following:

we had the same issue with Airflow 2.1.0 deployed on AKS (Azure k8s), and we also noticed that a lot of pods were not being cleaned up after completion (Success or Error, or even CrashLoopBackOff without loading the correct pod template). While scraping the scheduler logs for SIGTERMed tasks I noticed the following entries:

{"timestamp": "2021-10-18T10:11:11.363282Z", "level": "INFO", "name": "airflow.executors.kubernetes_executor.KubernetesExecutor", "message": "Failed to adopt pod cbtbpartyczpartybctranslation.fa5c434191a048cdb9ad9aa747e0f3e9. Reason: (403)\nReason: Forbidden\nHTTP response headers: HTTPHeaderDict({'Audit-Id': '427c0455-2a35-4eec-bd15-b2a2e1d82639', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 'Date': 'Mon, 18 Oct 2021 10:11:11 GMT', 'Content-Length': '421'})\nHTTP response body: {\"kind\":\"Status\",\"apiVersion\":\"v1\",\"metadata\":{},\"status\":\"Failure\",\"message\":\"pods \\\"cbtbpartyczpartybctranslation.fa5c434191a048cdb9ad9aa747e0f3e9\\\" is forbidden: User \\\"system:serviceaccount:cbdp:airflow-cbdp\\\" cannot patch resource \\\"pods\\\" in API group \\\"\\\" in the namespace \\\"cbdp\\\"\",\"reason\":\"Forbidden\",\"details\":{\"name\":\"cbtbpartyczpartybctranslation.fa5c434191a048cdb9ad9aa747e0f3e9\",\"kind\":\"pods\"},\"code\":403}\n\n"}

Since the scheduler was not able to adopt the pods, it was sending SIGTERM to pods that were actually still running but had probably become orphaned. After chasing a lot of loose ends, we fixed it by adding the "patch" permission to the RBAC role for pods, making the ImgPullSecret option optional in the YAML definition we have for Airflow in the Helm chart, and cleaning up old pending pods.

After this change we are seeing more stable behavior: no SIGTERM has been raised yet (and hopefully none will :) ) and no pod-adoption failure messages so far.

Hope this helps someone; if not, feel free to discard it. We will keep monitoring the behavior, and if this turns out to be the permanent fix we will consider contributing it back to the Airflow Helm chart.

jburgueno commented 2 years ago

We were facing this same issue in our Airflow deployed on Azure k8s with Azure PostgreSQL as the backend. We solved it by enabling PgBouncer in values.yml; the only catch was that to enable PgBouncer with Azure PostgreSQL as the backend you need to change auth_type = md5 to auth_type = scram-sha-256 in https://github.com/apache/airflow/blob/main/chart/templates/_helpers.yaml

This is because at least our PostgreSQL instance in Azure was configured not to accept md5-encrypted passwords. Hope this helps someone.

Reference: https://github.com/pgbouncer/pgbouncer/issues/325

luzbarbosa commented 2 years ago

@jburgueno, can you please provide at least part of your values.yml configuration file ?

We are facing the same issue here, even with pgbouncer enabled. I've tried to change auth_type to scram-sha-256 but it didn't solve the problem.

Thanks in advance

LeoDoldan7 commented 2 years ago

Our DAGs have been failing with the same errors, plus all our tasks being Detected as zombie.

We just checked the CPU usage of our database, and it's always at 100% during those failures.

We are gonna increase AIRFLOW__SCHEDULER__JOB_HEARTBEAT_SEC, but we would really like to understand the root cause of the issue

LeoDoldan7 commented 2 years ago

Okay, we solved it.

  1. The CPU usage on the database was at 100%.
  2. We changed the variable AIRFLOW__SCHEDULER__JOB_HEARTBEAT_SEC to 20 sec.
  3. The CPU usage decreased to 50%.
  4. We updated Airflow to 2.1.4.
  5. It seems like the bug got fixed, because the CPU usage decreased to 3%.
  6. All our tasks started dying by being marked as zombies. We don't understand why.
  7. We got AIRFLOW__SCHEDULER__JOB_HEARTBEAT_SEC back to 5 seconds.
  8. Everything is working perfectly, and we are happily keeping our jobs 🙃

We would really like to understand why AIRFLOW__SCHEDULER__JOB_HEARTBEAT_SEC at 20 seconds broke everything.

We have AIRFLOW__SCHEDULER__SCHEDULER_ZOMBIE_TASK_THRESHOLD set as 1800.

Our train of thought is: running a heartbeat every 20 seconds, there should be about 90 heartbeats within the zombie threshold, and limit_dttm is half an hour earlier. So the only reasonable explanation (to us) is that none of the heartbeats updated LJ.latest_heartbeat. (I'm referring to the _find_zombies method in manager.py.)
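
For illustration, a rough sketch of the zombie check being reasoned about here (simplified; the real logic lives in _find_zombies and joins against the recorded LocalTaskJob heartbeats):

from datetime import datetime, timedelta
from typing import Optional

ZOMBIE_TASK_THRESHOLD = 1800  # AIRFLOW__SCHEDULER__SCHEDULER_ZOMBIE_TASK_THRESHOLD

def is_zombie(latest_heartbeat: datetime, now: Optional[datetime] = None) -> bool:
    # A running task is flagged as a zombie if its job's latest heartbeat is older
    # than limit_dttm = now - threshold.
    now = now or datetime.utcnow()
    limit_dttm = now - timedelta(seconds=ZOMBIE_TASK_THRESHOLD)
    return latest_heartbeat < limit_dttm

# With a 20 s heartbeat there should be ~90 heartbeats inside the 1800 s window,
# so being flagged as a zombie implies latest_heartbeat was never actually updated.
print(is_zombie(datetime.utcnow() - timedelta(seconds=2000)))  # True
print(is_zombie(datetime.utcnow() - timedelta(seconds=30)))    # False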

alete89 commented 2 years ago

To illustrate what @LeandroDoldan just said, here is the Postgres CPU chart during those changes: (chart image attached)

laserpedro commented 2 years ago

Hello @LeandroDoldan and what was the scheduler_health_check_threshold on your two different configs (resp 20 sec / 5 sec) ?

luzbarbosa commented 2 years ago

@LeandroDoldan and @alete89, would you mind providing a better description of your config file ? Are you guys using the official Airflow Helm Chart ?

The following DAG is an easy way to reproduce the error; would you mind running it just to check whether the bug also disappears in this setting?

from airflow import DAG
from airflow.operators.python import PythonOperator

import time
from datetime import datetime

dag = DAG('dag_test',
          description='test',
          schedule_interval=None,
          start_date=datetime(2021, 4, 1),
          max_active_runs=1,
          concurrency=40,
          catchup=False)

def my_sleeping_function(t):
    time.sleep(t)

tasks = []
for i in range(400):
    task = PythonOperator(task_id='sleep_for_' + str(i),
                          python_callable=my_sleeping_function,
                          op_kwargs={'t': 60},
                          dag=dag)

LeoDoldan7 commented 2 years ago

Hello @LeandroDoldan and what was the scheduler_health_check_threshold on your two different configs (resp 20 sec / 5 sec) ?

@laserpedro In both cases it was the default: 30 seconds IIRC.

@LeandroDoldan and @alete89, would you mind providing a better description of your config file ? Are you guys using the official Airflow Helm Chart ?

The following DAG is an easy way to reproduce the error; would you mind running it just to check whether the bug also disappears in this setting?

We will!

morhook commented 2 years ago

On our cluster we suspect it is related to running out of disk space at the node level of Kubernetes. https://kubernetes.io/docs/concepts/scheduling-eviction/node-pressure-eviction/

Since the KubernetesExecutor creates raw Pods (instead of Deployments or Jobs), these evictions result in DAGs/tasks being marked as failed on the Airflow side, and Kubernetes does not retry them.

Sidenote: we are using Airflow 1.10.7, but the problem seems to be the same on 2.x versions of Airflow.

luzbarbosa commented 2 years ago

@LeandroDoldan Still facing the same issue here. Did you guys manage to run the test DAG? Would you mind providing a more detailed description of your config file?

eduardchai commented 2 years ago

We started having this issue after we upgraded to v2.2.3. We did not experience this issue when we were at v2.0.2.

Here is the sample dag that we used:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)

with DAG(
    dag_id="stress-test-kubepodoperator",
    schedule_interval=None,
    catchup=False,
    start_date=datetime(2021, 1, 1),
) as dag:
    for i in range(1000):
        KubernetesPodOperator(
            name="airflow-test-pod",
            namespace="airflow-official",
            image="ubuntu:latest",
            cmds=["bash", "-cx"],
            arguments=["r=$(( ( RANDOM % 120 )  + 1 )); sleep ${r}s"],
            labels={"foo": "bar"},
            task_id="task_" + str(i),
            is_delete_operator_pod=True,
            startup_timeout_seconds=300,
            service_account_name="airflow-worker",
            get_logs=True,
            resources={"request_memory": "128Mi", "request_cpu": "100m"},
            queue="kubernetes",
        )

Error message:

[2022-01-17, 18:41:48 +08] {local_task_job.py:212} WARNING - State of this instance has been externally set to scheduled. Terminating instance.
[2022-01-17, 18:41:48 +08] {process_utils.py:124} INFO - Sending Signals.SIGTERM to group 16. PIDs of all processes in the group: [16]
[2022-01-17, 18:41:48 +08] {process_utils.py:75} INFO - Sending the signal Signals.SIGTERM to group 16
[2022-01-17, 18:41:48 +08] {taskinstance.py:1408} ERROR - Received SIGTERM. Terminating subprocesses.
[2022-01-17, 18:41:48 +08] {taskinstance.py:1700} ERROR - Task failed with exception

Successful tasks were also intermittently flagged as failed:

(screenshot attached)
[2022-01-17, 18:41:20 +08] {kubernetes_pod.py:372} INFO - creating pod with labels {'dag_id': 'stress-test-kubepodoperator', 'task_id': 'task_44', 'execution_date': '2022-01-17T103621.2231870000-7740efddd', 'try_number': '1'} and launcher <airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher object at 0x7f569d311f90>
[2022-01-17, 18:41:20 +08] {pod_launcher.py:216} INFO - Event: airflow-test-pod.c309a1eaf221470a882edf2cb57f9529 had an event of type Pending
[2022-01-17, 18:41:20 +08] {pod_launcher.py:133} WARNING - Pod not yet started: airflow-test-pod.c309a1eaf221470a882edf2cb57f9529
[2022-01-17, 18:41:21 +08] {pod_launcher.py:216} INFO - Event: airflow-test-pod.c309a1eaf221470a882edf2cb57f9529 had an event of type Pending
[2022-01-17, 18:41:21 +08] {pod_launcher.py:133} WARNING - Pod not yet started: airflow-test-pod.c309a1eaf221470a882edf2cb57f9529
[2022-01-17, 18:41:22 +08] {pod_launcher.py:216} INFO - Event: airflow-test-pod.c309a1eaf221470a882edf2cb57f9529 had an event of type Pending
[2022-01-17, 18:41:22 +08] {pod_launcher.py:133} WARNING - Pod not yet started: airflow-test-pod.c309a1eaf221470a882edf2cb57f9529
[2022-01-17, 18:41:23 +08] {pod_launcher.py:216} INFO - Event: airflow-test-pod.c309a1eaf221470a882edf2cb57f9529 had an event of type Running
[2022-01-17, 18:41:23 +08] {pod_launcher.py:159} INFO - + r=62
[2022-01-17, 18:41:23 +08] {pod_launcher.py:159} INFO - + sleep 62s
[2022-01-17, 18:42:25 +08] {pod_launcher.py:216} INFO - Event: airflow-test-pod.c309a1eaf221470a882edf2cb57f9529 had an event of type Succeeded
[2022-01-17, 18:42:25 +08] {pod_launcher.py:333} INFO - Event with job id airflow-test-pod.c309a1eaf221470a882edf2cb57f9529 Succeeded
[2022-01-17, 18:42:25 +08] {pod_launcher.py:216} INFO - Event: airflow-test-pod.c309a1eaf221470a882edf2cb57f9529 had an event of type Succeeded
[2022-01-17, 18:42:25 +08] {pod_launcher.py:333} INFO - Event with job id airflow-test-pod.c309a1eaf221470a882edf2cb57f9529 Succeeded
[2022-01-17, 18:42:25 +08] {taskinstance.py:1277} INFO - Marking task as SUCCESS. dag_id=stress-test-kubepodoperator, task_id=task_44, execution_date=20220117T103621, start_date=20220117T104119, end_date=20220117T104225
[2022-01-17, 18:42:25 +08] {local_task_job.py:154} INFO - Task exited with return code 0
[2022-01-17, 18:42:25 +08] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check

Environment information:

ephraimbuddy commented 2 years ago

@eduardchai, your case seems different. It seems that the tasks are taking a long time to start. Try setting AIRFLOW__KUBERNETES__WORKER_PODS_QUEUED_CHECK_INTERVAL=86400 and see whether that resolves it.
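
(As a quick sanity check, a sketch rather than part of the suggestion above, the effective value can be printed on the scheduler after applying the override:)

from airflow.configuration import conf

# Expect 86400 once the environment override above has been picked up.
print(conf.getint("kubernetes", "worker_pods_queued_check_interval"))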

eduardchai commented 2 years ago

@ephraimbuddy it does reduce the number of errors by a lot! Thank you! There are some errors where the jobs are stuck in queued state, but I think it should be solvable by finding the right interval configuration for our use cases. Just out of curiosity, how did you know that the problem was with the interval from the logs? And btw, even though it is under AIRFLOW__KUBERNETES, this configuration somehow works with Celery Executor with KEDA too.

ephraimbuddy commented 2 years ago

@ephraimbuddy it does reduce the number of errors by a lot! Thank you! There are some errors where the jobs are stuck in queued state, but I think it should be solvable by finding the right interval configuration for our use cases. Just out of curiosity, how did you know that the problem was with the interval from the logs? And btw, even though it is under AIRFLOW__KUBERNETES, this configuration somehow works with Celery Executor with KEDA too.

If you are not on Kubernetes then I'm not sure how it worked for you (I think it shouldn't work). What formed my opinion was that your pods were taking time to start and queued tasks were being moved back to scheduled. So I don't know how it worked in your case. Maybe @jedcunningham can explain better. Maybe you should also increase AIRFLOW__KUBERNETES__WORKER_PODS_CREATION_BATCH_SIZE in Kubernetes 🙃

bparhy commented 2 years ago

I am seeing a similar issue in our Airflow-on-Kubernetes environment.

Airflow version: 2.1.3
Kubernetes version: v1.20.5

What happens: this happens intermittently. Task pods are not being deleted on the Kubernetes side; even though the tasks completed successfully in the UI, the pods show an Error status.

We have 1000s of tasks and this has happened only for a couple of tasks so far.

Task log:

[2022-01-26 00:51:50,823] {local_task_job.py:209} WARNING - State of this instance has been externally set to success. Terminating instance.
[2022-01-26 00:51:50,825] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 55
[2022-01-26 00:51:50,827] {taskinstance.py:1235} ERROR - Received SIGTERM. Terminating subprocesses.
[2022-01-26 00:51:52,468] {process_utils.py:66} INFO - Process psutil.Process(pid=55, status='terminated', exitcode=1, started='00:51:45') (55) terminated with exit code 1

Task pod log:

[2022-01-26 00:51:37,973] {dagbag.py:496} INFO - Filling up the DagBag from /usr/local/airflow/dags/name/name1/name2/name3_dag.py
/usr/local/lib/python3.6/site-packages/sqlalchemy/ext/declarative/clsregistry.py:129 SAWarning: This declarative base already contains a class with the same class name and module name as bi_plugin.DagRun, and will be replaced in the string-lookup table.
Running <TaskInstance: dagname.task_name 2022-01-25T00:50:00+00:00 [queued]> on host dagnametaskname.457ebbfa477643b289bceef4b0fbc1ab
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.6/site-packages/airflow/__main__.py", line 40, in main
    args.func(args)
  File "/usr/local/lib/python3.6/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/cli.py", line 91, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 238, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/usr/local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 64, in _run_task_by_selected_method
    _run_task_by_local_task_job(args, ti)
  File "/usr/local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 121, in _run_task_by_local_task_job
    run_job.run()
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs/base_job.py", line 245, in run
    self._execute()
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs/local_task_job.py", line 100, in _execute
    self.task_runner.start()
  File "/usr/local/lib/python3.6/site-packages/airflow/task/task_runner/standard_task_runner.py", line 41, in start
    self.process = self._start_by_fork()
  File "/usr/local/lib/python3.6/site-packages/airflow/task/task_runner/standard_task_runner.py", line 92, in _start_by_fork
    logging.shutdown()
  File "/usr/local/lib/python3.6/logging/__init__.py", line 1946, in shutdown
    h.close()
  File "/usr/local/lib/python3.6/logging/__init__.py", line 1048, in close
    stream.close()
  File "/usr/local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1237, in signal_handler
    raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal
[2022-01-26 00:51:51,249] {connection.py:499} INFO - closed
[2022-01-26 00:51:51,249] {connection.py:502} INFO - No async queries seem to be running, deleting session
Running <TaskInstance: dag_name.task_name 2022-01-25T00:50:00+00:00 [queued]> on host dagnametaskname.457ebbfa477643b289bceef4b0fbc1ab
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.6/site-packages/airflow/__main__.py", line 40, in main
    args.func(args)
  File "/usr/local/lib/python3.6/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/cli.py", line 91, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 238, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/usr/local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 64, in _run_task_by_selected_method
    _run_task_by_local_task_job(args, ti)
  File "/usr/local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 121, in _run_task_by_local_task_job
    run_job.run()
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs/base_job.py", line 245, in run
    self._execute()
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs/local_task_job.py", line 145, in _execute
    self.on_kill()
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs/local_task_job.py", line 171, in on_kill
    self.task_runner.on_finish()
  File "/usr/local/lib/python3.6/site-packages/airflow/task/task_runner/base_task_runner.py", line 178, in on_finish
    self._error_file.close()
  File "/usr/local/lib/python3.6/tempfile.py", line 511, in close
    self._closer.close()
  File "/usr/local/lib/python3.6/tempfile.py", line 448, in close
    unlink(self.name)
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/tmpfabi4n96'

I have not set any configuration params; all have default values. I see a CPU spike but am unable to relate it to this.

Thanks in advance.

morhook commented 2 years ago

@bparhy have you checked kubelet logs to detect node pressure evictions or other related evictions?

bparhy commented 2 years ago

@morhook I checked with my K8s team and they don't find anything unusual. I tried increasing the metadata DB size (Aurora) and that also did not help. Any suggestions in this direction, please? We are currently running Airflow 2.1.3 on k8s.

Please let me know.