apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0
36.37k stars 14.1k forks source link

scheduler gets stuck without a trace #7935

Closed dimberman closed 3 years ago

dimberman commented 4 years ago

Apache Airflow version:

Kubernetes version (if you are using kubernetes) (use kubectl version):

Environment:

The scheduler gets stuck without a trace or error. When this happens, the CPU usage of scheduler service is at 100%. No jobs get submitted and everything comes to a halt. Looks it goes into some kind of infinite loop. The only way I could make it run again is by manually restarting the scheduler service. But again, after running some tasks it gets stuck. I've tried with both Celery and Local executors but same issue occurs. I am using the -n 3 parameter while starting scheduler.

Scheduler configs, job_heartbeat_sec = 5 scheduler_heartbeat_sec = 5 executor = LocalExecutor parallelism = 32

Please help. I would be happy to provide any other information needed

What you expected to happen:

How to reproduce it:

Anything else we need to know:

Moved here from https://issues.apache.org/jira/browse/AIRFLOW-401

ashb commented 3 years ago

Airflow doesn't use threads - so I'm not sure why there are two threads in the about trace.

Oh multiprocessing uses threads internally

dhuang commented 3 years ago

Started seeing this for the first time ever after we upgraded from 1.10.5 to 1.10.14.

MatthewRBruce commented 3 years ago

We just saw this on 2.0.1 when we added a largish number of new DAGs (We're adding around 6000 DAGs total, but this seems to lock up when about 200 try to be scheduled at once).

Here's py-spy stacktraces from our scheduler:

Process 6: /usr/local/bin/python /usr/local/bin/airflow scheduler
Python v3.8.7 (/usr/local/bin/python3.8)
Thread 0x7FF5C09C8740 (active): "MainThread"
    _send (multiprocessing/connection.py:368)
    _send_bytes (multiprocessing/connection.py:411)
    send (multiprocessing/connection.py:206)
    send_callback_to_execute (airflow/utils/dag_processing.py:283)
    _send_dag_callbacks_to_processor (airflow/jobs/scheduler_job.py:1795)
    _schedule_dag_run (airflow/jobs/scheduler_job.py:1762)
    _do_scheduling (airflow/jobs/scheduler_job.py:1521)
    _run_scheduler_loop (airflow/jobs/scheduler_job.py:1382)
    _execute (airflow/jobs/scheduler_job.py:1280)
    run (airflow/jobs/base_job.py:237)
    scheduler (airflow/cli/commands/scheduler_command.py:63)
    wrapper (airflow/utils/cli.py:89)
    command (airflow/cli/cli_parser.py:48)
    main (airflow/__main__.py:40)
    <module> (airflow:8)

Process 77: airflow scheduler -- DagFileProcessorManager
Python v3.8.7 (/usr/local/bin/python3.8)
Thread 0x7FF5C09C8740 (active): "MainThread"
    _send (multiprocessing/connection.py:368)
    _send_bytes (multiprocessing/connection.py:405)
    send (multiprocessing/connection.py:206)
    _run_parsing_loop (airflow/utils/dag_processing.py:698)
    start (airflow/utils/dag_processing.py:596)
    _run_processor_manager (airflow/utils/dag_processing.py:365)
    run (multiprocessing/process.py:108)
    _bootstrap (multiprocessing/process.py:315)
    _launch (multiprocessing/popen_fork.py:75)
    __init__ (multiprocessing/popen_fork.py:19)
    _Popen (multiprocessing/context.py:277)
    start (multiprocessing/process.py:121)
    start (airflow/utils/dag_processing.py:248)
    _execute (airflow/jobs/scheduler_job.py:1276)
    run (airflow/jobs/base_job.py:237)
    scheduler (airflow/cli/commands/scheduler_command.py:63)
    wrapper (airflow/utils/cli.py:89)
    command (airflow/cli/cli_parser.py:48)
    main (airflow/__main__.py:40)
    <module> (airflow:8)

What I think is happening is that the pipe between the DagFileProcessorAgent and the DagFileProcessorManager is full and is causing the Scheduler to deadlock.

From what I can see the DagFileProcessorAgent only pulls data off the pipe in it's heartbeat and wait_until_finished functions (https://github.com/apache/airflow/blob/beb8af5ac6c438c29e2c186145115fb1334a3735/airflow/utils/dag_processing.py#L374)

and that the SchedulerJob is responsible for calling it's heartbeat function each scheduler loop (https://github.com/apache/airflow/blob/beb8af5ac6c438c29e2c186145115fb1334a3735/airflow/jobs/scheduler_job.py#L1388).

However, the SchedulerJob is blocked from calling heartbeat because it's blocked forever trying to send data to the same full pipe as part of the _send_dag_callbacks_to_processor in the _do_scheduling_ function causing a deadlock.

ashb commented 3 years ago

Nice debugging @MatthewRBruce - and your diagnosis seems sound. We'll start on a fix next week.

milton0825 commented 3 years ago

Have a theory of why the Airflow scheduler may stuck at CeleryExecutor._send_tasks_to_celery (our scheduler stuck in a different place 😃).

The size of the return value from send_task_to_executor may be huge as the traceback is included in case of failure and looks like it is a known bug [1] in cpython that huge output can cause deadlock in multiprocessing.Pool.

For example, the following code easily deadlock on Python 3.6.3:

import multiprocessing
import time

def f(x):
    return ' ' * 1000000
if __name__ == '__main__':
    with multiprocessing.Pool(1) as p:
        r = p.map(f, ('hi'*100000))

[1] https://bugs.python.org/issue35267

ashb commented 3 years ago

@milton0825 Sounds plausible for what I know of your usecase :grin: You're still on 1.10.x right? The scheduler on 2.0 sends a lot less data over the MP pipes, (it doesn't send the DAG, that gets written to the DB) so that particular issue won't be for 2.0+

milton0825 commented 3 years ago

Right we are still on 1.10.8

SaithZhang commented 3 years ago

Seeing this on 1.10.14 + CeleryExecutor + python 3.8, will this be fix on 1.10.x? for some reason our company has to use mysql 5.6.

 ps -ef |grep airflow
root       9522      1  1 15:24 ?        00:00:13 /data/anaconda3/envs/airflow/bin/python /data/anaconda3/envs/airflow/bin/airflow webserver -D
root       9528      1  0 15:24 ?        00:00:00 gunicorn: master [airflow-webserver]
root      21238      1  0 15:31 ?        00:00:04 /data/anaconda3/envs/airflow/bin/python /data/anaconda3/envs/airflow/bin/airflow scheduler -D
root      21239  21238  1 15:31 ?        00:00:09 airflow scheduler -- DagFileProcessorManager
root      38695   9528  1 15:42 ?        00:00:01 [ready] gunicorn: worker [airflow-webserver]
root      39492   9528  2 15:43 ?        00:00:01 [ready] gunicorn: worker [airflow-webserver]
root      39644   9528  4 15:43 ?        00:00:01 [ready] gunicorn: worker [airflow-webserver]
root      40455   9528 51 15:44 ?        00:00:01 [ready] gunicorn: worker [airflow-webserver]
root      40503  21239  0 15:44 ?        00:00:00 [airflow schedul] <defunct>
root      40504  21239  0 15:44 ?        00:00:00 [airflow schedul] <defunct>

the [airflow schedul] defunct process is keep restarting all the time.

ashb commented 3 years ago

@DreamyWen unlikely I'm afraid, at least not by me. I'll happily review a PR if anyone has time to submit it, but can't put any time to fixing this on 1.10 release branch, sorry

leonsmith commented 3 years ago

+1 on this issue.

Airflow 2.0.1

CeleryExecutor.

7000 dags~ seems to happen under load (when we have a bunch all dags all kick off at midnight)

py-spy dump --pid 132 --locals ``` Process 132: /usr/local/bin/python /usr/local/bin/airflow scheduler Python v3.8.3 (/usr/local/bin/python) Thread 132 (idle): "MainThread" _send (multiprocessing/connection.py:368) Arguments:: self: buf: write: Locals:: remaining: 1213 _send_bytes (multiprocessing/connection.py:411) Arguments:: self: buf: Locals:: n: 1209 header: send (multiprocessing/connection.py:206) Arguments:: self: obj: send_callback_to_execute (airflow/utils/dag_processing.py:283) Arguments:: self: request: _process_executor_events (airflow/jobs/scheduler_job.py:1242) Arguments:: self: session: Locals:: ti_primary_key_to_try_number_map: {("redeacted", "redeacted", ): 1, ...} event_buffer: {...} tis_with_right_state: [("redeacted", "redeacted", , 1), ...] ti_key: ("redeacted", "redeacted", ...) value: ("failed", None) state: "failed" _: None filter_for_tis: tis: [, , , ...] ti: try_number: 1 buffer_key: ("redeacted", ...) info: None msg: "Executor reports task instance %s finished (%s) although the task says its %s. (Info: %s) Was the task killed externally?" request: wrapper (airflow/utils/session.py:62) Locals:: args: () kwargs: {"session": } _run_scheduler_loop (airflow/jobs/scheduler_job.py:1386) Arguments:: self: Locals:: is_unit_test: False call_regular_interval: loop_count: 1 timer: session: num_queued_tis: 17 _execute (airflow/jobs/scheduler_job.py:1280) Arguments:: self: Locals:: pickle_dags: False async_mode: True processor_timeout_seconds: 600 processor_timeout: execute_start_time: run (airflow/jobs/base_job.py:237) Arguments:: self: Locals:: session: scheduler (airflow/cli/commands/scheduler_command.py:63) Arguments:: args: Locals:: job: wrapper (airflow/utils/cli.py:89) Locals:: args: () kwargs: {} metrics: {"sub_command": "scheduler", "start_datetime": , ...} command (airflow/cli/cli_parser.py:48) Locals:: args: () kwargs: {} func: main (airflow/__main__.py:40) Locals:: parser: args: (airflow:8) ```
py-spy dump --pid 134 --locals ``` Process 134: airflow scheduler -- DagFileProcessorManager Python v3.8.3 (/usr/local/bin/python) Thread 134 (idle): "MainThread" _send (multiprocessing/connection.py:368) Arguments:: self: buf: write: Locals:: remaining: 2276 _send_bytes (multiprocessing/connection.py:411) Arguments:: self: buf: Locals:: n: 2272 header: send (multiprocessing/connection.py:206) Arguments:: self: obj: (...) _run_parsing_loop (airflow/utils/dag_processing.py:698) Locals:: poll_time: 0.9996239839999816 loop_start_time: 690.422146969 ready: [] agent_signal: sentinel: processor: all_files_processed: False max_runs_reached: False dag_parsing_stat: (...) loop_duration: 0.0003760160000183532 start (airflow/utils/dag_processing.py:596) Arguments:: self: _run_processor_manager (airflow/utils/dag_processing.py:365) Arguments:: dag_directory: "/code/src/dags" max_runs: -1 processor_factory: processor_timeout: signal_conn: dag_ids: [] pickle_dags: False async_mode: True Locals:: processor_manager: run (multiprocessing/process.py:108) Arguments:: self: _bootstrap (multiprocessing/process.py:315) Arguments:: self: parent_sentinel: 8 Locals:: util: context: _launch (multiprocessing/popen_fork.py:75) Arguments:: self: process_obj: Locals:: code: 1 parent_r: 6 child_w: 7 child_r: 8 parent_w: 9 __init__ (multiprocessing/popen_fork.py:19) Arguments:: self: process_obj: _Popen (multiprocessing/context.py:276) Arguments:: process_obj: Locals:: Popen: start (multiprocessing/process.py:121) Arguments:: self: start (airflow/utils/dag_processing.py:248) Arguments:: self: Locals:: mp_start_method: "fork" context: child_signal_conn: process: _execute (airflow/jobs/scheduler_job.py:1276) Arguments:: self: Locals:: pickle_dags: False async_mode: True processor_timeout_seconds: 600 processor_timeout: run (airflow/jobs/base_job.py:237) Arguments:: self: Locals:: session: scheduler (airflow/cli/commands/scheduler_command.py:63) Arguments:: args: Locals:: job: wrapper (airflow/utils/cli.py:89) Locals:: args: () kwargs: {} metrics: {"sub_command": "scheduler", "start_datetime": , ...} command (airflow/cli/cli_parser.py:48) Locals:: args: () kwargs: {} func: main (airflow/__main__.py:40) Locals:: parser: args: (airflow:8) ```
oleksandr-yatsuk commented 3 years ago

We had the same issue with Airflow on Google Cloud until increased the setting AIRFLOWCORESQL_ALCHEMY_MAX_OVERFLOW The default value was 5, with a change to 60 our Airflow server started to perform very well, including on complex DAGs with around 1000 tasks each. Any scale-up was resting on the database concurrent connections limit, so the scheduler was not able to perform fast.

dimberman commented 3 years ago

@ashb considering what @oleksandr-yatsuk found, maybe this is a database issue?

leonsmith commented 3 years ago

No freezes since bumping AIRFLOW__CORE__SQL_ALCHEMY_MAX_OVERFLOW like @oleksandr-yatsuk suggested

ashb commented 3 years ago

I've got a fix for the case reported by @MatthewRBruce (for 2.0.1) coming in 2.0.2

yuqian90 commented 3 years ago

Hi @ashb I would like to report that we've been seeing something similar to this issue in Airflow 2.0.2 recently.

We are using airflow 2.0.2 with a single airflow-scheduler + a few airflow-worker using CeleryExecutor and postgres backend running dozens of dags each with hundreds to a few thousand tasks. Python version is 3.8.7.

Here's what we saw: airflow-scheduler sometimes stops heartbeating and stops scheduling any tasks. This seems to happen at random times, about once or twice a week. When this happens, the last line in the scheduler log shows the following, i.e. it stopped writing out any log after receiving signal 15. I did strace the airflow scheduler process. It did not capture any other process sending it signal 15. So most likely the signal 15 was sent by the scheduler to itself.

May 11 21:19:56 airflow[12643]: [2021-05-11 21:19:56,908] {base_executor.py:82} INFO - Adding to queue: ['airflow', 'tasks', 'run', ...]
May 11 21:19:56 airflow[12643]: [2021-05-11 21:19:56,973] {scheduler_job.py:746} INFO - Exiting gracefully upon receiving signal 15

When the scheduler was in this state, there was also a child airflow scheduler process shown in ps which was spawned by the main airflow scheduler process. I forgot py-spy dump, but I did use py-spy top to look at the child airflow scheduler process. This was what I saw. It seems to be stuck somewhere in celery_executor.py::_send_tasks_to_celery. This sounds similar to what @milton0825 reported previously although he mentioned he was using Airflow 1.10.8.

When I manually SIGTERM the child airflow scheduler process, it died. And immediately the main airflow scheduler started to heartbeat and schedule tasks again like nothing ever happened. So I suspect somewhere when the airflow scheduler was spawning a child processes, it got stuck. But I still don't understand how it produced a Exiting gracefully upon receiving signal 15 in the log.

Total Samples 7859
GIL: 0.00%, Active: 0.00%, Threads: 1

  %Own   %Total  OwnTime  TotalTime  Function (filename:line)
  0.00%   0.00%   0.540s    0.540s   __enter__ (multiprocessing/synchronize.py:95)
  0.00%   0.00%   0.000s    0.540s   worker (multiprocessing/pool.py:114)
  0.00%   0.00%   0.000s    0.540s   _bootstrap (multiprocessing/process.py:315)
  0.00%   0.00%   0.000s    0.540s   _repopulate_pool (multiprocessing/pool.py:303)
  0.00%   0.00%   0.000s    0.540s   main (airflow/__main__.py:40)
  0.00%   0.00%   0.000s    0.540s   start (multiprocessing/process.py:121)
  0.00%   0.00%   0.000s    0.540s   _send_tasks_to_celery (airflow/executors/celery_executor.py:330)
  0.00%   0.00%   0.000s    0.540s   Pool (multiprocessing/context.py:119)
  0.00%   0.00%   0.000s    0.540s   run (airflow/jobs/base_job.py:237)
  0.00%   0.00%   0.000s    0.540s   _repopulate_pool_static (multiprocessing/pool.py:326)
  0.00%   0.00%   0.000s    0.540s   heartbeat (airflow/executors/base_executor.py:158)
  0.00%   0.00%   0.000s    0.540s   _launch (multiprocessing/popen_fork.py:75)
  0.00%   0.00%   0.000s    0.540s   wrapper (airflow/utils/cli.py:89)
  0.00%   0.00%   0.000s    0.540s   __init__ (multiprocessing/pool.py:212)
  0.00%   0.00%   0.000s    0.540s   _Popen (multiprocessing/context.py:277)

One other observation was that when the airflow scheduler was in the stuck state, the DagFileProcessor processes started by airflow scheduler were still running. I could see them writing out logs to dag_processor_manager.log.

davidcaron commented 3 years ago

@yuqian90 I have almost the exact same environment as you, and I have the same problem.

The problem happens roughly twice per day.

I get the same last log message you do: Exiting gracefully upon receiving signal 15 and the exact same py-spy output.

As a last resort, I plan to watch for a hanged subprocess of the scheduler and kill it in a cron job... just like you, when I kill the subprocess manually, the main scheduler process continues as if nothing happened.

yuqian90 commented 3 years ago

The same behaviour in my previous comment happened again so I took a py-spy dump of both the main airflow scheduler and the child process. When the scheduler was stuck, the main airflow scheduler is stuck in celery_executor.py::_send_tasks_to_celery in __exit__ of multiprocessing.Pool. The code suggests _terminate_pool() method does send a SIGTERM. That seems to explain why there's a Exiting gracefully upon receiving signal 15 in the scheduler log, although it's not clear why the SIGTERM is sent to the main scheduler process itself.

The child airflow scheduler is stuck in _send_tasks_to_celery when trying to get the lock of SimpleQueue.

This is the py-spy dump of the main airflow scheduler process when it got stuck:

Python v3.8.7

Thread 0x7FB54794E740 (active): "MainThread"
    poll (multiprocessing/popen_fork.py:27)
    wait (multiprocessing/popen_fork.py:47)
    join (multiprocessing/process.py:149)
    _terminate_pool (multiprocessing/pool.py:729)
    __call__ (multiprocessing/util.py:224)
    terminate (multiprocessing/pool.py:654)
    __exit__ (multiprocessing/pool.py:736)
    _send_tasks_to_celery (airflow/executors/celery_executor.py:331)
    _process_tasks (airflow/executors/celery_executor.py:272)
    trigger_tasks (airflow/executors/celery_executor.py:263)
    heartbeat (airflow/executors/base_executor.py:158)
    _run_scheduler_loop (airflow/jobs/scheduler_job.py:1388)
    _execute (airflow/jobs/scheduler_job.py:1284)
    run (airflow/jobs/base_job.py:237)
    scheduler (airflow/cli/commands/scheduler_command.py:63)
    wrapper (airflow/utils/cli.py:89)
    command (airflow/cli/cli_parser.py:48)
    main (airflow/__main__.py:40)
    <module> (airflow:8)

This is the py-spy dump of the child airflow scheduler process when it got stuck:

Python v3.8.7

Thread 16232 (idle): "MainThread"
    __enter__ (multiprocessing/synchronize.py:95)
    get (multiprocessing/queues.py:355)
    worker (multiprocessing/pool.py:114)
    run (multiprocessing/process.py:108)
    _bootstrap (multiprocessing/process.py:315)
    _launch (multiprocessing/popen_fork.py:75)
    __init__ (multiprocessing/popen_fork.py:19)
    _Popen (multiprocessing/context.py:277)
    start (multiprocessing/process.py:121)
    _repopulate_pool_static (multiprocessing/pool.py:326)
    _repopulate_pool (multiprocessing/pool.py:303)
    __init__ (multiprocessing/pool.py:212)
    Pool (multiprocessing/context.py:119)
    _send_tasks_to_celery (airflow/executors/celery_executor.py:330)
    _process_tasks (airflow/executors/celery_executor.py:272)
    trigger_tasks (airflow/executors/celery_executor.py:263)
    heartbeat (airflow/executors/base_executor.py:158)
    _run_scheduler_loop (airflow/jobs/scheduler_job.py:1388)
    _execute (airflow/jobs/scheduler_job.py:1284)
    run (airflow/jobs/base_job.py:237)
    scheduler (airflow/cli/commands/scheduler_command.py:63)
    wrapper (airflow/utils/cli.py:89)
    command (airflow/cli/cli_parser.py:48)
    main (airflow/__main__.py:40)
    <module> (airflow:8)
sterling-jackson commented 3 years ago

Have been struggling with this since we migrated to 2.0 our lower environments. Scheduler works for a couple of days, then stops scheduling, but doesn't trigger any heartbeat errors. Not sure it's helpful, but our PROD instance is running smoothly with Airflow 1.10.9 and Python 3.7.8.

Restarting the scheduler brings it back to life after Docker restarts the service.

ashb commented 3 years ago

@sterling-jackson Your use case might be fixed by 2.1.0 (currently in RC stage)

yuqian90 commented 3 years ago

Hi @ashb @davidcaron I managed to reproduce this issue consistently with a small reproducing example and traced the problem down to reset_signals() in celery_executor.py. Since it feels like a different issue from the original one reported here, I opened a new issue: https://github.com/apache/airflow/issues/15938

thesuperzapper commented 2 years ago

I just wanted to share that the User-Community Airflow Helm Chart now has a mitigation for this issue that will automatically restart the scheduler if no tasks are created within some threshold time.

It's called the scheduler "Task Creation Check", but its not enabled by default as, the "threshold" must be longer than your shorted DAG schedule_interval, which we dont know unless the user tells us.