apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Worker never running tasks or failing them with no explanation for many simultaneous tasks #12995

Closed Squigilum closed 2 years ago

Squigilum commented 3 years ago

Apache Airflow version: 2.0.0rc1

Kubernetes version (if you are using kubernetes) (use kubectl version): 1.19.4

Environment:

What happened: I am running the 2.0.0 release candidate in minikube using the Celery executor. It was installed using the Helm chart in git, with the executor changed and a persistent volume claim added for storing DAGs. 'workers.replicas' is set to 2. I'm testing different scaling options by launching large numbers of tasks and evaluating how quickly and consistently they run. The DAG is triggered manually through the web server, and on most runs either some of the tasks fail with no explanation or some tasks are left in the 'queued' state and never run. The tasks in the 'queued' state are shown as 'active' in the Flower dashboard but do not appear to be actually running.

As part of my testing I have increased the values of AIRFLOW__CORE__DAG_CONCURRENCY and AIRFLOW__CELERY__WORKER_CONCURRENCY. This seems like it might exacerbate the problem, but I have also reproduced it with the default settings.

What you expected to happen: All tasks run successfully.

What do you think went wrong? Initially I thought I was over-taxing the system, but resource monitoring has shown nothing indicating this. My system has 11 GB of RAM free and 4 CPUs, and CPU utilization never went over 30%.

How to reproduce it: Attached is a simple DAG that produces the issue on my setup. concurrent_workflow.zip

Anything else we need to know: I haven't seen anything indicating an error in the logs, but would be happy to provide if requested.

How often does this problem occur? Once? Every time etc? The majority of my runs (75-90%) have resulted in between 1 and 4 tasks stuck in the 'queued' state. Failed tasks are less frequent (approximately 25% of runs).

ashb commented 3 years ago

Check the stdout/stderr of your Celery worker pod -- you may find an error there about being unable to connect to the database.

Could you also check the scheduler log files (not the pod output) for a traceback -- I think it would be in the logs/scheduler/latest/<DAG_FILE> kind of log.

ashb commented 3 years ago

The other thing to check, if you are on Postgres, is to increase the number of allowed connections, or use pgbouncer.
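A rough way to judge whether the connection limit is a plausible bottleneck is to compare a back-of-the-envelope estimate of peak connections with the Postgres limit. The sketch below is only an illustration: the function name, the per-task connection count, and the overhead figure are assumptions, not values taken from Airflow or from this thread.

```python
def estimate_peak_connections(
    workers: int,
    worker_concurrency: int,
    connections_per_task: int,
    overhead: int,
) -> int:
    """Estimate peak database connections for a Celery-based deployment.

    All inputs are assumptions supplied by the caller:
    - workers: number of Celery worker replicas
    - worker_concurrency: task slots per worker
    - connections_per_task: connections held by each running task
    - overhead: connections used by scheduler, webserver, and so on
    """
    return workers * worker_concurrency * connections_per_task + overhead


# Illustrative numbers only (hypothetical, not measured values).
peak = estimate_peak_connections(
    workers=2, worker_concurrency=50, connections_per_task=2, overhead=10
)
max_connections = 100  # the Postgres limit mentioned later in the thread
print(f"estimated peak: {peak}, limit: {max_connections}")
if peak > max_connections:
    print("The estimate exceeds the limit; raise it or pool with pgbouncer.")
```

If the estimate is close to or above the limit, raising the limit or putting a pooler in front of the database is worth trying first.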

Squigilum commented 3 years ago

Thank you -- I won't be able to try these out until Monday but I'll let you know what I find.

Squigilum commented 3 years ago

I tried both increasing the allowed connections (from 100 to 250) and enabling pgbouncer, and both still produced similar errors. For pgbouncer, I tried enabling it with just the default parameters and also increasing the maxClientConn parameter in the chart. I think I tried 250 and 1000 for maxClientConn.

I'm attaching the worker logs for my two Celery workers. For this run, tasks 23, 25, and 26 stayed in the queued state and never ran. Task 23 is active on worker 1, and tasks 25 and 26 are active on worker 0. I've also reduced the number of concurrent tasks from the DAG I initially shared, from 100 to 40.

The only thing I noticed that looked abnormal to me is the following warning in the logs, but it does not seem to occur near the tasks in question:

[2020-12-14 18:52:40,054: WARNING/ForkPoolWorker-6] Failed to log action with (psycopg2.DatabaseError) error with status PGRES_TUPLES_OK and no message from the libpq
(Background on this error at: http://sqlalche.me/e/13/4xp6)

I'm not sure if this snippet includes all the relevant log messages, but the worker logs for the tasks generally look like this:

[2020-12-14 18:48:37,531: INFO/MainProcess] Received task: airflow.executors.celery_executor.execute_command[5a07e65a-8ad0-4fbe-83c8-8ea952f3a55d]
[2020-12-14 18:48:37,532: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x7ff0ecd8dc10> (args:('airflow.executors.celery_executor.execute_command', '5a07e65a-8ad0-4fbe-83c8-8ea952f3a55d', {'lang': 'py', 'task': 'airflow.executors.celery_executor.execute_command', 'id': '5a07e65a-8ad0-4fbe-83c8-8ea952f3a55d', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '5a07e65a-8ad0-4fbe-83c8-8ea952f3a55d', 'parent_id': None, 'argsrepr': "[['airflow', 'tasks', 'run', 'run_100_concurrent', '25', '2020-12-14T18:47:13.448236+00:00', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/dags/concurrent_workflow.py']]", 'kwargsrepr': '{}', 'origin': 'gen148@airflow-scheduler-686f8b7b4-2vlrd', 'reply_to': '7a69ddc6-70f1-3417-ae71-92a691da626b', 'correlation_id': '5a07e65a-8ad0-4fbe-83c8-8ea952f3a55d', 'hostname': 'celery@airflow-worker-1', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [['airflow', 'tasks', 'run', 'run_100_concurrent', '25',... kwargs:{})
[2020-12-14 18:48:37,552: DEBUG/MainProcess] Task accepted: airflow.executors.celery_executor.execute_command[5a07e65a-8ad0-4fbe-83c8-8ea952f3a55d] pid:23
[2020-12-14 18:48:37,750: INFO/ForkPoolWorker-5] Executing command in Celery: ['airflow', 'tasks', 'run', 'run_100_concurrent', '25', '2020-12-14T18:47:13.448236+00:00', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/dags/concurrent_workflow.py']
[2020-12-14 18:48:41,337: DEBUG/ForkPoolWorker-5] Calling callbacks: [<function default_action_log at 0x7ff0efbee5e0>]

worker-0.log worker-1.log scheduler.log

Squigilum commented 3 years ago

@ashb I was able to confirm that it was a database issue, as you suspected. The warning I pasted above indicates that it happened. I would suggest logging it as an error rather than a warning, since the consequences were pretty severe, but otherwise I believe a better Postgres setup will fix it for me. Thanks again for your suggestions.
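Since the database failure only surfaces as a WARNING, it is easy to miss in a long worker log. A minimal sketch for pulling such lines out of a Celery worker log follows; it assumes the log prefix format seen in the excerpts above, and the helper and type names are hypothetical.

```python
import re
from typing import Iterable, List, NamedTuple


class LogHit(NamedTuple):
    timestamp: str
    level: str
    process: str
    message: str


# Matches the Celery prefix seen above: "[<timestamp>: LEVEL/Process] message"
LINE_RE = re.compile(
    r"^\[(?P<ts>.+?): (?P<level>[A-Z]+)/(?P<proc>[^\]]+)\] (?P<msg>.*)$"
)


def find_db_errors(lines: Iterable[str]) -> List[LogHit]:
    """Return log entries whose message mentions a psycopg2 exception."""
    hits = []
    for line in lines:
        match = LINE_RE.match(line.strip())
        if match and "psycopg2." in match.group("msg"):
            hits.append(
                LogHit(
                    match.group("ts"),
                    match.group("level"),
                    match.group("proc"),
                    match.group("msg"),
                )
            )
    return hits


sample = [
    "[2020-12-14 18:48:37,531: INFO/MainProcess] Received task: "
    "airflow.executors.celery_executor.execute_command"
    "[5a07e65a-8ad0-4fbe-83c8-8ea952f3a55d]",
    "[2020-12-14 18:52:40,054: WARNING/ForkPoolWorker-6] Failed to log action "
    "with (psycopg2.DatabaseError) error with status PGRES_TUPLES_OK and no "
    "message from the libpq",
]
for hit in find_db_errors(sample):
    print(hit.timestamp, hit.level, hit.process)
```

In practice the lines would come from an open worker log file, for example one of the attached worker logs, rather than from an inline list.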

FloChehab commented 3 years ago

Hello, I am having exactly the same issue while migrating to 2.0.0, and from reading through the comments I don't really understand what a possible solution is.

@Squigilum could you share a bit more information on how you solved this?

Squigilum commented 3 years ago

@FloChehab I was only evaluating the tool, so I never solved it. We ended up using a different product. Sorry that doesn't help you. From what I could tell I believe @ashb's suggestions from Dec. 11th should have fixed it but I'm not sure why they didn't for me.

FloChehab commented 3 years ago

Thanks for the fast feedback @Squigilum !

After a lot of investigation, we found that SQLAlchemy connection pooling was causing the error (we already have pgbouncer in front of Postgres).

So we set sql_alchemy_pool_enabled to False in the Airflow config and changed celery_config_options in the Celery section of the config to point to a file containing:

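# Start from Airflow's default Celery settings and override only the engine options.
from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
from sqlalchemy.pool import NullPool

CUSTOM_CELERY_CONFIG = {
    **DEFAULT_CELERY_CONFIG,
    # NullPool opens a connection per use and closes it afterwards (no pooling).
    "database_engine_options": {"poolclass": NullPool},
}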

Everything seems to work fine now.
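For anyone configuring Airflow through environment variables (as the Helm chart does), the two settings above could be expressed roughly as follows. This is a sketch under assumptions: it uses the Airflow 2.0 layout where sql_alchemy_pool_enabled sits in the [core] section, and the module name custom_celery_config is hypothetical and stands for wherever the file above is saved on the Python path.

```python
def airflow_env_var(section: str, key: str) -> str:
    """Build the AIRFLOW__{SECTION}__{KEY} environment variable name."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


# Hypothetical import path: assumes the dict above is saved as
# custom_celery_config.py somewhere importable by the scheduler and workers.
CUSTOM_CONFIG_PATH = "custom_celery_config.CUSTOM_CELERY_CONFIG"

overrides = {
    # Disable SQLAlchemy pooling for Airflow's own metadata connections.
    airflow_env_var("core", "sql_alchemy_pool_enabled"): "False",
    # Point Celery at the customised config dict.
    airflow_env_var("celery", "celery_config_options"): CUSTOM_CONFIG_PATH,
}

for name, value in overrides.items():
    print(f"{name}={value}")
```

The printed pairs can then be set on the scheduler and worker pods in whatever way the deployment manages environment variables.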

eladkal commented 2 years ago

This issue was reported against an older version of Airflow. Please check with the latest Airflow version.