apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

The scheduler does not appear to be running. Last heartbeat was received X minutes ago. #19192

Closed t4n1o closed 2 years ago

t4n1o commented 2 years ago

Apache Airflow version

2.1.4

Operating System

Linux / Ubuntu Server

Versions of Apache Airflow Providers

apache-airflow-providers-ftp==2.0.1
apache-airflow-providers-http==2.0.1
apache-airflow-providers-imap==2.0.1
apache-airflow-providers-postgres==2.3.0

Deployment

Virtualenv installation

Deployment details

Airflow v2.1.4, Postgres 14, LocalExecutor. Installed with virtualenv / Ansible - https://github.com/idealista/airflow-role

What happened

[screenshot: "The scheduler does not appear to be running" warning in the web UI]

I run a single BashOperator for a long-running task: we initially have to download 8+ hours of data from a rate-limited data source API, then download small daily increments.

We're only using 3% CPU and 2 GB of memory (out of 64 GB), but the scheduler is unable to run any other simple task at the same time.

Currently only the long task is running and everything else is queued, even though we have more resources: [screenshot of queued tasks]

What you expected to happen

I expect my long-running BashOperator task to run, and for Airflow to still have the resources to run other tasks without getting blocked like this.

How to reproduce

I run a command with BashOperator (I use it because I have Python, C, and Rust programs being scheduled by Airflow): bash_command='umask 002 && cd /opt/my_code/ && /opt/my_code/venv/bin/python -m path.to.my.python.namespace'

Configuration:

airflow_executor: LocalExecutor
airflow_database_conn: 'postgresql+psycopg2://airflow:airflow_pass@localhost:5432/airflow'
airflow_database_engine_encoding: utf-8
airflow_database_engine_collation_for_ids:
airflow_database_pool_enabled: True
airflow_database_pool_size: 3
airflow_database_max_overflow: 10
airflow_database_pool_recycle: 2000
airflow_database_pool_pre_ping: True
airflow_database_schema:
airflow_database_connect_args:
airflow_parallelism: 10
airflow_dag_concurrency: 7
airflow_dags_are_paused_at_creation: True
airflow_max_active_runs_per_dag: 16
airflow_load_examples: False
airflow_load_default_connections: False
airflow_plugins_folder: "{{ airflow_app_home }}/plugins"

# [operators]
airflow_operator_default_owner: airflow
airflow_operator_default_cpus: 1
airflow_operator_default_ram: 512
airflow_operator_default_disk: 512
airflow_operator_default_gpus: 0
airflow_default_queue: default
airflow_allow_illegal_arguments: False

Anything else

This occurs every time consistently, also on 2.1.2

The other tasks have this state: [screenshot of the queued task instances]

When the long-running task finishes, the other tasks resume normally. But I expect to be able to do some parallel execution with LocalExecutor.

I haven't tried using PgBouncer.


potiuk commented 2 years ago

It looks like you are actually using SequentialExecutor. That would perfectly explain the behaviour. Are you sure you are using LocalExecutor and that the scheduler is running? Can you run airflow info to verify it and paste the results here?

t4n1o commented 2 years ago

LocalExecutor: [screenshot]

In this screenshot the scheduler is running 4 of the same process / task, because max_active_runs was not set (I subsequently set it to 1, because that's the behaviour I want).

As stated above, the issue is that Airflow will not run other DAGs and the scheduler is not responding. (Strangely, the scheduler is apparently also quite happy to run one task from one DAG in four parallel processes.)

I suspect some value in the configuration or not enough database connections.

t4n1o commented 2 years ago

Apache Airflow
version                | 2.1.4                                                            
executor               | LocalExecutor                                                    
task_logging_handler   | airflow.utils.log.file_task_handler.FileTaskHandler              
sql_alchemy_conn       | postgresql+psycopg2://airflow:airflow_pass@localhost:5432/airflow
dags_folder            | /opt/data_workflows/dags                     
plugins_folder         | /opt/airflow/plugins                                             
base_log_folder        | /var/log/airflow                                                 
remote_base_log_folder |                                                                  

System info
OS              | Linux                                                                                                                                                                  
architecture    | x86_64                                                                                                                                                                 
uname           | uname_result(system='Linux', node='Ubuntu-2004-focal-64-minimal', release='5.4.0-89-generic', version='#100-Ubuntu SMP Fri Sep 24 14:50:10 UTC 2021', machine='x86_64')
locale          | ('en_US', 'UTF-8')                                                                                                                                                     
python_version  | 3.9.5 (default, May 19 2021, 11:32:47)  [GCC 9.3.0]                                                                                                                    
python_location | /opt/airflow/bin/python                                                                                                                                                

Tools info
git             | git version 2.25.1                                                                         
ssh             | OpenSSH_8.2p1 Ubuntu-4ubuntu0.3, OpenSSL 1.1.1f  31 Mar 2020                               
kubectl         | NOT AVAILABLE                                                                              
gcloud          | NOT AVAILABLE                                                                              
cloud_sql_proxy | NOT AVAILABLE                                                                              
mysql           | NOT AVAILABLE                                                                              
sqlite3         | 3.31.1 2020-01-27 19:55:54 3bfa9cc97da10598521b342961df8f5f68c7388fa117345eeb516eaa837balt1
psql            | psql (PostgreSQL) 14.0 (Ubuntu 14.0-1.pgdg20.04+1)                                         

Paths info
airflow_home    | /opt/airflow                                                                                                                                                                                     
system_path     | /usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin                                                                                               
python_path     | /opt/airflow:/usr/lib/python39.zip:/usr/lib/python3.9:/usr/lib/python3.9/lib-dynload:/opt/airflow/lib/python3.9/site-packages:/opt/data_workflows/dags:/opt/airflow/config:/o
                | pt/airflow/plugins                                                                                                                                                                               
airflow_on_path | False                                                                                                                                                                                            

Providers info
apache-airflow-providers-ftp      | 2.0.1
apache-airflow-providers-http     | 2.0.1
apache-airflow-providers-imap     | 2.0.1
apache-airflow-providers-postgres | 2.3.0
apache-airflow-providers-sqlite   | 2.0.1
potiuk commented 2 years ago

It does not look like an Airflow problem, to be honest.

It certainly looks like your long-running task is holding some resource that blocks the scheduler somehow (but there is no indication of how it could be blocked). There must be something in your DAGs or task that simply causes the Airflow scheduler to lock up.

This is almost certainly something specific to your deployment (others do not experience it), but what exactly it is is hard to say from the information provided.

My best guess is that you have some lock on the database that makes the scheduler wait while running some query.

Is it possible to dump the state of the scheduler and, more generally, the state of your machine, resources, sockets, and DB locks while it happens (this should be possible with py-spy, for example)? Also, getting all the scheduler logs and seeing whether it actually "does" something might help. Unfortunately the information we have now is not enough to deduce the reason.

Any insight into WHERE the scheduler is locked might help with investigating it.
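For example, something along these lines should work to capture where the scheduler is stuck (the virtualenv path and PID here are placeholders, adjust them to your setup):

# install py-spy next to the scheduler's Python
/opt/airflow/bin/pip install py-spy

# print the current Python stack of the scheduler process (may need sudo)
py-spy dump --pid <scheduler-pid>

# or sample it for a while to see where time is being spent
py-spy top --pid <scheduler-pid>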

potiuk commented 2 years ago

BTW, yeah, checking the limit on connections opened in your DB might be a good idea. Especially if you are using Variables at the top level of your DAGs, it MIGHT lead to a significant number of open connections, which might eventually cause the scheduler to try to open a new connection and patiently wait until the DB server has a free connection. It might simply be that your long-running tasks are written in a way that (accidentally) opens a lot of those connections and does not close them until the task completes.

I think PGBouncer might help with that, but if too many connections are opened by a single long-running task and they are not closed, that might also not help too much.
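To illustrate the top-level-code pattern (a hypothetical sketch, not taken from this issue): anything at module level in a DAG file runs on every parse, so a top-level Variable.get() or database call opens connections in the DAG-processing path rather than inside a task:

# Hypothetical example of the anti-pattern - not the reporter's DAG.
from datetime import timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

# Anti-pattern: this executes on every DAG-file parse, so each parsing
# process opens a metadata-DB connection outside of any task.
api_key = Variable.get("my_api_key", default_var="")

with DAG(
    "top_level_code_example",
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
    catchup=False,
) as dag:
    # Better: resolve the value at task run time via templating, so parsing
    # the file never touches the database.
    BashOperator(
        task_id="use_variable",
        bash_command="echo {{ var.value.my_api_key }}",
    )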

t4n1o commented 2 years ago

Well, there is nothing locking the db. This issue occurs even if there is no database being used by my application.

Types of tasks that cause this problem:

Any program written in Rust or Python that takes a long time to execute will cause this problem. We are using Airflow because, once we have synced all the historical data, we run the task once per day.

[screenshot]

Here is a dump of what the scheduler is doing while it's stuck:

[screenshot of the scheduler process dump]

State of the various Airflow processes: [screenshot]

I am starting the Rust/Python programs in a separate process with BashOperator, and it's stuck on _recv(). Since all the tasks are limited by the rate limit of some API, disk speed, or network speed, it would be better if Airflow could actually run more than one task at a time. Any ideas?

t4n1o commented 2 years ago

[screenshot]

potiuk commented 2 years ago

I think you are misusing Airflow. Airflow by definition can run multiple tasks even with a single LocalExecutor, so you are likely misunderstanding how Airflow operators are run and are running your code as top-level code rather than as a task.

Can you please copy your DAG here?

You are not supposed to run long-running operations while the DAG file is parsed; parsing should complete quickly. From what it looks like, you execute a long-running process while parsing happens rather than when tasks are executed:

https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#top-level-python-code

t4n1o commented 2 years ago
from datetime import timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

default_args = {
    "owner": "t4n1o",
    "depends_on_past": False,
    "email": ["myemail@gmail.com"],
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=2),
    "max_active_runs_per_dag": 1,

}
with DAG(
    "Bitmex_Archives_Mirror",
    default_args=default_args,
    description="Mirror the archives from public.bitmex.com ",
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    tags=["raw price history"],
    catchup=False,
) as dag:

    t1 = BashOperator(
        task_id="download_csv_gz_archives",
        bash_command="umask 002 && cd /opt/soft/ &&"
        "/opt/soft/venv/bin/python -m soft.data_workflows.bitmex.archives_mirror",
    )

    t2 = BashOperator(
        task_id="process_archives_into_daily_csv_files",
        depends_on_past=False,
        bash_command="umask 002 && cd /opt/executables/bitmex_csvgz_parser && ./parser",
        retries=3,
    )

    t1 >> t2
t4n1o commented 2 years ago

I'm not sure how airflow is intended to be used, but sometimes people find other use cases for a tool they haven't designed.

We run a task that can take a few hours to collect all the historical data and process it. And then we want the task to run once per day.

It appears, from my side, that the airflow server UI can't contact the scheduler while the long task is running, and other DAGs can't be run. Perhaps the scheduler wants my code to yield control back to it frequently (once per day of data, for example), but I prefer to let my own code manage the date ranges, because that's where the unit tests are, and all the heavy lifting is in rust anyway.

t4n1o commented 2 years ago

What I do find use for in airflow

potiuk commented 2 years ago

I'm not sure how airflow is intended to be used, but sometimes people find other use cases for a tool they haven't designed.

We run a task that can take a few hours to collect all the historical data and process it. And then we want the task to run once per day.

This is what Airflow is designed for. I think you just use it wrongly (or have misconfigured it). It is supposed to handle that case perfectly (and it works this way for thousands of users), so it's your configuration/setup/way of using it that is wrong.

It appears, from my side, that the airflow server UI can't contact the scheduler while the long task is running, and other DAGs can't be run. Perhaps the scheduler wants my code to yield control back to it frequently (once per day of data, for example), but I prefer to let my own code manage the date ranges, because that's where the unit tests are, and all the heavy lifting is in rust anyway.

No, this is not the case (unless you use SequentialExecutor, which is only supposed to be used for debugging). Airflow is designed to run multiple parallel tasks at a time. You likely have some problem in your Airflow installation/configuration.

Questions:

1) Do you actually have the scheduler running at all? Does it have continuous access to the DB?

2) Are you absolutely sure you are not using SequentialExecutor? What does your airflow info say - can you pastebin the output of it? (airflow info has a built-in flag to send the output to a paste service.) Please also make sure you run it in exactly the same environment your scheduler runs in. Most likely you run your scheduler with a different configuration than your webserver, and that causes the problem.

3) Are you sure you are using Postgres and not SQLite? What does your airflow info say?

4) Where is your Python code (non-DAG)? Did you .airflowignore non-DAG files from Airflow's DAG folder?

5) Can you upgrade to Airflow 2.2.3 (the latest release)? It has built-in warnings in the UI in case you use SequentialExecutor/SQLite.

6) Can you change your DAGs to:

default_args = {
    "owner": "t4n1o",
    "depends_on_past": False,
    "email": ["myemail@gmail.com"],
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=2),
    "max_active_runs_per_dag": 1,

}
with DAG(
    "Bitmex_Archives_Mirror",
    default_args=default_args,
    description="Mirror the archives from public.bitmex.com ",
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    tags=["raw price history"],
    catchup=False,
) as dag:

    t1 = BashOperator(
        task_id="download_csv_gz_archives",
        bash_command="sleep 1000",
    )

    t2 = BashOperator(
        task_id="process_archives_into_daily_csv_files",
        depends_on_past=False,
        bash_command="sleep 1000",
        retries=3,
    )

    t1 >> t2

I just ran it on 2.2.3 and I was able to successfully start even 5 parallel runs with no problems with the scheduler.

[screenshot from 2022-01-02 19:57]

t4n1o commented 2 years ago
Apache Airflow
version                | 2.2.3                                                            
executor               | LocalExecutor                                                    
task_logging_handler   | airflow.utils.log.file_task_handler.FileTaskHandler              
sql_alchemy_conn       | postgresql+psycopg2://airflow:airflow_pass@localhost:5432/airflow
dags_folder            | /opt/data_workflows/dags                     
plugins_folder         | /opt/airflow/plugins                                             
base_log_folder        | /var/log/airflow                                                 
remote_base_log_folder |                                                                  

System info
OS              | Linux                                                                                                                                           
architecture    | x86_64                                                                                                                                          
uname           | uname_result(system='Linux', node='staging', release='5.10.0-8-amd64', version='#1 SMP Debian 5.10.46-4 (2021-08-03)', machine='x86_64')
locale          | ('en_US', 'UTF-8')                                                                                                                              
python_version  | 3.9.2 (default, Feb 28 2021, 17:03:44)  [GCC 10.2.1 20210110]                                                                                   
python_location | /opt/airflow/bin/python                                                                                                                         

Tools info
git             | git version 2.30.2                                                                         
ssh             | OpenSSH_8.4p1 Debian-5, OpenSSL 1.1.1k  25 Mar 2021                                        
kubectl         | NOT AVAILABLE                                                                              
gcloud          | NOT AVAILABLE                                                                              
cloud_sql_proxy | NOT AVAILABLE                                                                              
mysql           | NOT AVAILABLE                                                                              
sqlite3         | 3.34.1 2021-01-20 14:10:07 10e20c0b43500cfb9bbc0eaa061c57514f715d87238f4d835880cd846b9ealt1
psql            | psql (PostgreSQL) 13.4 (Debian 13.4-0+deb11u1)                                             

Paths info
airflow_home    | /opt/airflow                                                                                                                                                                                     
system_path     | /usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games:/opt/airflow/bin:/usr/lib/postgresql/13/bin                                                                                             
python_path     | /opt/airflow/bin:/usr/lib/python39.zip:/usr/lib/python3.9:/usr/lib/python3.9/lib-dynload:/opt/airflow/lib/python3.9/site-packages:/opt/data_workflows/dags:/opt/airflow/confi
                | g:/opt/airflow/plugins                                                                                                                                                                           
airflow_on_path | True                                                                                                                                                                                             

Providers info
apache-airflow-providers-ftp      | 2.0.1
apache-airflow-providers-http     | 2.0.1
apache-airflow-providers-imap     | 2.0.1
apache-airflow-providers-postgres | 2.3.0
apache-airflow-providers-sqlite   | 2.0.1
t4n1o commented 2 years ago
1. Do you actually have the scheduler running at all? Does it have continuous access to the DB?

Yes, the scheduler is visible in the running processes and in systemd. The scheduler is the only program accessing the DB; there isn't anything else running on the test server besides Airflow. The DB is on the same RAID array/filesystem as the data.

  2. Are you absolutely sure you are not using SequentialExecutor? What does your airflow info say?
     I can't find the flag for pastebin, but I pasted the output above. The non-standard thing about this Airflow installation is that it uses a virtualenv and systemd.
  3. Are you sure you are using Postgres and not SQLite?
     Yes.
  4. Where is your Python code (non-DAG)? Did you .airflowignore non-DAG files from Airflow's DAG folder?
     /opt/data-workflows <= code, /opt/data-workflows/rust <= code, /opt/data-workflows/dags <= DAGs. There are only DAGs in the DAG folder; I kept everything modular and separated.
  5. Can you upgrade to Airflow 2.2.3 (the latest release)?
     Yes, still the same problem on 2.2.3. No warnings about SQLite or SequentialExecutor.
  6. Can you change your DAGs to the sleep 1000 version?
     When I run your DAG with the sleep 1000, I can also launch 2 DAG runs in parallel. But it still displays the weird message in the web UI. [screenshot]

[screenshot]

Update: the scheduler did wake up eventually.

t4n1o commented 2 years ago

I'm sorry, I didn't intend this to turn into an unpaid debugging session, and it looks like a cosmetic problem more than anything. So I'm comfortable closing this thread if you prefer.

t4n1o commented 2 years ago

Could it be the scheduler settings?:


# [scheduler]
airflow_scheduler_job_heartbeat_sec: 5
airflow_scheduler_clean_tis_without_dagrun_interval: 15.0
airflow_scheduler_heartbeat_sec: 5
airflow_scheduler_num_runs: -1
airflow_scheduler_processor_poll_interval: 1
airflow_scheduler_min_file_process_interval: 30
# https://stackoverflow.com/questions/43606311/refreshing-dags-without-web-server-restart-apache-airflow
airflow_scheduler_dag_dir_list_interval: 40 # How often to check dag folder for new dags
airflow_scheduler_print_stats_interval: 30
airflow_scheduler_pool_metrics_interval: 5.0
airflow_scheduler_scheduler_health_check_threshold: 30
airflow_scheduler_orphaned_tasks_check_interval: 300.0
airflow_child_process_log_folder: "{{ airflow_logs_folder }}/scheduler"
airflow_scheduler_zombie_task_threshold: 300
airflow_scheduler_catchup_by_default: True
airflow_scheduler_max_tis_per_query: 512
airflow_scheduler_use_row_level_locking: True
airflow_scheduler_max_dagruns_to_create_per_loop: 10
airflow_scheduler_max_dagruns_per_loop_to_schedule: 20
airflow_scheduler_schedule_after_task_execution: True
airflow_scheduler_parsing_processes: 2
airflow_file_parsing_sort_mode: modified_time
airflow_scheduler_use_job_schedule: True
airflow_scheduler_allow_trigger_in_future: False
airflow_dependency_detector: "airflow.serialization.serialized_objects.DependencyDetector"

airflow_scheduler_run_duration: -1
airflow_scheduler_min_file_parsing_loop_time: 1
airflow_scheduler_max_threads: 2
airflow_scheduler_authenticate: True
potiuk commented 2 years ago

I still think that when you run the Airflow scheduler via systemd you run it with the SequentialExecutor. The problem is that when you run Airflow under systemd, you very likely get the default environment variables - not the ones you have in your .bashrc or wherever you keep them.

In this case the Airflow scheduler falls back to the default settings. So when you run airflow info it uses the variables you have defined in your interactive bash session (and it shows LocalExecutor), but when you run it under systemd it uses the SequentialExecutor and SQLite.

Please check in the logs of your scheduler - it should print the executor used when it starts.
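Since the scheduler runs under systemd here, something along these lines should show which executor the service actually picked up (assuming the unit is called airflow-scheduler):

journalctl -u airflow-scheduler | grep -i executor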

t4n1o commented 2 years ago
Jan 03 10:21:40 testing-server airflow[953130]:  ____    |__( )_________  __/__  /________      __
Jan 03 10:21:40 testing-server airflow[953130]: ____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
Jan 03 10:21:40 testing-server airflow[953130]: ___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
Jan 03 10:21:40 testing-server airflow[953130]:  _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
Jan 03 10:21:40 testing-server airflow[953139]: /opt/airflow/lib/python3.9/site-packages/airflow/configuration.py:361 DeprecationWarning: The worker_log_server_port option in [celery] has been mov>
Jan 03 10:21:40 testing-server airflow[953139]: [2022-01-03 10:21:40 +0100] [953139] [INFO] Starting gunicorn 20.1.0
Jan 03 10:21:40 testing-server airflow[953139]: [2022-01-03 10:21:40 +0100] [953139] [INFO] Listening at: http://0.0.0.0:8793 (953139)
Jan 03 10:21:40 testing-server airflow[953139]: [2022-01-03 10:21:40 +0100] [953139] [INFO] Using worker: sync
Jan 03 10:21:40 testing-server airflow[953140]: [2022-01-03 10:21:40 +0100] [953140] [INFO] Booting worker with pid: 953140
Jan 03 10:21:40 testing-server airflow[953130]: [2022-01-03 10:21:40,564] {scheduler_job.py:596} INFO - Starting the scheduler
Jan 03 10:21:40 testing-server airflow[953130]: [2022-01-03 10:21:40,565] {scheduler_job.py:601} INFO - Processing each file at most -1 times
Jan 03 10:21:40 testing-server airflow[953164]: [2022-01-03 10:21:40 +0100] [953164] [INFO] Booting worker with pid: 953164
Jan 03 10:21:40 testing-server airflow[953130]: [2022-01-03 10:21:40,601] {manager.py:163} INFO - Launched DagFileProcessorManager with pid: 953189
Jan 03 10:21:40 testing-server airflow[953130]: [2022-01-03 10:21:40,602] {scheduler_job.py:1114} INFO - Resetting orphaned tasks for active dag runs
Jan 03 10:21:40 testing-server airflow[953189]: [2022-01-03 10:21:40,607] {settings.py:52} INFO - Configured default timezone Timezone('UTC')
Jan 03 10:26:40 testing-server airflow[953130]: [2022-01-03 10:26:40,726] {scheduler_job.py:1114} INFO - Resetting orphaned tasks for active dag runs
Jan 03 10:31:40 testing-server airflow[953130]: [2022-01-03 10:31:40,845] {scheduler_job.py:1114} INFO - Resetting orphaned tasks for active dag runs
Jan 03 10:36:40 testing-server airflow[953130]: [2022-01-03 10:36:40,943] {scheduler_job.py:1114} INFO - Resetting orphaned tasks for active dag runs
Jan 03 10:38:04 testing-server airflow[953130]: [2022-01-03 10:38:04,733] {scheduler_job.py:733} INFO - Exiting scheduler loop as requested number of runs (1000 - got to 1000) has been reached
Jan 03 10:38:04 testing-server airflow[953130]: [2022-01-03 10:38:04,733] {manager.py:361} INFO - Sending termination message to manager.
Jan 03 10:38:04 testing-server airflow[953130]: [2022-01-03 10:38:04,734] {scheduler_job.py:637} INFO - Deactivating DAGs that haven't been touched since 2022-01-03T09:21:40.601942+00:00
Jan 03 10:38:04 testing-server airflow[953130]: [2022-01-03 10:38:04,738] {local_executor.py:388} INFO - Shutting down LocalExecutor; waiting for running tasks to finish.  Signal again if you don'>
Jan 03 10:38:04 testing-server airflow[953130]: [2022-01-03 10:38:04,757] {process_utils.py:120} INFO - Sending Signals.SIGTERM to group 953189. PIDs of all processes in the group: []
Jan 03 10:38:04 testing-server airflow[953130]: [2022-01-03 10:38:04,757] {process_utils.py:75} INFO - Sending the signal Signals.SIGTERM to group 953189
Jan 03 10:38:04 testing-server airflow[953130]: [2022-01-03 10:38:04,757] {process_utils.py:89} INFO - Sending the signal Signals.SIGTERM to process 953189 as process group is missing.
Jan 03 10:38:04 testing-server airflow[953130]: [2022-01-03 10:38:04,757] {scheduler_job.py:655} INFO - Exited execute loop
Jan 03 10:38:04 testing-server airflow[953139]: [2022-01-03 10:38:04 +0100] [953139] [INFO] Handling signal: term
Jan 03 10:38:04 testing-server airflow[953164]: [2022-01-03 10:38:04 +0100] [953164] [INFO] Worker exiting (pid: 953164)
Jan 03 10:38:04 testing-server airflow[953140]: [2022-01-03 10:38:04 +0100] [953140] [INFO] Worker exiting (pid: 953140)
Jan 03 10:38:04 testing-server airflow[953139]: [2022-01-03 10:38:04 +0100] [953139] [INFO] Shutting down: Master
Jan 03 10:38:05 testing-server systemd[1]: airflow-scheduler.service: Succeeded.
Jan 03 10:38:10 testing-server systemd[1]: airflow-scheduler.service: Scheduled restart job, restart counter is at 46.
Jan 03 10:38:10 testing-server systemd[1]: Stopped Airflow scheduler daemon.
Jan 03 10:38:10 testing-server systemd[1]: Started Airflow scheduler daemon.
Jan 03 10:38:12 testing-server airflow[954951]: /opt/airflow/lib/python3.9/site-packages/airflow/configuration.py:361 DeprecationWarning: The dag_concurrency option in [core] has been renamed to m>
Jan 03 10:38:12 testing-server airflow[954951]: /opt/airflow/lib/python3.9/site-packages/airflow/configuration.py:361 DeprecationWarning: The processor_poll_interval option in [scheduler] has been>
Jan 03 10:38:12 testing-server airflow[954951]:   ____________       _____________
Jan 03 10:38:12 testing-server airflow[954951]:  ____    |__( )_________  __/__  /________      __
Jan 03 10:38:12 testing-server airflow[954951]: ____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
Jan 03 10:38:12 testing-server airflow[954951]: ___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
Jan 03 10:38:12 testing-server airflow[954951]:  _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
Jan 03 10:38:12 testing-server airflow[954960]: /opt/airflow/lib/python3.9/site-packages/airflow/configuration.py:361 DeprecationWarning: The worker_log_server_port option in [celery] has been mov>
Jan 03 10:38:12 testing-server airflow[954960]: [2022-01-03 10:38:12 +0100] [954960] [INFO] Starting gunicorn 20.1.0
Jan 03 10:38:12 testing-server airflow[954960]: [2022-01-03 10:38:12 +0100] [954960] [INFO] Listening at: http://0.0.0.0:8793 (954960)
Jan 03 10:38:12 testing-server airflow[954960]: [2022-01-03 10:38:12 +0100] [954960] [INFO] Using worker: sync
Jan 03 10:38:12 testing-server airflow[954961]: [2022-01-03 10:38:12 +0100] [954961] [INFO] Booting worker with pid: 954961
Jan 03 10:38:12 testing-server airflow[954951]: [2022-01-03 10:38:12,496] {scheduler_job.py:596} INFO - Starting the scheduler
Jan 03 10:38:12 testing-server airflow[954951]: [2022-01-03 10:38:12,496] {scheduler_job.py:601} INFO - Processing each file at most -1 times
Jan 03 10:38:12 testing-server airflow[954982]: [2022-01-03 10:38:12 +0100] [954982] [INFO] Booting worker with pid: 954982
Jan 03 10:38:12 testing-server airflow[954951]: [2022-01-03 10:38:12,531] {manager.py:163} INFO - Launched DagFileProcessorManager with pid: 955008
Jan 03 10:38:12 testing-server airflow[954951]: [2022-01-03 10:38:12,532] {scheduler_job.py:1114} INFO - Resetting orphaned tasks for active dag runs
Jan 03 10:38:12 testing-server airflow[955008]: [2022-01-03 10:38:12,537] {settings.py:52} INFO - Configured default timezone Timezone('UTC')
Jan 03 10:43:12 testing-server airflow[954951]: [2022-01-03 10:43:12,626] {scheduler_job.py:1114} INFO - Resetting orphaned tasks for active dag runs
Jan 03 10:48:12 testing-server airflow[954951]: [2022-01-03 10:48:12,690] {scheduler_job.py:1114} INFO - Resetting orphaned tasks for active dag runs
t4n1o commented 2 years ago

It says LocalExecutor in the logs^^. Plus the env vars are in the system-wide /etc/environment, and also in the systemd service file.

I think it may be the scheduler-specific settings in the airflow.cfg.

potiuk commented 2 years ago

It says LocalExecutor in the logs^^. Plus the env vars are in the system-wide /etc/environment, and also in the systemd service file.

I think it may be the scheduler-specific settings in the airflow.cfg.

I propose you regenerate the default config and only apply changes you really want to make. You seem to have some very old, deprecated options there.

LIKEL123 commented 2 years ago

I have the same problem and I don't know how to solve it, but in my airflow-scheduler.err log the error reported is: "sqlalchemy.orm.exc.StaleDataError: UPDATE statement on table 'dag_run' expected to update 5 row(s); 0 were matched." How should this problem be solved?

LIKEL123 commented 2 years ago

BTW, yeah, checking the limit on connections opened in your DB might be a good idea. Especially if you are using Variables at the top level of your DAGs, it might lead to a large number of open connections, which might eventually cause the scheduler to try to open a new connection and wait patiently until the DB server has a free connection. It may simply be that your long-running tasks are written in a way that (accidentally) opens many of these connections and does not close them until the task completes.

I think PGBouncer might help with that, but if a long-running task opens too many connections and they are not closed, it may not help much either. My problem is probably the database. I have the same problem and don't know how to solve it, but the error reported in my airflow-scheduler.err log is: "sqlalchemy.orm.exc.StaleDataError: UPDATE statement on table 'dag_run' expected to update 5 row(s); 0 were matched." How should this problem be solved?

potiuk commented 2 years ago

It's likely a different problem. Make sure you do not use SQLite and the SequentialExecutor. Recreate the installation/database if needed. There is not enough information here to help you @LIKEL123. If you still have the problem, open a new discussion and describe in detail what your configuration/deployment is, with all the logs you can get - maybe someone will be able to help you.

bondgeek commented 2 years ago

Since the assumption seems to be that this is an isolated issue, I just want to report that we are seeing it too. After successfully migrating to v2 several months ago, we started seeing this message 3 days ago. It's getting worse every day (it started by reporting "heartbeat not detected in 2 minutes" before resolving; now it's up to 5 minutes).

khintz commented 2 years ago

Like @bondgeek, we have also started seeing it within the last few days, after having run the same configuration with the same DAGs for more than a week. Our setup runs on a Kubernetes cluster, and I tried redeploying everything (airflow-webserver, airflow-scheduler, and the PostgreSQL DB) today. We have not seen the problem since, but I expect it to return after some time.

bh-ashishsharma commented 2 years ago

@khintz , @bondgeek Did you guys find any solution?

We are also running Airflow on a Kubernetes cluster and facing this issue time and time again. For us, it happens when we run a long-running process with KubernetesPodOperator. It's not reproducible every time, but it comes back after a few days of running.

[screenshot]

Not sure if it's useful, but this is the last error we received: [screenshot]

bondgeek commented 2 years ago

@bh-ashishsharma We ended up scaling up the task instance for the airflow scheduler (we are running a dockerized version on AWS ECS), and have not seen the error since. It's still not clear what the scheduler is doing that is so resource intensive.

t4n1o commented 2 years ago

No, we've never found a solution.

One hypothesis was that the contention for resources comes from the long-running task writing data to the same hard disk where the Postgres database is.

paul-bormans commented 2 years ago

We are seeing the same issue running v2.2.4 on K8S

We first increased the setting "scheduler_health_check_threshold" from the default 30 to 180 seconds; that at least prevented Airflow from killing tasks because of the timeout (somehow related?).
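For reference, in a containerized deployment this kind of override is typically passed as an environment variable instead of editing airflow.cfg (standard AIRFLOW__SECTION__KEY naming):

# environment-variable equivalent of [scheduler] scheduler_health_check_threshold = 180
AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD=180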

[screenshot]

I suspected that maybe too much logging was being written, but there are only a few GB of logs, and with the log groomer running (enabled by default in the chart) retention should be fine.

Also, I couldn't find any DB issues; the DB is rather small, even - only a few MB.

For now the issue is resolved by a simple redeploy (reusing the DB, btw).

I would like to know how to debug this issue further - can Airflow be profiled? What debug logging should be enabled to get better insight?

Paul

potiuk commented 2 years ago

It might be related to PR #23944 - are you using Python 3.10? If so, switching to Python 3.9 might help, and we might fix it in one of the future versions.

khintz commented 2 years ago

@bh-ashishsharma: We "fixed it" by allocating more memory to our running containers on Kubernetes. Since then we have not seen the issue.
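(For anyone hitting the same thing with the official Helm chart, the memory bump is roughly a values override like the one below; the exact keys and amounts are assumptions to be checked against your chart version, not our literal config.)

scheduler:
  resources:
    requests:
      memory: 2Gi
    limits:
      memory: 4Gi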

potiuk commented 2 years ago

@bh-ashishsharma: We "fixed it" by allocating more memory to our running containers on Kubernetes. Since then we have not seen the issue.

Well, increasing resources when you do not have enough of them is a really good fix. Unfortunately only your deployment can monitor and indicate whether you lack memory or not, so this was indeed a fix to a deployment issue.

Good to know - we likely have a good case here to explain to others what to pay attention to.

@khintz - maybe you would like to make a PR to our docs describing what you observed and the remediation you applied? I think the best approach is to modify this doc: https://airflow.apache.org/docs/apache-airflow/stable/production-deployment.html#scheduler-uptime

It's super easy - just use the "Suggest a change on this page" link at the bottom of the page and it will open the GitHub UI, where you will be able to make changes to the docs directly - without any need to set up a development environment.

That would definitely help others who might have a similar issue.

t4n1o commented 2 years ago

Well, increasing resources when you do not have enough of them is a really good fix. Unfortunately only your deployment can monitor and indicate whether you lack memory or not, so this was indeed a fix to a deployment issue.

A lot of the answers in this thread are "upgrade your Python", "check your deployment", "update your Airflow", "only you know your deployment", and so on. I'm sure my server has plenty of unused resources. The only question would be whether the config can make Airflow self-limit the amount of resources it uses; the other possibility is that the logic of the scheduler loop has some function call that's blocking.

I understand the responses are the way they are because development time is valuable. I suspect it's a multithreading/async problem. So, would you consider linking me to the part of the code where the scheduler loop is, and maybe I can help figure out what's blocking?

dvfeinblum-betterment commented 2 years ago

Just hopping in here: we've also been running into this issue after upgrading from 1.10.15 to 2.2.3. As others in the thread have described, the problem is getting progressively worse and there are no obvious issues in the deployment (CPU utilization for our scheduler is low, etc.).

It's interesting hearing that, in spite of this, increasing resources seemed to help others. We'll give that a shot and see if it helps.

potiuk commented 2 years ago

@t4n1o - (but also others in this thread) a lot of the issues are because we have difficulty seeing a clear reproduction of the problem - and we have to rely on users like you to spend their time analyzing the settings, configuration, and deployment they have, performing enough analysis and gathering enough clues that those who know the code best can make intelligent guesses about what is wrong, even if there are no "full and clear reproduction steps". It's not that development time is valuable. Airflow is developed by > 2100 contributors - often people like you. The source code is available, anyone can take a look, and while some people know more about some parts of the code, you get the software for free, and you cannot "expect" those people to spend a lot of time trying to figure out what's wrong if they have no clear reproduction steps and not enough clues.

Doing the analysis and gathering enough evidence about what you observe is the best you can do to pay back for the free software - and possibly give those looking here enough clues to fix it or to direct you how to solve the problem.

So absolutely - if looking at the code and analysing it is something you can offer the community as your "pay back" - this is fantastic.

The scheduler job is here: https://github.com/apache/airflow/blob/main/airflow/jobs/scheduler_job.py

But we can give you more than that: https://www.youtube.com/watch?v=DYC4-xElccE - this is a video from Airflow Summit 2021 where Ash explains how the scheduler works, i.e. what the design assumptions were. It can guide you in understanding what the scheduler job does.

Also, before you dive deep, it might well be that your set of DAGs and the way you structure them is the problem, and you can simply follow our guidelines on fine-tuning your scheduler performance.

So if you want to help - absolutely: do more analysis, look at our guidelines, and if you feel like it, dive deep into how the scheduler works and look at the code. All of that might be a great way to get more clues and evidence, and even if you are not able to fix it in a PR yourself, you can give others enough clues that they can find the root cause and implement a solution.

deepchand commented 2 years ago

I am also facing the same problem; below are the details:
 airflow - 2.2.2
 providers - apache-airflow[celery,redis,microsoft.azure,slack] - 2.2.2
 constraints - https://raw.githubusercontent.com/apache/airflow/constraints-2.2.2/constraints-3.6.txt
 postgres version - 11
 executor - CeleryExecutor
 cluster - 1 webserver, 2 schedulers, and 6 workers
 when the problem occurs - when we run around more than 100 parallel DAG runs, the warning "The scheduler does not appear to be running. Last heartbeat was received X minutes ago" starts to appear, and the number of minutes keeps increasing up to 20-22 minutes if we schedule more DAG runs in parallel, even though we have enough resources available on all components
 These are some of the config parameters we are using:
 job_heartbeat_sec = 10
 scheduler_heartbeat_sec = 5
 scheduler_idle_sleep_time = 1
 min_file_process_interval = 300
 dag_dir_list_interval = 300
 scheduler_health_check_threshold = 300
 scheduler_zombie_task_threshold = 600
 max_tis_per_query = 512
 use_row_level_locking = True
 max_dagruns_to_create_per_loop = 50
 parsing_processes = 8

While searching the scheduler logs we found that the scheduling loop time increases:

[screenshots from 2022-08-17 showing the scheduling loop time]
potiuk commented 2 years ago

This is an interesting finding @ashb @uranusjr @ephraimbuddy. This is not a blocker for 2.3.4, but it is interesting to see such a high variance in the scheduling loop time. Maybe we could come up with some hypotheses as to why this is happening.

potiuk commented 2 years ago

I think max_tis_per_query is quite high, but even so, it is suspicious to see that 512 TIs are processed in 594 seconds. Some of the queries must simply run for a very long time. Is it possible to get some stats from Postgres on what the longest-running queries are, @deepchand? There are a number of guides on the internet - for example this one that shows how to do it: https://www.shanelynn.ie/postgresql-find-slow-long-running-and-blocked-queries/
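A quick starting point is a generic Postgres query against pg_stat_activity (nothing Airflow-specific):

-- longest-running non-idle queries on the metadata DB
SELECT pid,
       now() - query_start AS duration,
       state,
       left(query, 120) AS query
FROM pg_stat_activity
WHERE state <> 'idle'
ORDER BY duration DESC
LIMIT 20;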

uranusjr commented 2 years ago

I wonder if this is related to the locking issue we just resolved recently.

potiuk commented 2 years ago

Do you remember which one @uranusjr ?

potiuk commented 2 years ago

This one ? https://github.com/apache/airflow/pull/25532

deepchand commented 2 years ago

I have tried to debug it more and found that it is the self.executor.heartbeat() function which is taking almost 95% of the total time of the scheduling loop.

uranusjr commented 2 years ago

Yes that one

deepchand commented 2 years ago

@potiuk I have debugged this issue more and found that updating the task state in the scheduler loop significantly increases the total time of the self.executor.heartbeat() function, which is what causes the "The scheduler does not appear to be running. Last heartbeat was received X minutes ago" problem. To be specific, in https://github.com/apache/airflow/blob/9ac742885ffb83c15f7e3dc910b0cf9df073407a/airflow/executors/celery_executor.py#L312 the state update we do in this function is effectively blocking, and if we stop updating the task state from there, the scheduling loop time drops significantly and the warning no longer appears. Please have a look and let me know if any other info is needed.

potiuk commented 2 years ago

I do not know that area well, but to me it looks like you have some bottleneck in your Celery queue. You have not specified what kind of queue you use - but I think you should look at your Redis or RabbitMQ and see whether there are any problems there. It might simply be that your Redis or RabbitMQ is badly configured or overloaded and is somehow blocking the state update.

Can you please set the log level to debug and see if more information is printed on what's going on in the update_state method?

(You will find how to do it in the Airflow configuration docs.)
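For reference, the relevant option is logging_level in the [logging] section of airflow.cfg, or the corresponding environment variable:

# airflow.cfg
[logging]
logging_level = DEBUG

# or, equivalently, as an environment variable for the scheduler process
AIRFLOW__LOGGING__LOGGING_LEVEL=DEBUG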

deepchand commented 2 years ago

@potiuk We are using Redis as the queue. I have cross-verified that Redis is not the bottleneck and that messages are consumed from Redis as soon as they land in the queue. We have enough resources available on the Redis side as well.

[screenshots from 2022-08-26]
potiuk commented 2 years ago

Any chance of getting debug logs?

deepchand commented 2 years ago

Debug logs of the scheduler?

potiuk commented 2 years ago

Yep:

Can you please set the log level to debug and see if more information is printed on what's going on in the update_state method?

(You will find how to do it in the Airflow configuration docs.)

deepchand commented 2 years ago

@potiuk I have added some more debug logs to measure the total time taken to fetch the state of a single result, and for all results in a single loop; please see below.

[screenshots from 2022-08-26 showing the added debug timings]