apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Tasks taking too long after 2.7.0 Airflow update #33688

Closed potiuk closed 1 year ago

potiuk commented 1 year ago

Discussed in https://github.com/apache/airflow/discussions/33664

Originally posted by **jaetma** August 23, 2023: Hi community! We have been using Airflow for quite a long time, and right after updating from version 2.6.3 to 2.7.0, task running times increased dramatically. Tasks that used to take 15 seconds to complete are now taking 10 minutes! This is problematic because more tasks are being queued than are being finished. We've detected this issue in 3 projects running Airflow, across 2 instances on Kubernetes and 1 with Docker. Illustrating image: ![image](https://github.com/apache/airflow/assets/123120332/1459f478-f2ca-42ed-b956-5fd52af52a8a)

potiuk commented 1 year ago

From the discussion:

Hi @jaetma, could you help answer the following questions so we can look into this further?

  1. do you have some monitoring in place? Can you check what is the resource consumption for each of the Airflow components/pods?
  2. Have you added any new DAGs that could be eating up a lot of resources and hence tasks are contending for resources?
  3. What do the task logs say? Are they stuck at some specific step always?
  4. How and where is Airflow deployed?
  5. Which executor? Celery/Kubernetes/?

Answers:

  1. Currently, we only have this DAG running, with a maximum of 4 DAG runs, and those are taking all the machine resources. Tasks are taking over 2 minutes to start, and when running, they are also slow:
  2. No, recently we have not added any DAGs with high resource consumption.
  3. Logs are taking a long time to load in the Airflow UI. Tasks are always stuck in the queued state before starting.
  4. Airflow is deployed on a server with docker-compose.
  5. We use Celery as the executor.

image

image

Today we downgraded to 2.6.3 again, and the issue seems to be resolved!

image

potiuk commented 1 year ago

I converted that discussion to an issue, as I think it is something we should at least try to diagnose before 2.7.1 - and maybe @jaetma you could help with it and (assuming we find a root cause) test it with the release candidate of 2.7.1 that should go out soon.

@jaetma - would it be possible to get some logs from the time when it was slow - namely logs from workers and tasks, ideally for the same task from before and after the change to 2.7.0 - showing what's going on, so we can compare what was happening? Seeing the same logs from 2.7.0 and 2.6.3 could help us form a hypothesis about what went wrong.

Ideally, maybe two gists showing logs from similar runs of the same task?

pankajkoti commented 1 year ago

Also @jaetma, is it that the tasks remain queued for a long time before they begin execution? Can you check whether the scheduler logs say anything about this as well?

jaetma commented 1 year ago

@potiuk

Thanks for your response, I can test in our dev environment!

After the downgrade the task run time is normal: image

The following lines are Airflow task run logs from 2.6.3 and 2.7.0:

AIRFLOW 2.6.3 TASK RUN LOGS

TASK: fetch_header

9b9bc0eb214e
*** Found local files:
***   * /opt/airflow/logs/dag_id=mydagname/run_id=mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195/task_id=fetch_header/attempt=2.log
[2023-08-23, 19:28:45 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: mydagname.fetch_header mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195 [queued]>
[2023-08-23, 19:28:45 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: mydagname.fetch_header mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195 [queued]>
[2023-08-23, 19:28:45 UTC] {taskinstance.py:1308} INFO - Starting attempt 2 of 2
[2023-08-23, 19:28:45 UTC] {taskinstance.py:1327} INFO - Executing <Task(PythonOperator): fetch_header> on 2023-08-22 01:36:30+00:00
[2023-08-23, 19:28:45 UTC] {standard_task_runner.py:57} INFO - Started process 4198 to run task
[2023-08-23, 19:28:45 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'mydagname', 'fetch_header', 'mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195', '--job-id', '1182100', '--raw', '--subdir', 'DAGS_FOLDER/example/mydagname.py', '--cfg-path', '/tmp/tmpqdgbw1tb']
[2023-08-23, 19:28:45 UTC] {standard_task_runner.py:85} INFO - Job 1182100: Subtask fetch_header
[2023-08-23, 19:28:45 UTC] {task_command.py:410} INFO - Running <TaskInstance: mydagname.fetch_header mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195 [running]> on host 9b9bc0eb214e
[2023-08-23, 19:28:46 UTC] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='example@mail.com' AIRFLOW_CTX_DAG_OWNER='admin' AIRFLOW_CTX_DAG_ID='mydagname' AIRFLOW_CTX_TASK_ID='fetch_header' AIRFLOW_CTX_EXECUTION_DATE='2023-08-22T01:36:30+00:00' AIRFLOW_CTX_TRY_NUMBER='2' AIRFLOW_CTX_DAG_RUN_ID='mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195'
[2023-08-23, 19:28:46 UTC] {logging_mixin.py:150} INFO - { censored }
[2023-08-23, 19:28:46 UTC] {python.py:183} INFO - Done. Returned value was: { censored }
[2023-08-23, 19:28:46 UTC] {taskinstance.py:1345} INFO - Marking task as SUCCESS. dag_id=mydagname, task_id=fetch_header, execution_date=20230822T013630, start_date=20230823T192845, end_date=20230823T192846
[2023-08-23, 19:28:46 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 0
[2023-08-23, 19:28:46 UTC] {taskinstance.py:2653} INFO - 1 downstream tasks scheduled from follow-on schedule check

TASK: add_fixed_values_header

9b9bc0eb214e
*** Found local files:
***   * /opt/airflow/logs/dag_id=mydagname/run_id=mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195/task_id=add_fixed_values_header/attempt=2.log
[2023-08-23, 19:28:51 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: mydagname.add_fixed_values_header mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195 [queued]>
[2023-08-23, 19:28:51 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: mydagname.add_fixed_values_header mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195 [queued]>
[2023-08-23, 19:28:51 UTC] {taskinstance.py:1308} INFO - Starting attempt 2 of 2
[2023-08-23, 19:28:51 UTC] {taskinstance.py:1327} INFO - Executing <Task(PythonOperator): add_fixed_values_header> on 2023-08-22 01:36:30+00:00
[2023-08-23, 19:28:51 UTC] {standard_task_runner.py:57} INFO - Started process 4240 to run task
[2023-08-23, 19:28:51 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'mydagname', 'add_fixed_values_header', 'mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195', '--job-id', '1182114', '--raw', '--subdir', 'DAGS_FOLDER/example/mydagname.py', '--cfg-path', '/tmp/tmpqkq5pev4']
[2023-08-23, 19:28:51 UTC] {standard_task_runner.py:85} INFO - Job 1182114: Subtask add_fixed_values_header
[2023-08-23, 19:28:52 UTC] {task_command.py:410} INFO - Running <TaskInstance: mydagname.add_fixed_values_header mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195 [running]> on host 9b9bc0eb214e
[2023-08-23, 19:28:52 UTC] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='example@mail.com' AIRFLOW_CTX_DAG_OWNER='admin' AIRFLOW_CTX_DAG_ID='mydagname' AIRFLOW_CTX_TASK_ID='add_fixed_values_header' AIRFLOW_CTX_EXECUTION_DATE='2023-08-22T01:36:30+00:00' AIRFLOW_CTX_TRY_NUMBER='2' AIRFLOW_CTX_DAG_RUN_ID='mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195'
[2023-08-23, 19:28:52 UTC] {python.py:183} INFO - Done. Returned value was: { censored }
[2023-08-23, 19:28:52 UTC] {taskinstance.py:1345} INFO - Marking task as SUCCESS. dag_id=mydagname, task_id=add_fixed_values_header, execution_date=20230822T013630, start_date=20230823T192851, end_date=20230823T192852
[2023-08-23, 19:28:53 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 0
[2023-08-23, 19:28:53 UTC] {taskinstance.py:2653} INFO - 1 downstream tasks scheduled from follow-on schedule check

TASK: schema_validation_header

9b9bc0eb214e
*** Found local files:
***   * /opt/airflow/logs/dag_id=mydagname/run_id=mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195/task_id=schema_validation_header/attempt=1.log
[2023-08-23, 19:29:01 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: mydagname.schema_validation_header mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195 [queued]>
[2023-08-23, 19:29:01 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: mydagname.schema_validation_header mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195 [queued]>
[2023-08-23, 19:29:01 UTC] {taskinstance.py:1308} INFO - Starting attempt 1 of 1
[2023-08-23, 19:29:01 UTC] {taskinstance.py:1327} INFO - Executing <Task(PythonOperator): schema_validation_header> on 2023-08-22 01:36:30+00:00
[2023-08-23, 19:29:01 UTC] {standard_task_runner.py:57} INFO - Started process 4293 to run task
[2023-08-23, 19:29:01 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'mydagname', 'schema_validation_header', 'mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195', '--job-id', '1182141', '--raw', '--subdir', 'DAGS_FOLDER/example/mydagname.py', '--cfg-path', '/tmp/tmpx34mwefz']
[2023-08-23, 19:29:01 UTC] {standard_task_runner.py:85} INFO - Job 1182141: Subtask schema_validation_header
[2023-08-23, 19:29:02 UTC] {task_command.py:410} INFO - Running <TaskInstance: mydagname.schema_validation_header mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195 [running]> on host 9b9bc0eb214e
[2023-08-23, 19:29:03 UTC] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='example@mail.com' AIRFLOW_CTX_DAG_OWNER='admin' AIRFLOW_CTX_DAG_ID='mydagname' AIRFLOW_CTX_TASK_ID='schema_validation_header' AIRFLOW_CTX_EXECUTION_DATE='2023-08-22T01:36:30+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195'
[2023-08-23, 19:29:03 UTC] {python.py:183} INFO - Done. Returned value was: { censored }
[2023-08-23, 19:29:03 UTC] {taskinstance.py:1345} INFO - Marking task as SUCCESS. dag_id=mydagname, task_id=schema_validation_header, execution_date=20230822T013630, start_date=20230823T192901, end_date=20230823T192903
[2023-08-23, 19:29:03 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 0
[2023-08-23, 19:29:03 UTC] {taskinstance.py:2653} INFO - 1 downstream tasks scheduled from follow-on schedule check

TASK: generate_sql_header

9b9bc0eb214e
*** Found local files:
***   * /opt/airflow/logs/dag_id=mydagname/run_id=mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195/task_id=generate_sql_header/attempt=1.log
[2023-08-23, 19:29:08 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: mydagname.generate_sql_header mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195 [queued]>
[2023-08-23, 19:29:08 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: mydagname.generate_sql_header mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195 [queued]>
[2023-08-23, 19:29:08 UTC] {taskinstance.py:1308} INFO - Starting attempt 1 of 1
[2023-08-23, 19:29:08 UTC] {taskinstance.py:1327} INFO - Executing <Task(PythonOperator): generate_sql_header> on 2023-08-22 01:36:30+00:00
[2023-08-23, 19:29:08 UTC] {standard_task_runner.py:57} INFO - Started process 4345 to run task
[2023-08-23, 19:29:08 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'mydagname', 'generate_sql_header', 'mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195', '--job-id', '1182162', '--raw', '--subdir', 'DAGS_FOLDER/example/mydagname.py', '--cfg-path', '/tmp/tmp03kw7z3y']
[2023-08-23, 19:29:08 UTC] {standard_task_runner.py:85} INFO - Job 1182162: Subtask generate_sql_header
[2023-08-23, 19:29:08 UTC] {task_command.py:410} INFO - Running <TaskInstance: mydagname.generate_sql_header mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195 [running]> on host 9b9bc0eb214e
[2023-08-23, 19:29:09 UTC] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='example@mail.com' AIRFLOW_CTX_DAG_OWNER='admin' AIRFLOW_CTX_DAG_ID='mydagname' AIRFLOW_CTX_TASK_ID='generate_sql_header' AIRFLOW_CTX_EXECUTION_DATE='2023-08-22T01:36:30+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195'
[2023-08-23, 19:29:09 UTC] {python.py:183} INFO - Done. censored
[2023-08-23, 19:29:09 UTC] {taskinstance.py:1345} INFO - Marking task as SUCCESS. dag_id=mydagname, task_id=generate_sql_header, execution_date=20230822T013630, start_date=20230823T192908, end_date=20230823T192909
[2023-08-23, 19:29:10 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 0
[2023-08-23, 19:29:10 UTC] {taskinstance.py:2653} INFO - 1 downstream tasks scheduled from follow-on schedule check

TASK: execute_sql

9b9bc0eb214e
*** Found local files:
***   * /opt/airflow/logs/dag_id=mydagname/run_id=mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195/task_id=execute_sql/attempt=1.log
[2023-08-23, 19:29:14 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: mydagname.execute_sql mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195 [queued]>
[2023-08-23, 19:29:14 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: mydagname.execute_sql mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195 [queued]>
[2023-08-23, 19:29:14 UTC] {taskinstance.py:1308} INFO - Starting attempt 1 of 1
[2023-08-23, 19:29:14 UTC] {taskinstance.py:1327} INFO - Executing <Task(PostgresOperator): execute_sql> on 2023-08-22 01:36:30+00:00
[2023-08-23, 19:29:14 UTC] {standard_task_runner.py:57} INFO - Started process 4380 to run task
[2023-08-23, 19:29:14 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'mydagname', 'execute_sql', 'mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195', '--job-id', '1182181', '--raw', '--subdir', 'DAGS_FOLDER/example/mydagname.py', '--cfg-path', '/tmp/tmp_7byh4dx']
[2023-08-23, 19:29:14 UTC] {standard_task_runner.py:85} INFO - Job 1182181: Subtask execute_sql
[2023-08-23, 19:29:15 UTC] {task_command.py:410} INFO - Running <TaskInstance: mydagname.execute_sql mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195 [running]> on host 9b9bc0eb214e
[2023-08-23, 19:29:15 UTC] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='example@mail.com' AIRFLOW_CTX_DAG_OWNER='admin' AIRFLOW_CTX_DAG_ID='mydagname' AIRFLOW_CTX_TASK_ID='execute_sql' AIRFLOW_CTX_EXECUTION_DATE='2023-08-22T01:36:30+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195'
[2023-08-23, 19:29:15 UTC] {sql.py:265} INFO - censored
[2023-08-23, 19:29:16 UTC] {base.py:73} INFO - Using connection ID 'conn' for task execution.
[2023-08-23, 19:29:16 UTC] {base.py:73} INFO - Using connection ID 'conn' for task execution.
[2023-08-23, 19:29:16 UTC] {sql.py:374} INFO - censored
[2023-08-23, 19:29:16 UTC] {sql.py:383} INFO - Rows affected: 1
[2023-08-23, 19:29:16 UTC] {taskinstance.py:1345} INFO - Marking task as SUCCESS. dag_id=mydagname, task_id=execute_sql, execution_date=20230822T013630, start_date=20230823T192914, end_date=20230823T192916
[2023-08-23, 19:29:16 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 0
[2023-08-23, 19:29:16 UTC] {taskinstance.py:2653} INFO - 0 downstream tasks scheduled from follow-on schedule check

AIRFLOW 2.7.0 TASK RUN LOGS

TASK: fetch_header

90c66b7612c1
*** Found local files:
***   * /opt/airflow/logs/dag_id=mydagname/run_id=mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083/task_id=fetch_header/attempt=1.log
[2023-08-23, 18:23:04 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: mydagname.fetch_header mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083 [queued]>
[2023-08-23, 18:23:04 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: mydagname.fetch_header mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083 [queued]>
[2023-08-23, 18:23:04 UTC] {taskinstance.py:1361} INFO - Starting attempt 1 of 1
[2023-08-23, 18:23:04 UTC] {taskinstance.py:1382} INFO - Executing <Task(PythonOperator): fetch_header> on 2023-08-22 01:36:29+00:00
[2023-08-23, 18:23:04 UTC] {standard_task_runner.py:57} INFO - Started process 32217 to run task
[2023-08-23, 18:23:04 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'mydagname', 'fetch_header', 'mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083', '--job-id', '1180119', '--raw', '--subdir', 'DAGS_FOLDER/example/mydagname.py', '--cfg-path', '/tmp/tmp6qr0sdyy']
[2023-08-23, 18:23:04 UTC] {standard_task_runner.py:85} INFO - Job 1180119: Subtask fetch_header
[2023-08-23, 18:23:35 UTC] {task_command.py:415} INFO - Running <TaskInstance: mydagname.fetch_header mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083 [running]> on host 90c66b7612c1
[2023-08-23, 18:24:06 UTC] {taskinstance.py:1660} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='example@mail.com' AIRFLOW_CTX_DAG_OWNER='admin' AIRFLOW_CTX_DAG_ID='mydagname' AIRFLOW_CTX_TASK_ID='fetch_header' AIRFLOW_CTX_EXECUTION_DATE='2023-08-22T01:36:29+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083'
[2023-08-23, 18:24:06 UTC] {logging_mixin.py:151} INFO - {censored }
[2023-08-23, 18:24:06 UTC] {python.py:194} INFO - Done. Returned value was: { censored }
[2023-08-23, 18:24:06 UTC] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=mydagname, task_id=fetch_header, execution_date=20230822T013629, start_date=20230823T182304, end_date=20230823T182406
[2023-08-23, 18:24:06 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 0
[2023-08-23, 18:24:06 UTC] {taskinstance.py:2784} INFO - 1 downstream tasks scheduled from follow-on schedule check

TASK: add_fixed_values_header

90c66b7612c1
*** Found local files:
***   * /opt/airflow/logs/dag_id=mydagname/run_id=mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083/task_id=add_fixed_values_header/attempt=1.log
[2023-08-23, 18:25:22 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: mydagname.add_fixed_values_header mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083 [queued]>
[2023-08-23, 18:25:22 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: mydagname.add_fixed_values_header mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083 [queued]>
[2023-08-23, 18:25:22 UTC] {taskinstance.py:1361} INFO - Starting attempt 1 of 1
[2023-08-23, 18:25:22 UTC] {taskinstance.py:1382} INFO - Executing <Task(PythonOperator): add_fixed_values_header> on 2023-08-22 01:36:29+00:00
[2023-08-23, 18:25:22 UTC] {standard_task_runner.py:57} INFO - Started process 32288 to run task
[2023-08-23, 18:25:22 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'mydagname', 'add_fixed_values_header', 'mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083', '--job-id', '1180123', '--raw', '--subdir', 'DAGS_FOLDER/example/mydagname.py', '--cfg-path', '/tmp/tmpxx8qvkq_']
[2023-08-23, 18:25:22 UTC] {standard_task_runner.py:85} INFO - Job 1180123: Subtask add_fixed_values_header
[2023-08-23, 18:25:48 UTC] {task_command.py:415} INFO - Running <TaskInstance: mydagname.add_fixed_values_header mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083 [running]> on host 90c66b7612c1
[2023-08-23, 18:26:12 UTC] {taskinstance.py:1660} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='example@mail.com' AIRFLOW_CTX_DAG_OWNER='admin' AIRFLOW_CTX_DAG_ID='mydagname' AIRFLOW_CTX_TASK_ID='add_fixed_values_header' AIRFLOW_CTX_EXECUTION_DATE='2023-08-22T01:36:29+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083'
[2023-08-23, 18:26:12 UTC] {python.py:194} INFO - Done. Returned value was: { censored }
[2023-08-23, 18:26:12 UTC] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=mydagname, task_id=add_fixed_values_header, execution_date=20230822T013629, start_date=20230823T182522, end_date=20230823T182612
[2023-08-23, 18:26:12 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 0
[2023-08-23, 18:26:12 UTC] {taskinstance.py:2784} INFO - 1 downstream tasks scheduled from follow-on schedule check

TASK: schema_validation_header

90c66b7612c1
*** Found local files:
***   * /opt/airflow/logs/dag_id=mydagname/run_id=mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083/task_id=schema_validation_header/attempt=1.log
[2023-08-23, 18:27:28 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: mydagname.schema_validation_header mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083 [queued]>
[2023-08-23, 18:27:28 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: mydagname.schema_validation_header mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083 [queued]>
[2023-08-23, 18:27:28 UTC] {taskinstance.py:1361} INFO - Starting attempt 1 of 1
[2023-08-23, 18:27:28 UTC] {taskinstance.py:1382} INFO - Executing <Task(PythonOperator): schema_validation_header> on 2023-08-22 01:36:29+00:00
[2023-08-23, 18:27:28 UTC] {standard_task_runner.py:57} INFO - Started process 32360 to run task
[2023-08-23, 18:27:28 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'mydagname', 'schema_validation_header', 'mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083', '--job-id', '1180127', '--raw', '--subdir', 'DAGS_FOLDER/example/mydagname.py', '--cfg-path', '/tmp/tmp7fjqwb36']
[2023-08-23, 18:27:28 UTC] {standard_task_runner.py:85} INFO - Job 1180127: Subtask schema_validation_header
[2023-08-23, 18:27:54 UTC] {task_command.py:415} INFO - Running <TaskInstance: mydagname.schema_validation_header mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083 [running]> on host 90c66b7612c1
[2023-08-23, 18:28:20 UTC] {taskinstance.py:1660} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='example@mail.com' AIRFLOW_CTX_DAG_OWNER='admin' AIRFLOW_CTX_DAG_ID='mydagname' AIRFLOW_CTX_TASK_ID='schema_validation_header' AIRFLOW_CTX_EXECUTION_DATE='2023-08-22T01:36:29+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083'
[2023-08-23, 18:28:20 UTC] {python.py:194} INFO - Done. Returned value was: { censored }
[2023-08-23, 18:28:20 UTC] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=mydagname, task_id=schema_validation_header, execution_date=20230822T013629, start_date=20230823T182728, end_date=20230823T182820
[2023-08-23, 18:28:20 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 0
[2023-08-23, 18:28:20 UTC] {taskinstance.py:2784} INFO - 1 downstream tasks scheduled from follow-on schedule check

TASK: generate_sql_header

*** Found local files:
***   * /opt/airflow/logs/dag_id=mydagname/run_id=mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083/task_id=generate_sql_header/attempt=1.log
[2023-08-23, 18:29:32 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: mydagname.generate_sql_header mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083 [queued]>
[2023-08-23, 18:29:32 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: mydagname.generate_sql_header mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083 [queued]>
[2023-08-23, 18:29:32 UTC] {taskinstance.py:1361} INFO - Starting attempt 1 of 1
[2023-08-23, 18:29:32 UTC] {taskinstance.py:1382} INFO - Executing <Task(PythonOperator): generate_sql_header> on 2023-08-22 01:36:29+00:00
[2023-08-23, 18:29:32 UTC] {standard_task_runner.py:57} INFO - Started process 32423 to run task
[2023-08-23, 18:29:32 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'mydagname', 'generate_sql_header', 'mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083', '--job-id', '1180131', '--raw', '--subdir', 'DAGS_FOLDER/example/mydagname.py', '--cfg-path', '/tmp/tmpq9g7hxl_']
[2023-08-23, 18:29:32 UTC] {standard_task_runner.py:85} INFO - Job 1180131: Subtask generate_sql_header
[2023-08-23, 18:30:00 UTC] {task_command.py:415} INFO - Running <TaskInstance: mydagname.generate_sql_header mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083 [running]> on host 90c66b7612c1
[2023-08-23, 18:30:41 UTC] {taskinstance.py:1660} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='example@mail.com' AIRFLOW_CTX_DAG_OWNER='admin' AIRFLOW_CTX_DAG_ID='mydagname' AIRFLOW_CTX_TASK_ID='generate_sql_header' AIRFLOW_CTX_EXECUTION_DATE='2023-08-22T01:36:29+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083'
[2023-08-23, 18:30:41 UTC] {python.py:194} INFO - Done. Returned value was: censored
[2023-08-23, 18:30:41 UTC] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=mydagname, task_id=generate_sql_header, execution_date=20230822T013629, start_date=20230823T182932, end_date=20230823T183041
[2023-08-23, 18:30:42 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 0
[2023-08-23, 18:30:42 UTC] {taskinstance.py:2784} INFO - 0 downstream tasks scheduled from follow-on schedule check

TASK: execute_sql

90c66b7612c1
*** Found local files:
***   * /opt/airflow/logs/dag_id=mydagname/run_id=mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083/task_id=execute_sql/attempt=1.log
[2023-08-23, 18:32:47 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: mydagname.execute_sql mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083 [queued]>
[2023-08-23, 18:32:47 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: mydagname.execute_sql mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083 [queued]>
[2023-08-23, 18:32:47 UTC] {taskinstance.py:1361} INFO - Starting attempt 1 of 1
[2023-08-23, 18:32:47 UTC] {taskinstance.py:1382} INFO - Executing <Task(PostgresOperator): execute_sql> on 2023-08-22 01:36:29+00:00
[2023-08-23, 18:32:47 UTC] {standard_task_runner.py:57} INFO - Started process 32534 to run task
[2023-08-23, 18:32:47 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'mydagname', 'execute_sql', 'mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083', '--job-id', '1180141', '--raw', '--subdir', 'DAGS_FOLDER/example/mydagname.py', '--cfg-path', '/tmp/tmpfmvj5dnz']
[2023-08-23, 18:32:47 UTC] {standard_task_runner.py:85} INFO - Job 1180141: Subtask execute_sql
[2023-08-23, 18:33:22 UTC] {task_command.py:415} INFO - Running <TaskInstance: mydagname.execute_sql mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083 [running]> on host 90c66b7612c1
[2023-08-23, 18:33:43 UTC] {taskinstance.py:1660} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='example@mail.com' AIRFLOW_CTX_DAG_OWNER='admin' AIRFLOW_CTX_DAG_ID='mydagname' AIRFLOW_CTX_TASK_ID='execute_sql' AIRFLOW_CTX_EXECUTION_DATE='2023-08-22T01:36:29+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083'
[2023-08-23, 18:33:43 UTC] {sql.py:274} INFO - Executing: censored
[2023-08-23, 18:33:43 UTC] {base.py:73} INFO - Using connection ID 'conn' for task execution.
[2023-08-23, 18:33:43 UTC] {base.py:73} INFO - Using connection ID 'conn' for task execution.
[2023-08-23, 18:33:43 UTC] {sql.py:418} INFO - Running statement: censored
[2023-08-23, 18:33:43 UTC] {sql.py:427} INFO - Rows affected: 1
[2023-08-23, 18:33:43 UTC] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=mydagname, task_id=execute_sql, execution_date=20230822T013629, start_date=20230823T183247, end_date=20230823T183343
[2023-08-23, 18:33:43 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 0
[2023-08-23, 18:33:44 UTC] {taskinstance.py:2784} INFO - 0 downstream tasks scheduled from follow-on schedule check

jaetma commented 1 year ago

Hi @pankajkoti!

Sadly, I cannot obtain the scheduler logs from the time it was slow :/ However, you're correct: the tasks remained queued for a long time before they began execution. Every task was stuck in the queued state for ~2 minutes before starting its execution. Once running, they took several more minutes to finish. You can see that in the logs I shared.

This resulted in a DAG run time of 10 minutes!

potiuk commented 1 year ago

OK. That gives us some clue. It seems that there are huge delays (20-30 seconds) between some steps that were almost immediate in 2.6.3:

2.7.0

[2023-08-23, 18:23:04 UTC] {standard_task_runner.py:85} INFO - Job 1180119: Subtask fetch_header
+ 31 s
[2023-08-23, 18:23:35 UTC] {task_command.py:415} INFO - Running <TaskInstance: mydagname.fetch_header mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083 [running]> on host 90c66b7612c1
+ 31 s
[2023-08-23, 18:24:06 UTC] {taskinstance.py:1660} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='example@mail.com' AIRFLOW_CTX_DAG_OWNER='admin' AIRFLOW_CTX_DAG_ID='mydagname' AIRFLOW_CTX_TASK_ID='fetch_header' AIRFLOW_CTX_EXECUTION_DATE='2023-08-22T01:36:29+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083'
[2023-08-23, 18:24:06 UTC] {logging_mixin.py:151} INFO - {censored }

Compare it with 2.6.3

[2023-08-23, 19:28:45 UTC] {standard_task_runner.py:85} INFO - Job 1182100: Subtask fetch_header
[2023-08-23, 19:28:45 UTC] {task_command.py:410} INFO - Running <TaskInstance: mydagname.fetch_header mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195 [running]> on host 9b9bc0eb214e
[2023-08-23, 19:28:46 UTC] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='example@mail.com' AIRFLOW_CTX_DAG_OWNER='admin' AIRFLOW_CTX_DAG_ID='mydagname' AIRFLOW_CTX_TASK_ID='fetch_header' AIRFLOW_CTX_EXECUTION_DATE='2023-08-22T01:36:30+00:00' AIRFLOW_CTX_TRY_NUMBER='2' AIRFLOW_CTX_DAG_RUN_ID='mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195'
[2023-08-23, 19:28:46 UTC] {logging_mixin.py:150} INFO - { censored }

So at least we can come up with some hypotheses. I will take a closer look, but maybe others might have some ideas.
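
For reference, the gaps annotated above are just the differences between consecutive log timestamps; a minimal sketch, with the values hard-coded from the 2.7.0 excerpt:

from datetime import datetime

fmt = "%Y-%m-%d, %H:%M:%S %Z"
subtask = datetime.strptime("2023-08-23, 18:23:04 UTC", fmt)   # "Subtask fetch_header"
running = datetime.strptime("2023-08-23, 18:23:35 UTC", fmt)   # "Running <TaskInstance: ...>"
env_vars = datetime.strptime("2023-08-23, 18:24:06 UTC", fmt)  # "Exporting env vars"
print((running - subtask).total_seconds())   # 31.0
print((env_vars - running).total_seconds())  # 31.0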

pankajkoti commented 1 year ago

I think we might need some more insight into the specific DAG (what it does) or its dependencies. In general, for our deployments, where our suite triggers a few DAGs using TriggerDagOperator with numerous tasks within them, including deferrable operators (execution duration ~40-45 mins), I have not observed any such slowdown after upgrading to 2.7.0.

Screenshot 2023-08-25 at 2 46 31 PM

sunank200 commented 1 year ago

@jaetma could you share the DAG you are using? That way I can try to replicate it locally. As @pankajkoti mentioned, I cannot see any slowness in our deployments either.

potiuk commented 1 year ago

Yes. I do not suspect it's a "widespread" problem - IMHO this is something environmental on your side that triggers it: some dependency that introduces a lot of overhead, the specific database connectivity you have, or some resource limiting in your environment, @jaetma.

A few questions: can you share a very detailed description of your environment, including:

  1. the versions of the OS you run it on
  2. whether the machines run in some cloud or on bare metal, and whether you use any virtualisation techniques
  3. how your docker-compose is configured
  4. whether you have any special resource limits applied (memory, sockets, I/O, CPU)
  5. which database you are using and whether it has some limits
  6. whether you use PgBouncer, if you use Postgres

My question is also: is the Airflow version the ONLY thing that changes when you migrate? Do you also migrate other things together with Airflow (docker/docker-compose/a different DB instance/a different account with different resource limits, etc.)? I want to rule out that Airflow is the culprit - maybe, together with the Airflow migration, you also migrated other things and they are the guilty ones.

potiuk commented 1 year ago

I took a quick look:

The logs below are coming from the same forked process.

Between this:

[2023-08-23, 18:23:04 UTC] {standard_task_runner.py:85} INFO - Job 1180119: Subtask fetch_header

and this:

[2023-08-23, 18:23:35 UTC] {task_command.py:415} INFO - Running <TaskInstance: mydagname.fetch_header mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083 [running]> on host 90c66b7612c1

the following things happen:

1) setproctitle -> setting the title of the process
2) setting a few environment variables: _AIRFLOW_PARSING_CONTEXT_DAG_ID, _AIRFLOW_PARSING_CONTEXT_TASK_ID
3) parsing the command parameters ("airflow tasks run --raw ...")
4) loading the config file prepared before the process is forked
5) the on_starting() listener is fired
6) dag/tasks are parsed from the DAG files or, if you are using pickling (likely not), from the pickled representation

From those actions, I could likely exclude the first four (unless your filesystem is broken), and by the method of deduction, if we exclude the impossible, what remains must be the reason. So it's either 5) or 6).

My best guess @jaetma is that either:

Hypothesis 1

Your on_starting listener is hanging on something -> not very likely that you already have some listener, but since you were on 2.6, it's possible.

Hypothesis 2

Parsing your DAGs inside the Celery worker is hanging on something. The most likely reason is that you have some top-level code that (for example) makes a networking call that hangs.
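
One quick way to check this hypothesis is to time DAG parsing from inside the worker container itself; a minimal sketch, assuming the default /opt/airflow/dags folder (adjust the path if yours differs):

import time

from airflow.models import DagBag

start = time.monotonic()
# Collect and parse all DAG files in the folder (roughly what the forked task process has to do for its DAG file)
bag = DagBag(dag_folder="/opt/airflow/dags", include_examples=False)
elapsed = time.monotonic() - start

print(f"Parsed {len(bag.dags)} DAG(s) in {elapsed:.1f}s")
for stat in bag.dagbag_stats:
    # Per-file parse duration; anything in the tens of seconds points at slow top-level code
    print(stat.file, stat.duration)

If a single file dominates the parse time, whatever runs at its top level is the thing to look at.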

My educated guess, and the ~30-second delays suggest it, is that your DNS is misconfigured/broken, or your networking prevents it from responding quickly.

Hanging on a DNS call is a quite plausible hypothesis. From what I remember, 30 seconds is often the default DNS resolution timeout. So my best guess is that somewhere during your migration the networking in your Docker Compose environment got broken and your DNS is not working properly, making whatever you do at the top level of your DAG (which, BTW, you should not do) slow to run.
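
If you want to verify the DNS theory, a minimal probe run inside the worker container could look like this (the host names below are only examples of what the worker might resolve, not taken from your setup):

import socket
import time

for host in ("postgres", "redis", "smtp.gmail.com"):
    start = time.monotonic()
    try:
        socket.getaddrinfo(host, None)
        result = "resolved"
    except socket.gaierror as exc:
        result = f"failed ({exc})"
    # Healthy DNS answers in milliseconds; ~30 s here would match the gaps in the task logs
    print(f"{host}: {result} in {time.monotonic() - start:.1f}s")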

Obligatory Haiku:

image

Hypothesis 2a

Another variant of Hypothesis 2: if you are using Airflow Variables at the top of your DAG code, or any other database access (which, BTW, you should not do), this leads to a database connection. Opening a new DB connection might be problematic from your server's point of view if many connections are already open. You would see this on your database server as a high number of open connections. Postgres does not cope well with the number of connections Airflow opens, so if you use Postgres and do not have PgBouncer between Airflow and Postgres, this might be the reason. I would love it if you could check this, because I have a suspicion that we could be opening many more connections in Airflow 2.7. If my guess is right, you should see a much larger number of connections to your DB when you run 2.7 - if you could check that hypothesis, that would be great.
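
For illustration, this is the kind of pattern Hypothesis 2a refers to; a minimal sketch (the Variable name and the callable are hypothetical, not taken from the DAG in this thread):

from airflow.models import Variable

# ANTI-PATTERN: top-level Variable access runs on every DAG-file parse
# (scheduler, DAG processor and worker alike) and opens a metadata-DB connection each time:
# api_base_url = Variable.get("api_base_url")

def my_callable(**kwargs):
    # Preferred: resolve the Variable only when the task actually runs
    api_base_url = Variable.get("api_base_url", default_var=None)
    return api_base_url

On the Postgres side, the number of open connections can be compared between 2.6.3 and 2.7.0 with SELECT count(*) FROM pg_stat_activity;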

mpolatcan commented 1 year ago

I have encountered the same problem in our production environment, which uses KubernetesExecutor. We have 150+ DAGs, and these DAGs are generated by a single generator DAG file. The interesting part is that worker pod memory usage reached 3x the memory of previous versions. Previously we gave 500 MB of memory to a worker pod; now we had to give 1.5 GB, and in general the worker pods don't do anything heavy, so I was shocked when I saw the memory usage :( Also, the Airflow scheduler dies frequently, which I don't really understand. I monitored the database, scheduler pod memory usage, etc. Everything looks normal, but the scheduler dies frequently and DAGBag import time increased. So I downgraded to the previous version 2.6.3 and the problem is solved for now 👍 ☺️

P.S. We are using 3 PgBouncer replicas in front of our AWS RDS instance.

jaetma commented 1 year ago

Hello @sunank200

This is the DAG; it's just a simple query insertion:

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import timedelta, datetime
import logging
import os

logger = logging.getLogger("airflow.task")

default_run_days = 1

interval = {
    'dev': None,
    'qa': None,
    'prod': None
}

emails = {
    'dev': ['{ censored }'],
    'qa': ['{ censored }'],
    'prod': ['{ censored }']
}

# default arguments
default_args = {
    'owner': 'admin',
    'depends_on_past': False,
    'start_date': days_ago(0),
    'email': emails[os.environ['ENVIRONMENT']],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=2)
}

def on_failure_callback(context):
    from sendgrid import SendGridAPIClient
    from sendgrid.helpers.mail import Mail
    import traceback

    ti = context['task_instance']
    print("FAILURE CALLBACK")

    exception = context.get('exception')

    formatted_exception = ''.join(
        traceback.format_exception(etype=type(exception), value=exception, tb=exception.__traceback__)
    ).strip()

    message = Mail(
        from_email=os.environ.get('SENDGRID_MAIL_FROM'),
        to_emails=emails[os.environ['ENVIRONMENT']],
        subject=f'Airflow error: {ti.dag_id}, Env:' + str(os.environ['ENVIRONMENT']),
        html_content=f"""
        <p>ENV: {str(os.environ['ENVIRONMENT'])}</p>
        <p>DAG ID: {ti.dag_id}</p>
        <p>TASK ID: {ti.task_id}</p>
        <p>DATE: {str(datetime.now())}</p>
        <p>EXCEPTION: {formatted_exception}</p>
        """
    )
    sg = SendGridAPIClient(open(os.environ.get('SENDGRID_API_KEY')).read().replace('\n', ''))
    sg.send(message)

# initializing dag
dag = DAG(
    'sale_transaction',
    default_args=default_args,
    catchup=False,
    schedule_interval=interval[os.environ['ENVIRONMENT']],
    max_active_runs=20,
    tags=['sale_transaction'],
)

def fetch_header(ti, **kwargs):
    record = kwargs["dag_run"].conf
    print(record)
    output_json = { 
        "trx_id": 1, 
        "trx_tipo_trx": record['acquirer']['transaction_type_code'],
        "trx_emisor": record['acquirer']['acquirer_code'],
        "trx_comercio": record['merchant']['merchant_code'], 
        "trx_local": record['merchant']['store_code'], 
        "trx_pos": record['merchant']['terminal_code'], 
        "trx_pais": record['merchant']['country'],
        "trx_boleta": record['transaction']['document_number'], 
        "trx_fecha": datetime.utcfromtimestamp(record['transaction']['requested_at']).strftime('%Y-%m-%d %H:%M:%S'), 
        "trx_hora": datetime.utcfromtimestamp(record['transaction']['requested_at']).strftime('%Y-%m-%d %H:%M:%S'), 
        "trx_numero": record['transaction']['transaction_number'], 
        "trx_estado": record['status']['status_code'], 
        "trx_cod_rechazo": record['status']['rejected_code'], 
        "trx_glosa_rech": record['status']['rejected_description'], 
        "trx_estado_portal": record['status']['portal'], 
        "trx_version": record['status']['version'], 
        "trx_obs": record['status']['obs'],
        "trx_tarjeta": record['acquirer']['card_type_code'],
        "trx_monto": record['payment']['amount'],
        "trx_cuotas": record['payment']['installments'],
        "trx_ult4_dig": 0, #record['payment']['card_number'][-4:], #GET LAST 4 characters
        "trx_ts_req": (datetime.utcfromtimestamp(record['transaction']['accounted_at']).strftime('%Y-%m-%d %H:%M:%S')) if record['transaction']['accounted_at'] else None, 
        "trx_ts_rsp": (datetime.utcfromtimestamp(record['transaction']['accounted_at']).strftime('%Y-%m-%d %H:%M:%S')) if record['transaction']['accounted_at'] else None, 
        "trx_numtarjeta": record['payment']['card_number'],
        "trx_codautor": record['payment']['authorization_code'],
        "trx_mpago": record['payment']['m'],
        "trx_pin": record['payment']['pin'],
        "trx_cadena": record['merchant']['channel_code'], 
        "trx_vend_caj": record['merchant']['operator_code'], 
        "trx_fechacont": datetime.utcfromtimestamp(record['transaction']['requested_at']).strftime('%Y-%m-%d 00:00:00'), 
        "trx_oc_merchant_code": record['agoraweb']['purchase_order_merchant_code'], 
        "trx_oc_merchant_description": record['agoraweb']['purchase_order_merchant_description'], 
        "trx_oc_order_code": record['agoraweb']['purchase_order_code'], 
        "trx_oc_order_amount": record['agoraweb']['purchase_order_monto'], 
        "trx_usuario1": record['metadata']['1'],
        "trx_usuario2": record['metadata']['2'],
        "trx_usuario3": record['metadata']['3'],
        "trx_usuario4": record['metadata']['4'],
        "trx_usuario5": record['metadata']['5'],
        "trx_usuario6": record['metadata']['6'],
        "trx_usuario7": record['metadata']['7'],
        "trx_usuario8": record['metadata']['8'],
        "trx_llave": record['metadata']['ref_id'],
        "trx_id_con": record['metadata']['ref_id_con'],
        "trx_journal": record['metadata']['journal'],
    }
    return output_json

def add_fixed_values_header(ti, **kwargs):
    record = ti.xcom_pull(task_ids=['fetch_header'])[0]
    record['trx_ip'] = 0
    record['trx_vuelto'] = 'NULL'
    record['trx_donacion'] = 'NULL'
    record['trx_nro_lote'] = 'NULL'
    record['trx_lote_abono_com'] = 0
    record['trx_lote_abono_ban'] = 0
    return record

def schema_validation_header(ti):
    from schema import Schema, And, Use
    record = ti.xcom_pull(task_ids=['add_fixed_values_header'])[0]
    conf_schema = Schema({
        "trx_id": And(Use(int)),
        "trx_tipo_trx": And(Use(str)),
        "trx_emisor": And(Use(str)),
        "trx_comercio": And(Use(str)),
        "trx_local": And(Use(int)),
        "trx_pos": And(Use(int)),
        "trx_boleta": And(Use(str)),
        "trx_fecha": And(Use(str)),
        "trx_hora": And(Use(str)),
        "trx_numero": And(Use(str)),
        "trx_estado": And(Use(int)),
        "trx_cod_rechazo": And(Use(str)),
        "trx_glosa_rech": And(Use(str)),
        "trx_tarjeta": And(Use(str)),
        "trx_monto": And(Use(int)),
        "trx_cuotas": And(Use(int)),
        "trx_ult4_dig": And(Use(str)),
        "trx_ts_req": And(Use(str)),
        "trx_ts_rsp": And(Use(str)),
        "trx_numtarjeta": And(Use(str)),
        "trx_codautor": And(Use(str)),
        "trx_cadena": And(Use(int)),
        "trx_vend_caj": And(Use(str)),
        "trx_fechacont": And(Use(str)),
        "trx_oc_merchant_code": And(Use(str)),
        "trx_oc_merchant_description": And(Use(str)),
        "trx_oc_order_code": And(Use(str)),
        "trx_oc_order_amount": And(Use(str)),
        #FIXED
        "trx_version": And(Use(str)),
        "trx_journal": And(Use(str)),
        "trx_id_con": And(Use(str)),
        "trx_pais": And(Use(str)),
        "trx_estado_portal": And(Use(int)),
        "trx_llave": And(Use(str)),
        "trx_mpago": And(Use(str)),
        "trx_pin": And(Use(str)),
        "trx_ip": And(Use(str)),
        "trx_vuelto": And(Use(str)),
        "trx_donacion": And(Use(str)),
        "trx_nro_lote": And(Use(str)),
        "trx_usuario1": And(Use(str)),
        "trx_usuario2": And(Use(str)),
        "trx_usuario3": And(Use(str)),
        "trx_usuario4": And(Use(str)),
        "trx_usuario5": And(Use(str)),
        "trx_usuario6": And(Use(str)),
        "trx_usuario7": And(Use(str)),
        "trx_usuario8": And(Use(str)),
        "trx_obs": And(Use(str)),
        "trx_lote_abono_com": And(Use(int)),
        "trx_lote_abono_ban": And(Use(int)),
    })
    conf_schema.validate(record)
    return record

def generate_sql_header(ti):
    record = ti.xcom_pull(task_ids=['schema_validation_header'])[0]
    SQL = ""
    table = "{ censored }"
    SQL+='{ censored }'
    SQL+="""
        INSERT INTO {table} (
            trx_id,
            trx_tipo_trx, trx_emisor, trx_comercio, trx_local, 
            trx_pos, trx_boleta, trx_fecha, trx_hora,
            trx_numero, trx_estado, trx_cod_rechazo, trx_glosa_rech,
            trx_tarjeta, trx_monto, trx_cuotas, trx_ult4_dig,
            trx_ts_req, trx_ts_rsp, trx_numtarjeta, trx_codautor,
            trx_cadena, trx_vend_caj, trx_fechacont, trx_oc_merchant_code,
            trx_oc_merchant_description, trx_oc_order_code, trx_oc_order_amount, trx_version,
            trx_journal, trx_id_con, trx_pais, trx_estado_portal, 
            trx_llave, trx_mpago, trx_pin, trx_ip, 
            trx_vuelto, trx_donacion, trx_nro_lote, trx_usuario1,
            trx_usuario2, trx_usuario3, trx_usuario4, trx_usuario5,
            trx_usuario6, trx_usuario7, trx_usuario8, trx_obs,
            trx_lote_abono_com, trx_lote_abono_ban
        ) VALUES (
            { censored },
            '{trx_tipo_trx}', '{trx_emisor}', {trx_comercio}, {trx_local}, 
            {trx_pos}, {trx_boleta}, '{trx_fecha}', '{trx_hora}',
            {trx_numero}, {trx_estado}, {trx_cod_rechazo}, '{trx_glosa_rech}',
            '{trx_tarjeta}', {trx_monto}, {trx_cuotas}, '{trx_ult4_dig}',
            {trx_ts_req}, {trx_ts_rsp}, '{trx_numtarjeta}', {trx_codautor},
            {trx_cadena}, {trx_vend_caj}, '{trx_fechacont}', {trx_oc_merchant_code},
            {trx_oc_merchant_description}, {trx_oc_order_code}, {trx_oc_order_amount}, {trx_version},
            {trx_journal}, '{trx_id_con}', {trx_pais}, {trx_estado_portal}, 
            '{trx_llave}', {trx_mpago}, {trx_pin}, {trx_ip}, 
            {trx_vuelto}, {trx_donacion}, {trx_nro_lote}, {trx_usuario1},
            {trx_usuario2}, {trx_usuario3}, {trx_usuario4}, {trx_usuario5},
            {trx_usuario6}, {trx_usuario7}, {trx_usuario8}, {trx_obs},
            {trx_lote_abono_com}, {trx_lote_abono_ban}
        );
    """.format(
        table=table, 
        trx_tipo_trx=record["trx_tipo_trx"],
        trx_emisor=record["trx_emisor"],
        trx_comercio=record["trx_comercio"],
        trx_local=record["trx_local"],
        trx_pos=record["trx_pos"],
        trx_boleta=record["trx_boleta"],
        trx_fecha=record['trx_fecha'],
        trx_hora=record['trx_hora'],
        trx_numero=record['trx_numero'],
        trx_estado=record["trx_estado"],
        trx_cod_rechazo=record["trx_cod_rechazo"] if record["trx_cod_rechazo"] else '0',
        trx_glosa_rech=record["trx_glosa_rech"],
        trx_tarjeta=record["trx_tarjeta"],
        trx_monto=record["trx_monto"],
        trx_cuotas=record["trx_cuotas"],
        trx_ult4_dig=record["trx_ult4_dig"],
        trx_ts_req=("'" + record["trx_ts_req"] + "'") if record["trx_ts_req"] else 'NULL',
        trx_ts_rsp=("'" + record["trx_ts_rsp"] + "'") if record["trx_ts_rsp"] else 'NULL',
        trx_numtarjeta=record["trx_numtarjeta"],
        trx_codautor=("'" + record["trx_codautor"] + "'") if record["trx_codautor"] else 'NULL',
        trx_cadena=record["trx_cadena"],
        trx_vend_caj=record["trx_vend_caj"],
        trx_fechacont=record["trx_fechacont"],
        trx_oc_merchant_code=("'" + record["trx_oc_merchant_code"] + "'") if record["trx_oc_merchant_code"] else 'NULL',
        trx_oc_merchant_description=("'" + record["trx_oc_merchant_description"] + "'") if record["trx_oc_merchant_description"] else 'NULL',
        trx_oc_order_code=("'" + record["trx_oc_order_code"] + "'") if record["trx_oc_order_code"] else 'NULL',
        trx_oc_order_amount=record["trx_oc_order_amount"] if record["trx_oc_order_amount"] else 'NULL',
        trx_version=record["trx_version"] if record["trx_version"] else 'NULL',
        trx_journal=("'" + record["trx_journal"] + "'") if record["trx_journal"] else 'NULL',
        trx_id_con=record["trx_id_con"],
        trx_pais=record["trx_pais"],
        trx_estado_portal=record["trx_estado_portal"],
        trx_llave=record["trx_llave"],
        trx_mpago=("'" + record["trx_mpago"] + "'") if record["trx_mpago"] else 'NULL',
        trx_pin=record["trx_pin"],
        trx_ip=record["trx_ip"],
        trx_vuelto=record["trx_vuelto"],
        trx_donacion=record["trx_donacion"],
        trx_nro_lote=record["trx_nro_lote"],
        trx_usuario1=record["trx_usuario1"],
        trx_usuario2=record["trx_usuario2"],
        trx_usuario3=("'" + str(record["trx_usuario3"]) + "'") if record["trx_usuario3"] else 'NULL',
        trx_usuario4=("'" + str(record["trx_usuario4"]).replace("'", '') + "'") if record["trx_usuario4"] else 'NULL',
        trx_usuario5=("'" + record["trx_usuario5"] + "'") if record["trx_usuario5"] else 'NULL',
        trx_usuario6=("'" + str(record["trx_usuario6"]) + "'") if record["trx_usuario6"] else 'NULL',
        trx_usuario7=("'" + str(record["trx_usuario7"]).replace("'", '') + "'") if record["trx_oc_merchant_description"] else 'NULL',
        trx_usuario8=("'" + str(record["trx_usuario8"]) + "'") if record["trx_usuario8"] else 'NULL',
        trx_obs=("'" + str(record["trx_obs"]) + "'") if record["trx_obs"] else 'NULL',
        trx_lote_abono_com=record["trx_lote_abono_com"],
        trx_lote_abono_ban=record["trx_lote_abono_ban"],
    )
    return SQL

fetch_header = PythonOperator(
    task_id='fetch_header',
    provide_context=True,
    python_callable=fetch_header,
    on_failure_callback=on_failure_callback,
    dag=dag
)

add_fixed_values_header = PythonOperator(
    task_id='add_fixed_values_header',
    provide_context=True,
    python_callable=add_fixed_values_header,
    on_failure_callback=on_failure_callback,
    dag=dag
)

schema_validation_header = PythonOperator(
    task_id='schema_validation_header',
    provide_context=True,
    python_callable=schema_validation_header,
    on_failure_callback=on_failure_callback,
    dag=dag
)

generate_sql_header = PythonOperator(
    task_id='generate_sql_header',
    provide_context=True,
    python_callable=generate_sql_header,
    on_failure_callback=on_failure_callback,
    dag=dag
)

execute_sql = PostgresOperator(
    task_id="execute_sql",
    postgres_conn_id='ple_postgres_' + os.environ['ENVIRONMENT'],
    sql="{{ ti.xcom_pull(task_ids=['generate_sql_header'])[0] }}",
    dag=dag
)

fetch_header >> add_fixed_values_header >> schema_validation_header >> generate_sql_header >> execute_sql

jaetma commented 1 year ago

This is my docker-compose file, running on:

Red Hat Enterprise Linux Server, cloud

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL.
#
# WARNING: This configuration is for local development. Do not use it in a production deployment.
#
# This configuration supports basic configuration using environment variables or an .env file
# The following variables are supported:
#
# AIRFLOW_IMAGE_NAME           - Docker image name used to run Airflow.
#                                Default: apache/airflow:2.1.4
# AIRFLOW_UID                  - User ID in Airflow containers
#                                Default: 50000
# AIRFLOW_GID                  - Group ID in Airflow containers
#                                Default: 0
#
# Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode
#
# _AIRFLOW_WWW_USER_USERNAME   - Username for the administrator account (if requested).
#                                Default: airflow
# _AIRFLOW_WWW_USER_PASSWORD   - Password for the administrator account (if requested).
#                                Default: airflow
# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers.
#                                Default: ''
#
# Feel free to modify this file to suit your needs.
---
version: '3'
x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.6.3-python3.8}
  # build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_OVERFLOW: -1
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__SMTP__STARTTLS: 'true'
    AIRFLOW__SMTP__SSL: 'false'
    AIRFLOW__SMTP__SMTP_HOST: smtp.gmail.com
    AIRFLOW__SMTP__SMTP_PORT: 587
    AIRFLOW__SMTP__SMTP_USER: censored
    AIRFLOW__SMTP__SMTP_PASSWORD: censored
    AIRFLOW__SMTP__SMTP_MAIL_FROM: censored
    AIRFLOW__EMAIL__SUBJECT_TEMPLATE: /opt/airflow/dags/email_subject_template.j2
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    AIRFLOW__API__ACCESS_CONTROL_ALLOW_HEADERS: 'origin,content-type,accept,authorization'
    AIRFLOW__API__ACCESS_CONTROL_ALLOW_METHODS: 'POST,GET,OPTIONS,DELETE'
    AIRFLOW__API__ACCESS_CONTROL_ALLOW_ORIGINS: '*'
    _AIRFLOW_WWW_USER_USERNAME: 'censored'
    _AIRFLOW_WWW_USER_PASSWORD: 'censored'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-pyiso8583 plyvel schema jsonschema redis==4.3.4}
    CENSORED_ENVIRONMENT: ${CENSORED_ENVIRONMENT:-dev} # dev - qa - prod
    SENDGRID_MAIL_FROM: censored
    SENDGRID_API_KEY: /opt/airflow/keys/sendgrid-agoraqr.txt
    GOOGLE_APPLICATION_CREDENTIALS: /opt/airflow/keys/censored.json
  volumes:
    - /etc/localtime:/etc/localtime:ro  # This is to ensure docker container date is the same host date
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
    - ./keys:/opt/airflow/keys
    - ./files:/opt/airflow/files
    - /opt/censored/files/comercio/censored:/opt/airflow/censored
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-0}"
  depends_on:
    &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always

  redis:
    image: redis/redis-stack-server:latest
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 7093:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:7093/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    environment:
      <<: *airflow-common-env
      # Required to handle warm shutdown of the celery workers properly
      # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
      DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    # yamllint disable rule:line-length
    command:
      - -c
      - |
        function ver() {
          printf "%04d%04d%04d%04d" $${1//./ }
        }
        airflow_version=$$(gosu airflow airflow version)
        airflow_version_comparable=$$(ver $${airflow_version})
        min_airflow_version=2.1.0
        min_airflow_version_comparable=$$(ver $${min_airflow_version})
        if (( airflow_version_comparable < min_airflow_version_comparable )); then
          echo
          echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
          echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
          echo
          exit 1
        fi
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
          echo "If you are on Linux, you SHOULD follow the instructions below to set "
          echo "AIRFLOW_UID and AIRFLOW_GID environment variables, otherwise files will be owned by root."
          echo "For other operating systems you can get rid of the warning with manually created .env file:"
          echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#setting-the-right-airflow-user"
          echo
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          echo
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          echo
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          echo
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo "   https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#before-you-begin"
          echo
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:${AIRFLOW_GID}" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    # yamllint enable rule:line-length
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-16DF48936F837B402448FBBFF}
      _PIP_ADDITIONAL_REQUIREMENTS: ''
    user: "0:${AIRFLOW_GID:-0}"
    volumes:
      - .:/sources

  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    environment:
      <<: *airflow-common-env
      CONNECTION_CHECK_MAX_COUNT: "0"
    # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
    command:
      - bash
      - -c
      - airflow

  flower:
    <<: *airflow-common
    command: celery flower
    ports:
      - 5556:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5556/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  db-init:
    environment:
      ENVIRONMENT: ${ENVIRONMENT:-dev} # dev - qa - prod
    build:
      context: db-init
      dockerfile: Dockerfile.init
    depends_on:
      - postgres

volumes:
  postgres-db-volume:
jaetma commented 1 year ago

Hi @potiuk!

Our hypothesis is that this issue might be related to changes in Python's pickle library. Airflow 2.6.3 was primarily used with Python 3.7 in our environment. For the downgrade to work correctly, we had to update our image to use Python 3.8 due to the changes in the pickle library between these Python versions. We suspect that these changes might be affecting Airflow's internal handling of task serialization and deserialization, thus causing the performance degradation.

potiuk commented 1 year ago

@jaetma and @mpolatcan

So can you reiterate and explain again what the differences between your environments were? It's clear that you seem to hit the problem when you upgrade Airflow to 2.7.0, but it seems this is not the only thing you are upgrading. @mpolatcan you mentioned that you also upgraded the Python version and you mentioned the pickle library, but this is a bit vague. Can you please help us narrow it down and maybe even do some experimenting to help us?

  1. Can you please extract out the differences between your environments? Ideally in a short summary: A. Airflow 2.6.3, Python version, any other differences <- does not have the problem; B. Airflow 2.7.0, Python version, any other differences <- has the problem.

  2. @mpolatcan especially, you mentioned some suspicion about Python version 3.8 and the pickling library. Can you please elaborate on this and maybe (if that is possible) attempt to do the same upgrades you did for Airflow 2.7 but WITHOUT upgrading Airflow (staying with 2.6.3), and see if you observe the same memory growth and stability issues? Is this possible? If not, can you explain what requirements etc. are preventing it? That would help us enormously in an attempt to track down the root cause of the problem. Also @jaetma - maybe you have a possibility to do a similar exercise.

Also - are you using pickling? https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#donot-pickle - Airflow's "donot-pickle" is set to "True" by default. Similarly, https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#enable-xcom-pickling is set to "False" - XCom pickling is disabled by default. You mentioned the pickling library, but (except for the Python Virtualenv/External Python operators) we are not really using pickling for anything in Airflow. We do use serialization, but without pickling, so I'd be surprised to see the pickling library have an effect. But maybe you have those parameters changed? Can you also (if you get to the point that you have an installation that exhibits the problem) try to change those values (if your current configuration uses pickling)?
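
For anyone who wants to double-check this quickly, a minimal sketch of how those two options can be inspected from inside any of the Airflow containers (assuming the standard `airflow config` CLI; the env-var overrides in the comments are only needed if the values were changed):

# Run inside the scheduler or a worker container:
airflow config get-value core donot_pickle          # default: True
airflow config get-value core enable_xcom_pickling  # default: False
# To force the defaults back via the compose environment, the keys would be:
#   AIRFLOW__CORE__DONOT_PICKLE: 'True'
#   AIRFLOW__CORE__ENABLE_XCOM_PICKLING: 'False'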

I am trying to narrow down the problem - we already know that in some cases the Airflow 2.7.0 upgrade might trigger some problems. But we also know that it is not in all environments, only some. So we need to narrow down and track what is really causing it - is it just Airflow, or the Python version, or some dependencies?

I'd really love it if we can do these remote experiments with you to see if we can track down the root cause of the problem.

And many thanks for the reports so far - it is already helpful to see that there is something we need to track down.

Taragolis commented 1 year ago

@jaetma Just wondering, are you using this ENV VAR in production?

_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-pyiso8583 plyvel schema jsonschema redis==4.3.4}

I'm not sure that this is the problem here, however the base recommendation is not to use _PIP_ADDITIONAL_REQUIREMENTS; more details were added to the docker-compose sample stack in Airflow 2.5.2

# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers.
#                                Use this option ONLY for quick checks. Installing requirements at container
#                                startup is done EVERY TIME the service is started.
#                                A better way is to build a custom image or extend the official image
#                                as described in https://airflow.apache.org/docs/docker-stack/build.html.
#                                Default: ''
potiuk commented 1 year ago

I'm not sure that this is the problem here, however the base recommendation is not to use _PIP_ADDITIONAL_REQUIREMENTS; more details were added to the docker-compose sample stack in Airflow 2.5.2

This is an exceptionally good point @Taragolis. You should NEVER EVER UNDER ANY CIRCUMSTANCES use this option in anything close to production. Please build your custom image instead and see if you continue having the problem.
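
For reference, a minimal sketch of what that could look like with the packages from the compose file above (the image name/tag here are just placeholders, and if any of these packages need system libraries you would extend the image as described in https://airflow.apache.org/docs/docker-stack/build.html instead):

# Build a custom image once instead of re-installing packages at every container start.
cat > Dockerfile <<'EOF'
FROM apache/airflow:2.7.0-python3.8
RUN pip install --no-cache-dir pyiso8583 plyvel schema jsonschema redis==4.3.4
EOF
docker build -t mycompany/airflow:2.7.0-custom .   # "mycompany/..." is a placeholder tag
# Then point docker-compose at the new image and drop _PIP_ADDITIONAL_REQUIREMENTS:
#   AIRFLOW_IMAGE_NAME=mycompany/airflow:2.7.0-custom docker-compose up -d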

mobuchowski commented 1 year ago

Hypothesis 1

Your on_start_listener is hanging on something -> it's not very likely that you already have some listener, but since you were on 2.6, it's possible.

Enabling DEBUG logging might help verify this.
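
For reference, a minimal sketch of turning this on in the docker-compose setup above (assuming the standard `logging_level` option; any component you want DEBUG logs from needs to be recreated afterwards):

# In the shared x-airflow-common environment block of the compose file above, add:
#   AIRFLOW__LOGGING__LOGGING_LEVEL: 'DEBUG'
# then recreate the components you want DEBUG logs from, e.g.:
docker-compose up -d --force-recreate airflow-scheduler airflow-worker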

Taragolis commented 1 year ago

You should NEVER EVER UNDER ANY CIRCUMSTANCES use this option in anything close to production

... test, development. IMHO, this variable should only be used if life or career depends on it 😥

I hope one day we will remove this env variable completely (Airflow 3?). According to my past experience with companies who already use Airflow, the chance that _PIP_ADDITIONAL_REQUIREMENTS is also used everywhere is close to 100% (or maybe I am just unlucky), with additional side effects: broken dependencies, health checks disabled or using a 10-minute timeout, etc.

And I can't see any better solution other than to disable it (neutral) or terminate the container (chaotic evil 👿 ) if this variable is found; on the Internet there are a lot of suggestions to use it 😞 😢

pyiso8583 plyvel schema jsonschema redis==4.3.4

I'd bet that Airflow 2.7 uses redis >=5.2.3,<6
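
A quick sketch of how to check what actually ends up installed inside the running worker container, and whether the extra pin conflicts with anything the image ships (service name taken from the compose file above):

docker-compose exec airflow-worker pip show redis | head -n 2   # installed redis client version
docker-compose exec airflow-worker pip check                    # reports conflicting requirements, if any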

potiuk commented 1 year ago

Hypothesis 3.

So it might be that _PIP_ADDITIONAL_REQUIREMENTS causes pip to try to install conflicting requirements, or to take a very long time to resolve them, and that is causing it to fail and crash the scheduler. It's unlikely (then Airflow would not even start, I guess). If that's the case then.... I will singlehandedly remove that variable immediately and raise an exception in the image if someone uses it.

potiuk commented 1 year ago

(BTW. It's not A LOT @Taragolis - just 4 pages :D)

Taragolis commented 1 year ago

@potiuk and I've also got 131 results from SO: https://www.google.com/search?q=_PIP_ADDITIONAL_REQUIREMENTS+site:stackoverflow.com 🤣

I've just looked at a couple of them; hopefully not all of them suggest using it, sometimes it's even the opposite: "Please do not use it"

This one is funny: https://stackoverflow.com/questions/76424725/airflow-docker-pip-additional-requirements-argument-why-there-is-a-comment-to

In addition, Google does not return results for some locale-specific resources, but I definitely know some resources where this solution is suggested.

mpolatcan commented 1 year ago

Hi again @potiuk, I didn't mention anything about the pickle library - @jaetma mentioned it ☺️ Also, our environment runs a custom-packaged Docker image that basically installs the main dependencies and the apache-airflow[all]==2.7.0 package with a constraints file (a rough sketch of such a constraint-based install is shown after the DAG below). We packaged the Docker image on top of the official python:3.9 image. Our DAG basically spawns a Kubernetes pod that executes Python code which generates an Excel file and sends it via e-mail - a very simple DAG that is used for on-demand report triggering in our company. The interesting part is that the standalone DAG processor gets error -1 for this DAG, while other, more complex DAGs can be rendered very easily. Another problematic part is that memory and CPU usage of the intermediate worker pods of the Kubernetes executor increased dramatically. So I couldn't understand it really well; I upgraded from 2.3.2 to 2.7.0 directly. But after I saw @jaetma downgraded to 2.6.3 and everything works fine, I downgraded too and everything works perfectly. Also, we are using an EKS and Karpenter combination in our production environment, so I think there is no problem related to compute environment resources ☺️


from airflow.models import DAG
from datetime import datetime, timedelta

from utils import AirflowCallbacks
from operators import GetirKubernetesCronJobOperator

default_args = {
    "owner": "Getir",
    "catchup": False,
    "depends_on_past": False,
    "retries": 1,
    "retry_delay": timedelta(seconds=15),
    "start_date": datetime(year=2019, month=7, day=10)
}

dag = DAG(
    dag_id="client_list",
    default_args=default_args,
    schedule_interval=None,
    tags=["client-list", "reports"]
)

GetirKubernetesCronJobOperator(
    name="cl-",
    task_id="clientlist",
    namespace="airflow",
    service_account_name="airflow-sa-role-sa",
    image="164762854291.dkr.ecr.eu-west-1.amazonaws.com/getir-python:3.7-buster",
    image_pull_policy="IfNotPresent",
    repo_name="client-list-cron",
    exec_env="eks",
    eks_nodegroup="ng-airflow-reports",
    eks_cpu_limit="500m",
    eks_memory_limit="16000Mi",
    cmds=[
        "bash", "-c",
        (
            "mkdir -p /usr/src/app && cp -r /repo/* /usr/src/app && "
            "cd /usr/src/app && pip install -r requirements.txt -q && "
            "python -u main.py {{ dag_run.conf.get('parameters') }}"
        )
    ],
    labels={"report": "yes"},
    annotations={
        "report_type": "panel",
        "report_name": "client-list",
        "exec_env": "{{dag_run.conf.get('exec_env', 'fargate')}}",
    },
    startup_timeout_seconds=600,
    is_delete_operator_pod=False,
    reattach_on_restart=False,
    pool="client_list_pool",
    on_failure_callback=AirflowCallbacks.client_list_failed_callback,
    dag=dag
)
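
For context, the constraint-based install mentioned above usually looks roughly like this (a sketch; the exact extras and the Python version in the constraints URL are whatever the custom image uses - 3.9 here):

# Install Airflow pinned against the official constraints file for 2.7.0 / Python 3.9:
pip install "apache-airflow[all]==2.7.0" \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.7.0/constraints-3.9.txt"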
Taragolis commented 1 year ago

@mpolatcan Just wondering, is there any chance that you use some DB performance analyzer on top of your RDS instance? pgBadger, Amazon RDS Performance Insights, etc. Maybe it could show some anomalous activity?

potiuk commented 1 year ago

@jaetma @mpolatcan -> is it possible that you re-run it with debug logging turned on as @mobuchowski suggested? That could help us to quickly narrow down at least the area where we should look for the root cause (also possibly with replacing the _PIP_ADDITIONAL_REQUIREMENTS with a custom image - to verify that it is not triggering the issue).

It would be really helpful to see if we can/need to do something before 2.7.1 - we are just about to release 2.7.1, and at least getting some hypotheses of what could be wrong, and knowing how much of this is "environmental" vs. "wide-spread", could help us determine what we can do about it.

Github-dm-CDE commented 1 year ago

I join in. We have a DAG that took an average of 2.5 minutes for each run before the Airflow 2.7.0 upgrade. After the update to Airflow 2.7.0, the running time of the individual DAG runs now averages 24 minutes. Airflow even started skipping individual DAG runs because the DAG is scheduled to run every 20 minutes.

In the individual tasks of the DAG itself, all that is actually called is a CLI, which then queries the status of a table and writes it to an MSSQL database.

Since the update to Airflow 2.7.0, it looks to us as if we have considerably more traffic on all our Airflow hosts via the loopback interface.

potiuk commented 1 year ago

I join in. We have a DAG that took an average of 2.5 minutes for each run before the Airflow 2.7.0 upgrade. After the update to Airflow 2.7.0, the running time of the individual DAG runs now averages 24 minutes. Airflow even started skipping individual DAG runs because the DAG is scheduled to run every 20 minutes.

In the individual tasks of the DAG itself, all that is actually called is a CLI, which then queries the status of a table and writes it to an MSSQL database.

Since the update to Airflow 2.7.0, it looks to us as if we have considerably more traffic on all our Airflow hosts via the loopback interface.

Any chance to see the logs with debug turned on for tasks as asked above, @Github-dm-CDE? And a bit more info on your deployment?

Github-dm-CDE commented 1 year ago

Any chance to see the logs with debug turned on for tasks as asked above, @Github-dm-CDE? And a bit more info on your deployment?

Regarding the logs, I first have to clarify this with my team and would possibly report back tomorrow.

Regarding the deployment: We updated from Airflow 2.6.3 to Airflow 2.7.0. We did not change the Python version. Currently our .pex files are bundled with Python 3.9. Airflow runs on 3 RHEL8 VM hosts (dev, rls, prod). The host from the screenshot runs on a 10-core CPU with 60GB RAM. Airflow itself is deployed as a .pex file via Ansible and runs in combination with a local Postgres 12 DB. The Airflow DB migration, all the necessary DAG configs, other code, etc. are also done and deployed via Ansible. Ansible loads the files from our Git repos onto the hosts and links them in the {{airflow_home}}/active directory. Other CLIs that we use from Airflow are also deployed as .pex files via Ansible.

raphaelsimeon commented 1 year ago

Hello there! Following this Slack discussion, I'm adding a few logs for a similar issue:

Before the upgrade, this executed within a second.

After upgrade:

[2023-08-29, 16:12:21 CEST] {{standard_task_runner.py:85}} INFO - Job 985071: Subtask upload_cards_file_to_s3
[2023-08-29, 16:12:28 CEST] {{task_command.py:415}} INFO - Running <TaskInstance: get_chargebee_exports.upload_cards_file_to_s3 scheduled__2023-08-29T13:10:00+00:00 [running]> on host ip-10-0-0-157.eu-west-1.compute.internal
[2023-08-29, 16:12:34 CEST] {{taskinstance.py:1660}} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='get_chargebee_exports' AIRFLOW_CTX_TASK_ID='upload_cards_file_to_s3' AIRFLOW_CTX_EXECUTION_DATE='2023-08-29T13:10:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2023-08-29T13:10:00+00:00'

Taking 13 seconds. The rest of the task takes the same amount of time to execute.

A few things I noticed:

(Can edit my post with more details if necessary, will try to rerun it at debug log level)

Taragolis commented 1 year ago

Metadata db runs on RDS for prod and preprod with same instance type,

@raphaelsimeon Any chance that you have enabled Performance Insights in your prod/preprod RDS instances?

Github-dm-CDE commented 1 year ago

Any chance to see the logs with debug turned on for tasks as asked above, @Github-dm-CDE? And a bit more info on your deployment?

Hello @potiuk, here is the code of the DAG including the logs before and after the update to Airflow 2.7.0. Currently the debug logging is still deactivated, but I will have a look at that in a moment. Is there a good way to enable debug logging only for this one specific DAG?

DAG Config

import itertools
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import Iterator

import etl_env as env
from airflow import DAG
from airflow.operators.latest_only import LatestOnlyOperator
from airflow_extensions.macros.teams_plugin import on_failure_message_to_teams
from airflow_extensions.operators.cli_task_builder.cli_task_builder import (
    CLIOperator,
    CliTaskBuilder,
    ConnectionBuilder,
)
from bda_prj_self_service_cas_upload.table_config import config as self_service
from etl_env import PEX_BASE_DIR

default_args = {
    "owner": "CK",
    "depends_on_past": False,
    "start_date": datetime(2021, 6, 1, 7, 0, 0),
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

PEX_PATH = os.path.join(PEX_BASE_DIR, "viyacli.pex")
CASLIB_COUNTRIES = [country.upper() for country in env.COUNTRIES_XX]

def get_self_service_cas_upload_configs() -> Iterator[Path]:
    table_config_path = os.path.dirname(self_service.__file__)
    return Path(table_config_path).glob(
        f"{self_service.SELF_SERVICE_CONFIG_PREFIX}*.yaml"
    )

def get_caslib_teams() -> list:
    caslib_teams = ["CK"]
    for config in get_self_service_cas_upload_configs():
        team_name = self_service.get_team_name_from_config_filename(config.name)
        caslib_teams.append(team_name.upper())
    return list(set(caslib_teams))

def get_all_caslibs() -> Iterator:
    yield "PUBLIC"
    caslib_teams = get_caslib_teams()
    for country, team in itertools.product(CASLIB_COUNTRIES, caslib_teams):
        yield f"{country}_{team}"

def create_cas_table_monitoring_task(dag: DAG, caslib: str) -> CLIOperator:
    return (
        CliTaskBuilder()
        .with_task_args(
            task_id=f"collect_CAS_table_metrics_{caslib.lower()}",
            dag=dag,
        )
        .with_command(f"{PEX_PATH} cas-monitoring")
        .with_connection(
            ConnectionBuilder("sas_viya", env_prefix="VIYA")
            .with_login("USERNAME")
            .with_password("PASSWORD")
            .with_host("HOST_URL")
        )
        .with_connection(
            ConnectionBuilder("mssql_db", env_prefix="MSSQL")
            .with_login("USERNAME")
            .with_password("PASSWORD")
            .with_host("HOST_URL")
            .with_port("HOST_PORT")
            .with_schema("DATABASE")
        )
        .with_cli_arg(f"--ssl_cert_file {SSL_CERT_PATH}")
        .with_cli_arg(f"--viya_caslib {CASLIB}")
        .with_cli_arg(f"--viya_client_secret {CLIENT_SECRET}")
        .with_cli_arg("--monitoring_table sas_cas_table_status")
        .create()
    )

def create_dag():
    return DAG(
        "prj_ck_cas_table_monitoring",
        description=__doc__,
        default_args=default_args,
        schedule="*/20 * * * *",
        max_active_tasks=3,
        max_active_runs=1,
        on_failure_callback=on_failure_message_to_teams,
    )

dag = create_dag()
latest_only = LatestOnlyOperator(task_id="latest_only", dag=dag)
for caslib in get_all_caslibs():
    cas_table_metrics_tasks = create_cas_table_monitoring_task(dag, caslib)
    latest_only >> cas_table_metrics_tasks

and here are the logs for one collect_CAS_table_metrics_* task. Airflow 2.6.3:

*** Found local files:
***   * /var/log/airflow/dags/dag_id=prj_ck_cas_table_monitoring/run_id=scheduled__2023-08-28T09:20:00+00:00/task_id=collect_CAS_table_metrics_public/attempt=1.log
[2023-08-28, 11:40:02 CEST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public scheduled__2023-08-28T09:20:00+00:00 [queued]>
[2023-08-28, 11:40:02 CEST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public scheduled__2023-08-28T09:20:00+00:00 [queued]>
[2023-08-28, 11:40:02 CEST] {taskinstance.py:1308} INFO - Starting attempt 1 of 3
[2023-08-28, 11:40:02 CEST] {taskinstance.py:1327} INFO - Executing <Task(CLIOperator): collect_CAS_table_metrics_public> on 2023-08-28 09:20:00+00:00
[2023-08-28, 11:40:02 CEST] {standard_task_runner.py:57} INFO - Started process 3717615 to run task
[2023-08-28, 11:40:02 CEST] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'prj_ck_cas_table_monitoring', 'collect_CAS_table_metrics_public', 'scheduled__2023-08-28T09:20:00+00:00', '--job-id', '8827796', '--raw', '--subdir', 'DAGS_FOLDER/prj_ck_cas_table_monitoring/prj_ck_cas_table_monitoring.py', '--cfg-path', '/tmp/tmp73qdvuk2']
[2023-08-28, 11:40:02 CEST] {standard_task_runner.py:85} INFO - Job 8827796: Subtask collect_CAS_table_metrics_public
[2023-08-28, 11:40:02 CEST] {task_command.py:410} INFO - Running <TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public scheduled__2023-08-28T09:20:00+00:00 [running]> on host OBSCURED_HOSTNAME
[2023-08-28, 11:40:02 CEST] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='CK' AIRFLOW_CTX_DAG_ID='prj_ck_cas_table_monitoring' AIRFLOW_CTX_TASK_ID='collect_CAS_table_metrics_public' AIRFLOW_CTX_EXECUTION_DATE='2023-08-28T09:20:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2023-08-28T09:20:00+00:00'
[2023-08-28, 11:40:02 CEST] {logging_mixin.py:150} INFO - Execute cli task
[2023-08-28, 11:40:02 CEST] {base.py:73} INFO - Using connection ID 'sas_viya_conn' for task execution.
[2023-08-28, 11:40:02 CEST] {base.py:73} INFO - Using connection ID 'cxa_mssql_db_domain_user_format' for task execution.
[2023-08-28, 11:40:02 CEST] {logging_mixin.py:150} INFO - Environment variables: ['LANG', 'PATH', 'VIYA_USERNAME', 'VIYA_PASSWORD', 'VIYA_HOST_URL', 'MSSQL_USERNAME', 'MSSQL_PASSWORD', 'MSSQL_HOST_URL', 'MSSQL_HOST_PORT', 'MSSQL_DATABASE']
[2023-08-28, 11:40:02 CEST] {logging_mixin.py:150} INFO - Bash command: /srv/team-workspaces/ck/pex/viyacli.pex cas-monitoring --ssl_cert_file OBSCURED_SSL_CERT_PATH --viya_caslib PUBLIC --viya_client_secret OBSCURED_VIYA_CLIENT_SECRET --monitoring_table sas_cas_table_status
[2023-08-28, 11:40:02 CEST] {subprocess.py:63} INFO - Tmp dir root location: /tmp
[2023-08-28, 11:40:02 CEST] {subprocess.py:75} INFO - Running command: ['/usr/bin/bash', '-c', '/srv/team-workspaces/ck/pex/viyacli.pex cas-monitoring --ssl_cert_file OBSCURED_SSL_CERT_PATH --viya_caslib PUBLIC --viya_client_secret OBSCURED_VIYA_CLIENT_SECRET --monitoring_table sas_cas_table_status']
[2023-08-28, 11:40:02 CEST] {subprocess.py:86} INFO - Output:
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO - 2023-08-28 11:40:06.260 | WARNING  | viya.cas_monitoring.cas_monitoring:_transform_data:39 - Item missing key created - skipping for monitoring:
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO - 2023-08-28 11:40:06.260 | WARNING  | viya.cas_monitoring.cas_monitoring:_transform_data:40 - {
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "version": 3,
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "name": "OBSCURED_USER_MAPPINGTEST",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "tableReference": {
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "version": 2,
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "tableUri": "/dataTables/dataSources/cas~fs~cas-shared-default~fs~PUBLIC/tables/OBSCURED_USER_MAPPINGTEST",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "sessionId": "OBSCURED_SESSION_ID",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "sourceTableName": "OBSCURED_USER_MAPPINGTEST.sashdat",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "sourceCaslibName": "PUBLIC"
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     },
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "state": "unloaded",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "repeated": false,
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "sourceLastModified": "2023-06-09T09:40:45.761Z",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "serverName": "cas-shared-default",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "caslibName": "PUBLIC",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "attributes": {
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "owner": "cas",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "size": 8448,
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "encryption": "NONE",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "time": "2023-06-09T10:40:46+01:00",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "group": "sas"
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     },
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "links": [
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "method": "GET",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "rel": "up",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "type": "application/vnd.sas.collection",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "itemType": "application/vnd.sas.cas.table"
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "method": "GET",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "rel": "self",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/OBSCURED_USER_MAPPINGTEST",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/OBSCURED_USER_MAPPINGTEST",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "type": "application/vnd.sas.cas.table"
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "method": "DELETE",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "rel": "delete",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/OBSCURED_USER_MAPPINGTEST",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/OBSCURED_USER_MAPPINGTEST"
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "method": "PUT",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "rel": "updateState",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/OBSCURED_USER_MAPPINGTEST/state",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/OBSCURED_USER_MAPPINGTEST/state",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "responseType": "application/json;text/plain"
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "method": "GET",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "rel": "columns",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/OBSCURED_USER_MAPPINGTEST/columns",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/OBSCURED_USER_MAPPINGTEST/columns",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "type": "application/vnd.sas.collection",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "itemType": "application/vnd.sas.cas.column"
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "method": "GET",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "rel": "dataTable",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "href": "/dataTables/dataSources/cas~fs~cas-shared-default~fs~PUBLIC/tables/OBSCURED_USER_MAPPINGTEST",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "uri": "/dataTables/dataSources/cas~fs~cas-shared-default~fs~PUBLIC/tables/OBSCURED_USER_MAPPINGTEST",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "type": "application/vnd.sas.data.table"
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         }
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     ]
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO - }
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO - 2023-08-28 11:40:06.260 | WARNING  | viya.cas_monitoring.cas_monitoring:_transform_data:39 - Item missing key created - skipping for monitoring:
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO - 2023-08-28 11:40:06.260 | WARNING  | viya.cas_monitoring.cas_monitoring:_transform_data:40 - {
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "version": 3,
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "name": "MAP_DEVICE_TYPE_FIXED",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "tableReference": {
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "version": 2,
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "tableUri": "/dataTables/dataSources/cas~fs~cas-shared-default~fs~PUBLIC/tables/MAP_DEVICE_TYPE_FIXED",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "sessionId": "OBSCURED_SESSION_ID",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "sourceTableName": "MAP_DEVICE_TYPE_FIXED.sashdat",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "sourceCaslibName": "PUBLIC"
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     },
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "state": "unloaded",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "repeated": false,
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "sourceLastModified": "2023-06-09T14:18:52.223Z",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "serverName": "cas-shared-default",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "caslibName": "PUBLIC",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "attributes": {
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "owner": "cas",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "size": 8544,
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "encryption": "NONE",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "time": "2023-06-09T15:18:52+01:00",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "group": "sas"
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     },
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "links": [
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "method": "GET",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "rel": "up",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "type": "application/vnd.sas.collection",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "itemType": "application/vnd.sas.cas.table"
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "method": "GET",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "rel": "self",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/MAP_DEVICE_TYPE_FIXED",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/MAP_DEVICE_TYPE_FIXED",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "type": "application/vnd.sas.cas.table"
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "method": "DELETE",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "rel": "delete",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/MAP_DEVICE_TYPE_FIXED",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/MAP_DEVICE_TYPE_FIXED"
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "method": "PUT",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "rel": "updateState",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/MAP_DEVICE_TYPE_FIXED/state",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/MAP_DEVICE_TYPE_FIXED/state",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "responseType": "application/json;text/plain"
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "method": "GET",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "rel": "columns",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/MAP_DEVICE_TYPE_FIXED/columns",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/MAP_DEVICE_TYPE_FIXED/columns",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "type": "application/vnd.sas.collection",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "itemType": "application/vnd.sas.cas.column"
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "method": "GET",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "rel": "dataTable",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "href": "/dataTables/dataSources/cas~fs~cas-shared-default~fs~PUBLIC/tables/MAP_DEVICE_TYPE_FIXED",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "uri": "/dataTables/dataSources/cas~fs~cas-shared-default~fs~PUBLIC/tables/MAP_DEVICE_TYPE_FIXED",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "type": "application/vnd.sas.data.table"
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         }
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     ]
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO - }
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO - 2023-08-28 11:40:06.260 | WARNING  | viya.cas_monitoring.cas_monitoring:_transform_data:45 - The following tables have been skipped: ['OBSCURED_USER_MAPPINGTEST', 'MAP_DEVICE_TYPE_FIXED']
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO - 2023-08-28 11:40:06.260 | WARNING  | viya.mssql_utils:write_to_mssql:35 - No data was passed to insert into the Database.
[2023-08-28, 11:40:06 CEST] {subprocess.py:97} INFO - Command exited with return code 0
[2023-08-28, 11:40:06 CEST] {taskinstance.py:1345} INFO - Marking task as SUCCESS. dag_id=prj_ck_cas_table_monitoring, task_id=collect_CAS_table_metrics_public, execution_date=20230828T092000, start_date=20230828T094002, end_date=20230828T094006
[2023-08-28, 11:40:06 CEST] {local_task_job_runner.py:225} INFO - Task exited with return code 0
[2023-08-28, 11:40:06 CEST] {taskinstance.py:2653} INFO - 0 downstream tasks scheduled from follow-on schedule check

Airflow 2.7.0:

*** Found local files:
***   * /var/log/airflow/dags/dag_id=prj_ck_cas_table_monitoring/run_id=scheduled__2023-08-28T09:40:00+00:00/task_id=collect_CAS_table_metrics_public/attempt=1.log
[2023-08-28, 12:16:16 CEST] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public scheduled__2023-08-28T09:40:00+00:00 [queued]>
[2023-08-28, 12:16:16 CEST] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public scheduled__2023-08-28T09:40:00+00:00 [queued]>
[2023-08-28, 12:16:16 CEST] {taskinstance.py:1361} INFO - Starting attempt 1 of 3
[2023-08-28, 12:16:16 CEST] {taskinstance.py:1382} INFO - Executing <Task(CLIOperator): collect_CAS_table_metrics_public> on 2023-08-28 09:40:00+00:00
[2023-08-28, 12:16:16 CEST] {standard_task_runner.py:57} INFO - Started process 3763639 to run task
[2023-08-28, 12:16:16 CEST] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'prj_ck_cas_table_monitoring', 'collect_CAS_table_metrics_public', 'scheduled__2023-08-28T09:40:00+00:00', '--job-id', '8827969', '--raw', '--subdir', 'DAGS_FOLDER/prj_ck_cas_table_monitoring/prj_ck_cas_table_monitoring.py', '--cfg-path', '/tmp/tmpvgdhcef6']
[2023-08-28, 12:16:16 CEST] {standard_task_runner.py:85} INFO - Job 8827969: Subtask collect_CAS_table_metrics_public
[2023-08-28, 12:16:24 CEST] {task_command.py:415} INFO - Running <TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public scheduled__2023-08-28T09:40:00+00:00 [running]> on host OBSCURED_HOSTNAME
[2023-08-28, 12:16:32 CEST] {taskinstance.py:1660} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='CK' AIRFLOW_CTX_DAG_ID='prj_ck_cas_table_monitoring' AIRFLOW_CTX_TASK_ID='collect_CAS_table_metrics_public' AIRFLOW_CTX_EXECUTION_DATE='2023-08-28T09:40:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2023-08-28T09:40:00+00:00'
[2023-08-28, 12:16:32 CEST] {logging_mixin.py:151} INFO - Execute cli task
[2023-08-28, 12:16:32 CEST] {base.py:73} INFO - Using connection ID 'sas_viya_conn' for task execution.
[2023-08-28, 12:16:32 CEST] {base.py:73} INFO - Using connection ID 'cxa_mssql_db_domain_user_format' for task execution.
[2023-08-28, 12:16:32 CEST] {logging_mixin.py:151} INFO - Environment variables: ['LANG', 'PATH', 'VIYA_USERNAME', 'VIYA_PASSWORD', 'VIYA_HOST_URL', 'MSSQL_USERNAME', 'MSSQL_PASSWORD', 'MSSQL_HOST_URL', 'MSSQL_HOST_PORT', 'MSSQL_DATABASE']
[2023-08-28, 12:16:32 CEST] {logging_mixin.py:151} INFO - Bash command: /srv/team-workspaces/ck/pex/viyacli.pex cas-monitoring --ssl_cert_file OBSCURED_SSL_CERT_PATH --viya_caslib PUBLIC --viya_client_secret OBSCURED_VIYA_CLIENT_SECRET --monitoring_table sas_cas_table_status
[2023-08-28, 12:16:32 CEST] {subprocess.py:63} INFO - Tmp dir root location: /tmp
[2023-08-28, 12:16:32 CEST] {subprocess.py:75} INFO - Running command: ['/usr/bin/bash', '-c', '/srv/team-workspaces/ck/pex/viyacli.pex cas-monitoring --ssl_cert_file OBSCURED_SSL_CERT_PATH --viya_caslib PUBLIC --viya_client_secret OBSCURED_VIYA_CLIENT_SECRET --monitoring_table sas_cas_table_status']
[2023-08-28, 12:16:32 CEST] {subprocess.py:86} INFO - Output:
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO - 2023-08-28 12:16:35.935 | WARNING  | viya.cas_monitoring.cas_monitoring:_transform_data:39 - Item missing key created - skipping for monitoring:
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO - 2023-08-28 12:16:35.936 | WARNING  | viya.cas_monitoring.cas_monitoring:_transform_data:40 - {
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "version": 3,
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "name": "OBSCURED_USER_MAPPINGTEST",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "tableReference": {
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "version": 2,
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "tableUri": "/dataTables/dataSources/cas~fs~cas-shared-default~fs~PUBLIC/tables/OBSCURED_USER_MAPPINGTEST",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "sessionId": "OBSCURED_SESSION_ID",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "sourceTableName": "OBSCURED_USER_MAPPINGTEST.sashdat",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "sourceCaslibName": "PUBLIC"
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     },
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "state": "unloaded",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "repeated": false,
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "sourceLastModified": "2023-06-09T09:40:45.761Z",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "serverName": "cas-shared-default",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "caslibName": "PUBLIC",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "attributes": {
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "owner": "cas",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "size": 8448,
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "encryption": "NONE",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "time": "2023-06-09T10:40:46+01:00",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "group": "sas"
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     },
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "links": [
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "method": "GET",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "rel": "up",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "type": "application/vnd.sas.collection",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "itemType": "application/vnd.sas.cas.table"
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "method": "GET",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "rel": "self",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/OBSCURED_USER_MAPPINGTEST",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/OBSCURED_USER_MAPPINGTEST",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "type": "application/vnd.sas.cas.table"
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "method": "DELETE",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "rel": "delete",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/OBSCURED_USER_MAPPINGTEST",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/OBSCURED_USER_MAPPINGTEST"
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "method": "PUT",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "rel": "updateState",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/OBSCURED_USER_MAPPINGTEST/state",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/OBSCURED_USER_MAPPINGTEST/state",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "responseType": "application/json;text/plain"
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "method": "GET",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "rel": "columns",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/OBSCURED_USER_MAPPINGTEST/columns",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/OBSCURED_USER_MAPPINGTEST/columns",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "type": "application/vnd.sas.collection",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "itemType": "application/vnd.sas.cas.column"
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "method": "GET",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "rel": "dataTable",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "href": "/dataTables/dataSources/cas~fs~cas-shared-default~fs~PUBLIC/tables/OBSCURED_USER_MAPPINGTEST",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "uri": "/dataTables/dataSources/cas~fs~cas-shared-default~fs~PUBLIC/tables/OBSCURED_USER_MAPPINGTEST",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "type": "application/vnd.sas.data.table"
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         }
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     ]
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO - }
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO - 2023-08-28 12:16:35.936 | WARNING  | viya.cas_monitoring.cas_monitoring:_transform_data:39 - Item missing key created - skipping for monitoring:
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO - 2023-08-28 12:16:35.936 | WARNING  | viya.cas_monitoring.cas_monitoring:_transform_data:40 - {
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "version": 3,
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "name": "MAP_DEVICE_TYPE_FIXED",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "tableReference": {
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "version": 2,
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "tableUri": "/dataTables/dataSources/cas~fs~cas-shared-default~fs~PUBLIC/tables/MAP_DEVICE_TYPE_FIXED",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "sessionId": "OBSCURED_SESSION_ID",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "sourceTableName": "MAP_DEVICE_TYPE_FIXED.sashdat",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "sourceCaslibName": "PUBLIC"
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     },
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "state": "unloaded",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "repeated": false,
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "sourceLastModified": "2023-06-09T14:18:52.223Z",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "serverName": "cas-shared-default",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "caslibName": "PUBLIC",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "attributes": {
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "owner": "cas",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "size": 8544,
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "encryption": "NONE",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "time": "2023-06-09T15:18:52+01:00",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "group": "sas"
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     },
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "links": [
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "method": "GET",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "rel": "up",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "type": "application/vnd.sas.collection",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "itemType": "application/vnd.sas.cas.table"
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "method": "GET",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "rel": "self",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/MAP_DEVICE_TYPE_FIXED",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/MAP_DEVICE_TYPE_FIXED",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "type": "application/vnd.sas.cas.table"
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "method": "DELETE",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "rel": "delete",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/MAP_DEVICE_TYPE_FIXED",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/MAP_DEVICE_TYPE_FIXED"
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "method": "PUT",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "rel": "updateState",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/MAP_DEVICE_TYPE_FIXED/state",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/MAP_DEVICE_TYPE_FIXED/state",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "responseType": "application/json;text/plain"
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "method": "GET",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "rel": "columns",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/MAP_DEVICE_TYPE_FIXED/columns",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/MAP_DEVICE_TYPE_FIXED/columns",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "type": "application/vnd.sas.collection",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "itemType": "application/vnd.sas.cas.column"
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "method": "GET",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "rel": "dataTable",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "href": "/dataTables/dataSources/cas~fs~cas-shared-default~fs~PUBLIC/tables/MAP_DEVICE_TYPE_FIXED",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "uri": "/dataTables/dataSources/cas~fs~cas-shared-default~fs~PUBLIC/tables/MAP_DEVICE_TYPE_FIXED",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "type": "application/vnd.sas.data.table"
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         }
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     ]
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO - }
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO - 2023-08-28 12:16:35.936 | WARNING  | viya.cas_monitoring.cas_monitoring:_transform_data:45 - The following tables have been skipped: ['OBSCURED_USER_MAPPINGTEST', 'MAP_DEVICE_TYPE_FIXED']
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO - 2023-08-28 12:16:35.936 | WARNING  | viya.mssql_utils:write_to_mssql:35 - No data was passed to insert into the Database.
[2023-08-28, 12:16:36 CEST] {subprocess.py:97} INFO - Command exited with return code 0
[2023-08-28, 12:16:36 CEST] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=prj_ck_cas_table_monitoring, task_id=collect_CAS_table_metrics_public, execution_date=20230828T094000, start_date=20230828T101616, end_date=20230828T101636
[2023-08-28, 12:16:36 CEST] {local_task_job_runner.py:228} INFO - Task exited with return code 0
[2023-08-28, 12:16:36 CEST] {taskinstance.py:2784} INFO - 0 downstream tasks scheduled from follow-on schedule check

This is one of the faster-running tasks in the DAG, but as you can clearly see, its running time has increased roughly fivefold, from ~4 seconds to ~20 seconds. With Airflow 2.7.0, most of the time is spent before the actual CLI task starts: the gap between "Started process" and "Execute cli task" is almost instantaneous with Airflow 2.6.3, whereas with Airflow 2.7.0, 16 seconds pass here alone.
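
A minimal sketch for quantifying that gap from a task log file, assuming the [YYYY-MM-DD, HH:MM:SS TZ] timestamp prefix shown in the logs above; the two marker strings are taken from these logs and the log path is a hypothetical placeholder:

import re
from datetime import datetime

# Matches the leading "[YYYY-MM-DD, HH:MM:SS" prefix of an Airflow task log line.
TS_PREFIX = re.compile(r"\[(\d{4}-\d{2}-\d{2}), (\d{2}:\d{2}:\d{2})")

def _timestamp(line):
    match = TS_PREFIX.match(line)
    if not match:
        return None
    return datetime.strptime(f"{match.group(1)} {match.group(2)}", "%Y-%m-%d %H:%M:%S")

def startup_gap(log_path, start_marker="Started process", end_marker="Execute cli task"):
    """Seconds between the first lines containing the two markers, or None if absent."""
    start = end = None
    with open(log_path) as handle:
        for line in handle:
            if start is None and start_marker in line:
                start = _timestamp(line)
            elif start is not None and end_marker in line:
                end = _timestamp(line)
                break
    return (end - start).total_seconds() if start and end else None

print(startup_gap("attempt=1.log"))  # hypothetical log file path

Running this against a 2.6.3 log and a 2.7.0 log for the same task gives a like-for-like number for the pre-execution overhead.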

raphaelsimeon commented 1 year ago

@Taragolis yes, here is the performance activity for the last 5 hours. The peak we see at 3 AM also corresponds to a peak in the number of DAGs running. Anything in particular you want to check? image

ephraimbuddy commented 1 year ago

Is there a good way to enable debug logging only for this one specific DAG?

@Github-dm-CDE, we would also like to see the scheduler logs if possible, so enabling debug logging for only one DAG would not be enough. And no, there is no straightforward way to enable debug logging for just a specific DAG.

Taragolis commented 1 year ago

here is the performance activity for the last 5 hours. The peak at 3 AM we see represents also a peak in activity in terms of DAGs running. Anything particular you want to check ?

According to Average Active Sessions (AAS), the most time-consuming operations are locks (transactionid and tuple in the legend). I'm not sure there is anything serious on the DB backend, especially since all these metrics are averaged over the period, so an actual spike could be missed. You could click on transactionid in the legend; that keeps only the top 10 queries contributing to this wait event in the selected period. I guess it would be SELECT ... FROM dag_run WHERE dag_run.dag_id = '{DAG_ID}' AND dag_run.run_id = '{RUN_ID}' FOR UPDATE;
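
A minimal sketch for inspecting such lock waits directly on the Postgres metadata database rather than through the RDS console (standard pg_stat_activity columns available since Postgres 9.6; the DSN is a placeholder and psycopg2 is assumed to be installed):

import psycopg2

# Placeholder DSN -- point this at the Airflow metadata database.
DSN = "host=localhost dbname=airflow user=airflow password=airflow"

LOCK_WAITS = """
    SELECT pid, state, wait_event_type, wait_event,
           now() - query_start AS running_for, query
    FROM pg_stat_activity
    WHERE wait_event_type = 'Lock'
    ORDER BY running_for DESC;
"""

with psycopg2.connect(DSN) as conn, conn.cursor() as cur:
    cur.execute(LOCK_WAITS)
    for row in cur.fetchall():
        print(row)

Sessions waiting on transactionid or tuple locks would show up here together with the query they are trying to run.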


In addition, I locally ran a simple DAG on Airflow 2.6.3 and Airflow 2.7.0.

[!IMPORTANT]
I run my "performance comparison" on local machine so latency to DB with almost zero, executor was Local and DAG/Task pretty simple and nothing other running at that moment. So result might be far-far-far away of actual problem

from airflow.decorators import task
from airflow import DAG
import pendulum

with DAG(
    dag_id="performance-check",
    start_date=pendulum.datetime(2021, 1, 1, tz='UTC'),
    end_date=None,
    schedule="@daily",
    catchup=True,  # backfills every daily interval since 2021-01-01 (~970 dag runs at the time of the test)
    tags=["performance"],
    max_active_runs=64,
) as dag:
    @task
    def sample_task(test_data=None, ti=None):
        # Minimal task body: just print the task-instance coordinates.
        print(f"{ti.dag_id}-{ti.run_id}-{ti.task_id}[{ti.map_index}]: {test_data}")

    sample_task()

Also, I turned on most of the logging on Postgres, cleaned the Postgres log file before turning the DAG on, and after all 970 dag runs completed I ran pgBadger on the Postgres log: pgbadger-report-airflow-2-results.zip

The main difference was in obtaining information about the previous dag run.

Airflow 2.7.0: this query is ranked first, with a total cumulative execution time of 24s820ms for 9,700 queries image

Airflow 2.6.3: it is ranked second (let's ignore COMMIT), with a total cumulative execution time of 6s525ms for 9,700 queries image

This behaviour is fixed and the fix should be part of 2.7.1. In some circumstances it could be a cause of performance degradation, e.g. if the DB backend is far away from Airflow (high latency) and quite a few previous DAG runs exist. It could also be a reason why RAM/CPU usage increased. However, in a deployment far from production-level usage, the impact was only an additional ~100ms-1s.

raphaelsimeon commented 1 year ago

@Taragolis indeed, it does not look like the DB is the problem here. Here is the top SQL in terms of latency over the last 24 hrs, along with performance metrics (there is one latency peak at 8:45 UTC, but not during the other "problematic hours") image

image

Github-dm-CDE commented 1 year ago

Hi @potiuk & @ephraimbuddy, here are the task logs with AF 2.7.0 again (but this time from our RLS stage, which has the same problem), with logging_level=DEBUG.

OBSCURED_CLIENT_SECRET Found local files:
OBSCURED_CLIENT_SECRET   * /var/log/airflow/dags/dag_id=prj_ck_cas_table_monitoring/run_id=manual__2023-08-30T14:22:51.684136+00:00/task_id=collect_CAS_table_metrics_public/attempt=1.log
[2023-08-30, 14:23:49 UTC] {taskinstance.py:1094} DEBUG - previous_execution_date was called
[2023-08-30, 14:23:54 UTC] {__init__.py:51} DEBUG - Loading core task runner: StandardTaskRunner
[2023-08-30, 14:23:58 UTC] {taskinstance.py:1094} DEBUG - previous_execution_date was called
[2023-08-30, 14:24:02 UTC] {base_task_runner.py:68} DEBUG - Planning to run as the  user
[2023-08-30, 14:24:02 UTC] {taskinstance.py:844} DEBUG - Refreshing TaskInstance <TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public manual__2023-08-30T14:22:51.684136+00:00 [queued]> from DB
[2023-08-30, 14:24:02 UTC] {taskinstance.py:1168} DEBUG - <TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public manual__2023-08-30T14:22:51.684136+00:00 [queued]> dependency 'Task Instance State' PASSED: True, Task state queued was valid.
[2023-08-30, 14:24:02 UTC] {taskinstance.py:1168} DEBUG - <TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public manual__2023-08-30T14:22:51.684136+00:00 [queued]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2023-08-30, 14:24:02 UTC] {taskinstance.py:1168} DEBUG - <TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public manual__2023-08-30T14:22:51.684136+00:00 [queued]> dependency 'Task Instance Not Running' PASSED: True, Task is not in running state.
[2023-08-30, 14:24:02 UTC] {taskinstance.py:1168} DEBUG - <TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public manual__2023-08-30T14:22:51.684136+00:00 [queued]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2023-08-30, 14:24:02 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public manual__2023-08-30T14:22:51.684136+00:00 [queued]>
[2023-08-30, 14:24:02 UTC] {taskinstance.py:1168} DEBUG - <TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public manual__2023-08-30T14:22:51.684136+00:00 [queued]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2023-08-30, 14:24:02 UTC] {taskinstance.py:1168} DEBUG - <TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public manual__2023-08-30T14:22:51.684136+00:00 [queued]> dependency 'Task Concurrency' PASSED: True, Task concurrency is not set.
[2023-08-30, 14:24:02 UTC] {taskinstance.py:1168} DEBUG - <TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public manual__2023-08-30T14:22:51.684136+00:00 [queued]> dependency 'Pool Slots Available' PASSED: True, There are enough open slots in default_pool to execute the task
[2023-08-30, 14:24:02 UTC] {taskinstance.py:1168} DEBUG - <TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public manual__2023-08-30T14:22:51.684136+00:00 [queued]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2023-08-30, 14:24:02 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public manual__2023-08-30T14:22:51.684136+00:00 [queued]>
[2023-08-30, 14:24:02 UTC] {taskinstance.py:1361} INFO - Starting attempt 1 of 3
[2023-08-30, 14:24:02 UTC] {taskinstance.py:1382} INFO - Executing <Task(CLIOperator): collect_CAS_table_metrics_public> on 2023-08-30 14:22:51.684136+00:00
[2023-08-30, 14:24:02 UTC] {standard_task_runner.py:57} INFO - Started process 2355015 to run task
[2023-08-30, 14:24:02 UTC] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'prj_ck_cas_table_monitoring', 'collect_CAS_table_metrics_public', 'manual__2023-08-30T14:22:51.684136+00:00', '--job-id', '2428156', '--raw', '--subdir', 'DAGS_FOLDER/prj_ck_cas_table_monitoring/prj_ck_cas_table_monitoring.py', '--cfg-path', '/tmp/tmpo_h27u9p']
[2023-08-30, 14:24:02 UTC] {standard_task_runner.py:85} INFO - Job 2428156: Subtask collect_CAS_table_metrics_public
[2023-08-30, 14:24:02 UTC] {cli_action_loggers.py:67} DEBUG - Calling callbacks: [<function default_action_log at 0x7f880eea13a0>]
[2023-08-30, 14:24:07 UTC] {taskinstance.py:1094} DEBUG - previous_execution_date was called
[2023-08-30, 14:24:08 UTC] {taskinstance.py:844} DEBUG - Refreshing TaskInstance <TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public manual__2023-08-30T14:22:51.684136+00:00 [running]> from DB
[2023-08-30, 14:24:08 UTC] {job.py:216} DEBUG - [heartbeat]
[2023-08-30, 14:24:11 UTC] {task_command.py:415} INFO - Running <TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public manual__2023-08-30T14:22:51.684136+00:00 [running]> on host OBSCURED_HOSTNAME
[2023-08-30, 14:24:11 UTC] {settings.py:353} DEBUG - Disposing DB connection pool (PID 2355015)
[2023-08-30, 14:24:11 UTC] {settings.py:212} DEBUG - Setting up DB connection pool (PID 2355015)
[2023-08-30, 14:24:11 UTC] {settings.py:285} DEBUG - settings.prepare_engine_args(): Using NullPool
[2023-08-30, 14:24:11 UTC] {taskinstance.py:844} DEBUG - Refreshing TaskInstance <TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public manual__2023-08-30T14:22:51.684136+00:00 [running]> from DB
[2023-08-30, 14:24:13 UTC] {taskinstance.py:844} DEBUG - Refreshing TaskInstance <TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public manual__2023-08-30T14:22:51.684136+00:00 [running]> from DB
[2023-08-30, 14:24:13 UTC] {job.py:216} DEBUG - [heartbeat]
[2023-08-30, 14:24:16 UTC] {taskinstance.py:1094} DEBUG - previous_execution_date was called
[2023-08-30, 14:24:18 UTC] {taskinstance.py:844} DEBUG - Refreshing TaskInstance <TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public manual__2023-08-30T14:22:51.684136+00:00 [running]> from DB
[2023-08-30, 14:24:18 UTC] {job.py:216} DEBUG - [heartbeat]
[2023-08-30, 14:24:20 UTC] {taskinstance.py:925} DEBUG - Clearing XCom data
[2023-08-30, 14:24:20 UTC] {retries.py:92} DEBUG - Running RenderedTaskInstanceFields.write with retries. Try 1 of 3
[2023-08-30, 14:24:20 UTC] {retries.py:92} DEBUG - Running RenderedTaskInstanceFields._do_delete_old_records with retries. Try 1 of 3
[2023-08-30, 14:24:20 UTC] {taskinstance.py:1660} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='CK' AIRFLOW_CTX_DAG_ID='prj_ck_cas_table_monitoring' AIRFLOW_CTX_TASK_ID='collect_CAS_table_metrics_public' AIRFLOW_CTX_EXECUTION_DATE='2023-08-30T14:22:51.684136+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-08-30T14:22:51.684136+00:00'
[2023-08-30, 14:24:20 UTC] {__init__.py:117} DEBUG - Preparing lineage inlets and outlets
[2023-08-30, 14:24:20 UTC] {__init__.py:158} DEBUG - inlets: [], outlets: []
[2023-08-30, 14:24:20 UTC] {logging_mixin.py:151} INFO - Execute cli task
[2023-08-30, 14:24:20 UTC] {base.py:73} INFO - Using connection ID 'sas_viya_conn' for task execution.
[2023-08-30, 14:24:20 UTC] {base.py:73} INFO - Using connection ID 'cxa_mssql_db_domain_user_format' for task execution.
[2023-08-30, 14:24:20 UTC] {logging_mixin.py:151} INFO - Environment variables: ['LANG', 'PATH', 'VIYA_USERNAME', 'VIYA_PASSWORD', 'VIYA_HOST_URL', 'MSSQL_USERNAME', 'MSSQL_PASSWORD', 'MSSQL_HOST_URL', 'MSSQL_HOST_PORT', 'MSSQL_DATABASE']
[2023-08-30, 14:24:20 UTC] {logging_mixin.py:151} INFO - Bash command: /srv/team-workspaces/ck/pex/viyacli.pex cas-monitoring --ssl_cert_file OBSCURED_SSL_CERT_PATH --viya_caslib PUBLIC --viya_client_secret OBSCURED_CLIENT_SECRET --monitoring_table sas_cas_table_status
[2023-08-30, 14:24:20 UTC] {bash.py:186} DEBUG - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='CK' AIRFLOW_CTX_DAG_ID='prj_ck_cas_table_monitoring' AIRFLOW_CTX_TASK_ID='collect_CAS_table_metrics_public' AIRFLOW_CTX_EXECUTION_DATE='2023-08-30T14:22:51.684136+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-08-30T14:22:51.684136+00:00'
[2023-08-30, 14:24:20 UTC] {subprocess.py:63} INFO - Tmp dir root location: /tmp
[2023-08-30, 14:24:20 UTC] {subprocess.py:75} INFO - Running command: ['/usr/bin/bash', '-c', '/srv/team-workspaces/ck/pex/viyacli.pex cas-monitoring --ssl_cert_file OBSCURED_SSL_CERT_PATH --viya_caslib PUBLIC --viya_client_secret OBSCURED_CLIENT_SECRET --monitoring_table sas_cas_table_status']
[2023-08-30, 14:24:20 UTC] {subprocess.py:86} INFO - Output:
[2023-08-30, 14:24:23 UTC] {taskinstance.py:844} DEBUG - Refreshing TaskInstance <TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public manual__2023-08-30T14:22:51.684136+00:00 [running]> from DB
[2023-08-30, 14:24:23 UTC] {job.py:216} DEBUG - [heartbeat]
[2023-08-30, 14:24:23 UTC] {subprocess.py:93} INFO - 2023-08-30 16:24:23.723 | WARNING  | viya.mssql_utils:write_to_mssql:35 - No data was passed to insert into the Database.
[2023-08-30, 14:24:23 UTC] {subprocess.py:97} INFO - Command exited with return code 0
[2023-08-30, 14:24:23 UTC] {__init__.py:75} DEBUG - Lineage called with inlets: [], outlets: []
[2023-08-30, 14:24:23 UTC] {taskinstance.py:844} DEBUG - Refreshing TaskInstance <TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public manual__2023-08-30T14:22:51.684136+00:00 [running]> from DB
[2023-08-30, 14:24:23 UTC] {taskinstance.py:1458} DEBUG - Clearing next_method and next_kwargs.
[2023-08-30, 14:24:23 UTC] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=prj_ck_cas_table_monitoring, task_id=collect_CAS_table_metrics_public, execution_date=20230830T142251, start_date=20230830T142402, end_date=20230830T142423
[2023-08-30, 14:24:23 UTC] {taskinstance.py:2436} DEBUG - Task Duration set to 20.913392
[2023-08-30, 14:24:23 UTC] {cli_action_loggers.py:85} DEBUG - Calling callbacks: []
[2023-08-30, 14:24:23 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 0
[2023-08-30, 14:24:23 UTC] {dagrun.py:740} DEBUG - number of tis tasks for <DagRun prj_ck_cas_table_monitoring @ 2023-08-30 14:22:51.684136+00:00: manual__2023-08-30T14:22:51.684136+00:00, state:running, queued_at: 2023-08-30 16:22:51.722581+00:00. externally triggered: True>: 0 task(s)
[2023-08-30, 14:24:23 UTC] {taskinstance.py:2784} INFO - 0 downstream tasks scheduled from follow-on schedule check
nick-msk-ai commented 1 year ago

Like other users above, after upgrading I also noticed a major increase in the time taken to execute a dag_run that previously took < 1 minute (scheduled every minute). It led to a cascade of unfinished dag_runs across this dag, and subsequently all my other dags were affected.

Like @raphaelsimeon, the Performance Insights from my RDS instance showed that the query to get the DAG history was taking a long time to return. I also noticed that each running dag_run triggers a query to get this data (all entries before the current dag_run), and re-requests it at multiple points while the DAG is running. This seemed to be the root of the issue: even when the first query does not return in a timely manner, a subsequent one is automatically triggered by the scheduler, and so on, until there is a log-jam of pending queries (bounded only by max_active_runs) that affects database performance for every other process and DAG.

For context, I run airflow db clean as part of a daily DAG to ensure that only the last 21 days of metadata are kept. So for my DAG on a 1-minute interval, I had ~28,000 rows in the dag_run table. For another of my DAGs that runs every 10 minutes, there are consistently ~2,900 rows.
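
A minimal sketch for checking those per-DAG dag_run row counts through the Airflow ORM, assuming it is run somewhere with access to the metadata database (e.g. the scheduler host):

from sqlalchemy import func

from airflow.models import DagRun
from airflow.utils.session import create_session

# Count dag_run rows per dag_id in the metadata database.
with create_session() as session:
    rows = (
        session.query(DagRun.dag_id, func.count(DagRun.id))
        .group_by(DagRun.dag_id)
        .order_by(func.count(DagRun.id).desc())
        .all()
    )
    for dag_id, count in rows:
        print(f"{dag_id}: {count} dag_run rows")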

Experiment 1: If I turned off the DAG on the 1 minute interval (also setting all dag runs of that dag to a non-running state) then the rest of my DAGs would execute well within my expected intervals.

Experiment 2: If I turned on the DAG with 1 minute interval having deleted all relevant entries from the dag_run table, the DAG executes within the interval.

So I have found a work-around for my particular case by deleting rows from the dag_run metadata table. I would be interested to know what state the dag_run tables are in for the users who have posted above, and whether they are performing any scheduled maintenance on those tables.
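
A minimal sketch of the kind of daily cleanup DAG described above (the DAG id and the 21-day window are illustrative; airflow db clean is the documented CLI command for trimming metadata tables):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="airflow_db_clean",  # illustrative name
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["maintenance"],
) as dag:
    # Purge metadata rows (dag_run, task_instance, ...) older than 21 days.
    BashOperator(
        task_id="db_clean",
        bash_command=(
            "airflow db clean "
            "--clean-before-timestamp '{{ macros.ds_add(ds, -21) }}' --yes"
        ),
    )

The same command can also be run ad hoc from the CLI, with --dry-run first to preview what would be deleted.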

phanikumv commented 1 year ago

@nick-msk-ai you mentioned that the query to get the DAG history was taking a long time to return. Can you please paste the exact query that you mentioned here, so that we can see if there were any recent changes to that query

nick-msk-ai commented 1 year ago
SELECT dag_run.state, dag_run.id, dag_run.dag_id, dag_run.queued_at, dag_run.execution_date,
       dag_run.start_date, dag_run.end_date, dag_run.run_id, dag_run.creating_job_id,
       dag_run.external_trigger, dag_run.run_type, dag_run.conf, dag_run.data_interval_start,
       dag_run.data_interval_end, dag_run.last_scheduling_decision, dag_run.dag_hash,
       dag_run.log_template_id, dag_run.updated_at
FROM dag_run
WHERE dag_run.dag_id = 'patient_lists'
  AND dag_run.execution_date < '2023-08-31T12:40:00+00:00'::timestamptz
  AND dag_run.state = 'success'
ORDER BY dag_run.execution_date DESC

For a 10 minute interval DAG run scheduled at 2023-08-31, 12:50:00 UTC

@phanikumv

I'm not 100% sure, but I think the query itself is being dynamically generated in airflow/api_connexion/endpoints/dag_run_endpoint.py.

raphaelsimeon commented 1 year ago

@nick-msk-ai actually, Performance Insights in RDS doesn't seem to show the very high latency you described. The slowest query looks like it has ~100ms latency on average, which does not sound overwhelming, does it?

Taragolis commented 1 year ago
SELECT dag_run.state, dag_run.id, dag_run.dag_id, dag_run.queued_at, dag_run.execution_date,
       dag_run.start_date, dag_run.end_date, dag_run.run_id, dag_run.creating_job_id,
       dag_run.external_trigger, dag_run.run_type, dag_run.conf, dag_run.data_interval_start,
       dag_run.data_interval_end, dag_run.last_scheduling_decision, dag_run.dag_hash,
       dag_run.log_template_id, dag_run.updated_at
FROM dag_run
WHERE dag_run.dag_id = 'patient_lists'
  AND dag_run.execution_date < '2023-08-31T12:40:00+00:00'::timestamptz
  AND dag_run.state = 'success'
ORDER BY dag_run.execution_date DESC

This query was fixed in https://github.com/apache/airflow/pull/33672

Taragolis commented 1 year ago

@jaetma @raphaelsimeon @nick-msk-ai @Github-dm-CDE could you check whether task execution has become more reliable and faster in Airflow 2.7.1 compared with 2.7.0?

LuisLarisch commented 1 year ago

@Taragolis, sorry for intruding; since no one is answering, I'm giving my feedback. I have been keeping up with this issue since we were facing similar problems after the 2.7.0 upgrade. We upgraded to 2.7.1 this weekend, and the performance issue was solved. For context:

1. 2.6.3: a specific task took 2.5 hrs to finish.
2. 2.7.0: the same task took 6 hrs to finish (it got to the point where a few tasks in the DAG stopped working).
3. 2.7.1: back to 2.5 hrs.

potiuk commented 1 year ago

Can others here confirm it? I am going to close this one as it seems that the fix is working, but having others confirm it would be great.

CM000n commented 1 year ago

I can confirm that. The update from 2.7.0 to 2.7.1 seems to have solved the problem.

jaetma commented 1 year ago

I can confirm that my Kubernetes instance no longer has that problem on 2.7.1, upgraded from 2.6.3.

potiuk commented 1 year ago

Thanks everyone. Great job @Taragolis on getting to the bottom of it and fixing it :). That was pretty far from what I would have expected the reason to be ;)

raphaelsimeon commented 1 year ago

Late to the party here, but same for us, upgrade to 2.7.1 did the trick. Thanks all for having a look :)

BenoCharlo commented 9 months ago

Hello everyone, I just got to this thread because I'm facing the same issue on AF 2.7.2. I'm using this version because it is the latest one available on AWS MWAA. I'm experiencing huge delays on simple tasks that used to take seconds to run. Has anyone else faced the same delay issue on AWS Airflow 2.7.2?

pankajkoti commented 9 months ago

@BenoCharlo Do you have a local Airflow env setup? If yes, could you try running your DAGs locally with Airflow 2.7.2 vs your previous version and see if you are able to reproduce it locally? Would be nice to check first that it is not an infra issue on your cloud deployment.