apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

randomly schedule failing when using celery executor and HA scheduler #23207

Closed: Sha-of-Lazy closed this issue 2 years ago

Sha-of-Lazy commented 2 years ago

Apache Airflow version

2.2.5 (latest released)

What happened

The DAG run fails at random (about 5% of the time) when it is scheduled. In the web UI it shows up as 'failed' and 'Not yet started'. While checking the logs, we found the error below in the scheduler's standard output:

[2022-04-25 07:00:00,457] {scheduler_job.py:548} INFO - Sending TaskInstanceKey(dag_id='my_dag_id', task_id='my_dag_id_1', run_id='scheduled__2022-04-24T22:55:00+00:00', try_number=1) to executor with priority 1 and queue default
[2022-04-25 07:00:00,457] {base_executor.py:85} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'my_dag_id', 'my_dag_id_1', 'scheduled__2022-04-24T22:55:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/auto_generated_my_dag_id.py']
[2022-04-25 07:00:00,618] {celery_executor.py:299} ERROR - Error sending Celery task: This result object does not return rows. It has been closed automatically.
Celery Task ID: TaskInstanceKey(dag_id='my_dag_id', task_id='my_dag_id_1', run_id='scheduled__2022-04-24T22:55:00+00:00', try_number=1)
Traceback (most recent call last):
  File "/usr/local/python3/lib/python3.6/site-packages/sqlalchemy/engine/result.py", line 779, in _getter
    getter = self._metadata._getter
AttributeError: 'NoneType' object has no attribute '_getter'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/python3/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 172, in send_task_to_executor
    result = task_to_run.apply_async(args=[command], queue=queue)
  File "/usr/local/python3/lib/python3.6/site-packages/celery/app/task.py", line 576, in apply_async
    **options
  File "/usr/local/python3/lib/python3.6/site-packages/celery/app/base.py", line 767, in send_task
    amqp.send_task_message(P, name, message, **options)
  File "/usr/local/python3/lib/python3.6/site-packages/celery/app/amqp.py", line 519, in send_task_message
    **properties
  File "/usr/local/python3/lib/python3.6/site-packages/kombu/messaging.py", line 180, in publish
    exchange_name, declare, timeout
  File "/usr/local/python3/lib/python3.6/site-packages/kombu/connection.py", line 525, in _ensured
    return fun(*args, **kwargs)
  File "/usr/local/python3/lib/python3.6/site-packages/kombu/messaging.py", line 193, in _publish
    [maybe_declare(entity) for entity in declare]
  File "/usr/local/python3/lib/python3.6/site-packages/kombu/messaging.py", line 193, in <listcomp>
    [maybe_declare(entity) for entity in declare]
  File "/usr/local/python3/lib/python3.6/site-packages/kombu/messaging.py", line 99, in maybe_declare
    return maybe_declare(entity, self.channel, retry, **retry_policy)
  File "/usr/local/python3/lib/python3.6/site-packages/kombu/common.py", line 119, in maybe_declare
    return _maybe_declare(entity, channel)
  File "/usr/local/python3/lib/python3.6/site-packages/kombu/common.py", line 159, in _maybe_declare
    entity.declare(channel=channel)
  File "/usr/local/python3/lib/python3.6/site-packages/kombu/entity.py", line 606, in declare
    self._create_queue(nowait=nowait, channel=channel)
  File "/usr/local/python3/lib/python3.6/site-packages/kombu/entity.py", line 615, in _create_queue
    self.queue_declare(nowait=nowait, passive=False, channel=channel)
  File "/usr/local/python3/lib/python3.6/site-packages/kombu/entity.py", line 650, in queue_declare
    nowait=nowait,
  File "/usr/local/python3/lib/python3.6/site-packages/kombu/transport/virtual/base.py", line 529, in queue_declare
    return queue_declare_ok_t(queue, self._size(queue), 0)
  File "/usr/local/python3/lib/python3.6/site-packages/kombu/transport/sqlalchemy/__init__.py", line 196, in _size
    return self._query_all(queue).count()
  File "/usr/local/python3/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3803, in count
    return self.from_self(col).scalar()
  File "/usr/local/python3/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3523, in scalar
    ret = self.one()
  File "/usr/local/python3/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3490, in one
    ret = self.one_or_none()
  File "/usr/local/python3/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3459, in one_or_none
    ret = list(self)
  File "/usr/local/python3/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 100, in instances
    cursor.close()
  File "/usr/local/python3/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    with_traceback=exc_tb,
  File "/usr/local/python3/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/usr/local/python3/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 60, in instances
    for query_entity in query._entities
  File "/usr/local/python3/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 60, in <listcomp>
    for query_entity in query._entities
  File "/usr/local/python3/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 4849, in row_processor
    getter = result._getter(column)
  File "/usr/local/python3/lib/python3.6/site-packages/sqlalchemy/engine/result.py", line 781, in _getter
    return self._non_result(None, err)
  File "/usr/local/python3/lib/python3.6/site-packages/sqlalchemy/engine/result.py", line 1241, in _non_result
    replace_context=err,
  File "/usr/local/python3/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
sqlalchemy.exc.ResourceClosedError: This result object does not return rows. It has been closed automatically.

[2022-04-25 07:00:00,628] {scheduler_job.py:608} INFO - Executor reports execution of my_dag_id.my_dag_id_1 run_id=scheduled__2022-04-24T22:55:00+00:00 exited with status failed for try_number 1
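To put a number on how often this happens (the report estimates about 5% of runs), one could scan the scheduler's standard output for the "Error sending Celery task" lines and compare them with the "Sending TaskInstanceKey" lines. The sketch below assumes the log format shown above; failed_sends, send_failure_rate and parse_key are hypothetical helper names, not Airflow APIs.

```python
import re
from typing import Dict, List, Union

# Scheduler line emitted when the CeleryExecutor fails to publish a task. The
# message and the "Celery Task ID" may sit on one line or on consecutive lines.
SEND_ERROR = re.compile(
    r"Error sending Celery task: (?P<message>.*?)\s*"
    r"Celery Task ID: TaskInstanceKey\((?P<key>[^)]*)\)"
)
# Scheduler line emitted for every task handed to the executor.
SENT = re.compile(r"Sending TaskInstanceKey\(")
# name=value pairs inside a TaskInstanceKey repr, e.g. dag_id='my_dag_id' or try_number=1.
KEY_FIELD = re.compile(r"(\w+)=('[^']*'|\d+)")


def parse_key(key_repr: str) -> Dict[str, Union[str, int]]:
    """Turn "dag_id='a', try_number=1" into {'dag_id': 'a', 'try_number': 1}."""
    fields: Dict[str, Union[str, int]] = {}
    for name, raw in KEY_FIELD.findall(key_repr):
        fields[name] = int(raw) if raw.isdigit() else raw.strip("'")
    return fields


def failed_sends(log_text: str) -> List[Dict[str, Union[str, int]]]:
    """Return one dict per failed publish: the TaskInstanceKey fields plus the error text."""
    failures = []
    for match in SEND_ERROR.finditer(log_text):
        entry = parse_key(match.group("key"))
        entry["error"] = match.group("message").strip()
        failures.append(entry)
    return failures


def send_failure_rate(log_text: str) -> float:
    """Fraction of tasks sent to the executor whose Celery publish failed."""
    sent = len(SENT.findall(log_text))
    return len(failed_sends(log_text)) / sent if sent else 0.0


# Excerpt of the scheduler output quoted in this report.
SAMPLE_LOG = (
    "[2022-04-25 07:00:00,457] {scheduler_job.py:548} INFO - Sending TaskInstanceKey("
    "dag_id='my_dag_id', task_id='my_dag_id_1', "
    "run_id='scheduled__2022-04-24T22:55:00+00:00', try_number=1) "
    "to executor with priority 1 and queue default\n"
    "[2022-04-25 07:00:00,618] {celery_executor.py:299} ERROR - Error sending Celery task: "
    "This result object does not return rows. It has been closed automatically.\n"
    "Celery Task ID: TaskInstanceKey(dag_id='my_dag_id', task_id='my_dag_id_1', "
    "run_id='scheduled__2022-04-24T22:55:00+00:00', try_number=1)\n"
)

if __name__ == "__main__":
    for failure in failed_sends(SAMPLE_LOG):
        print(failure["dag_id"], failure["run_id"], failure["try_number"], failure["error"])
    print(f"failure rate: {send_failure_rate(SAMPLE_LOG):.0%}")
```

Run against a full day of scheduler output from each node, the rate should be roughly comparable to the ~5% figure above; the regexes only rely on the two log messages quoted in this issue.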

In some other cases it looks like this:

Traceback (most recent call last):
  File "/usr/local/python3/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 172, in send_task_to_executor
    result = task_to_run.apply_async(args=[command], queue=queue)
  ***** similar trace stacks *****
  File "/usr/local/python3/lib/python3.6/site-packages/sqlalchemy/engine/result.py", line 704, in _getter
    ret = self._key_fallback(key, None, raiseerr)
  File "/usr/local/python3/lib/python3.6/site-packages/sqlalchemy/engine/result.py", line 686, in _key_fallback
    replace_context=err,
  File "/usr/local/python3/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
sqlalchemy.exc.NoSuchColumnError: "Could not locate column in row for column 'count(*)'"

or:

"Could not locate column in row for column 'kombu_queue.id'"

Basically, it looks like a bug in how the Celery executor interacts with SQLAlchemy: the failure happens inside kombu's SQLAlchemy transport while the task is being published.
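All three variants fail inside SQLAlchemy's row processing. The first trace shows the path explicitly (kombu's SQLAlchemy transport counting the queue with count() in _size); for the other two only the final message is quoted, although the count(*) column in the second and the kombu_queue.id column in the third suggest the same kombu queue table is involved. When collecting occurrences from several scheduler nodes, a small classifier can group them by signature. This is a hypothetical triage helper keyed only on the messages quoted in this report, not part of Airflow, Celery or kombu.

```python
import re
from collections import Counter
from typing import Iterable

# Hypothetical signature table built only from the three messages quoted in this
# report. Order matters: the first pattern that matches wins.
SIGNATURES = [
    ("result_closed",
     re.compile(r"ResourceClosedError: This result object does not return rows")),
    ("missing_count_column",
     re.compile(r"Could not locate column in row for column 'count\(\*\)'")),
    ("missing_kombu_queue_id",
     re.compile(r"Could not locate column in row for column 'kombu_queue\.id'")),
]


def classify(error_text: str) -> str:
    """Label an error message or traceback with the first matching signature."""
    for label, pattern in SIGNATURES:
        if pattern.search(error_text):
            return label
    return "unknown"


def tally(errors: Iterable[str]) -> Counter:
    """Count how often each signature occurs across many captured errors."""
    return Counter(classify(text) for text in errors)


if __name__ == "__main__":
    samples = [
        "sqlalchemy.exc.ResourceClosedError: This result object does not return rows. "
        "It has been closed automatically.",
        "sqlalchemy.exc.NoSuchColumnError: \"Could not locate column in row for column 'count(*)'\"",
        "\"Could not locate column in row for column 'kombu_queue.id'\"",
    ]
    print(tally(samples))
```

Anything that lands in "unknown" is a new signature worth adding to the report.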

What you think should happen instead

The task should be scheduled successfully.

How to reproduce

The error keeps recurring in my environment. Here are the configuration details I think are relevant for reproducing it: 12 nodes (Docker), each running both a worker and a scheduler at the same time, all connected to one MySQL 8 database.

[core]
executor = CeleryExecutor

[celery]
worker_concurrency = 4
broker_url, result_backend: the same SQLAlchemy connection to a MySQL 8 database
pool = prefork
worker_precheck = False

[scheduler]
job_heartbeat_sec = 5
scheduler_heartbeat_sec = 5
num_runs = -1
parsing_processes = 2
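The actual broker URL is not given above, but the traceback passes through kombu/transport/sqlalchemy, which is the transport kombu loads for database-as-broker URLs, and Celery's database result backend uses db+ URLs. To check whether another deployment matches this setup, the configuration could be read with the standard library's configparser, as sketched below. The function name uses_sqlalchemy_broker and the sample URLs are hypothetical placeholders, and the sqla+ / sqlalchemy+ prefixes are assumed to be kombu's aliases for that transport; verify them against the installed kombu version.

```python
import configparser

# URL schemes assumed to select kombu's SQLAlchemy transport ("sqla" and
# "sqlalchemy" aliases); the traceback in this report runs through that transport.
SQLA_BROKER_PREFIXES = ("sqla+", "sqlalchemy+")


def uses_sqlalchemy_broker(cfg_text: str) -> bool:
    """True if an airflow.cfg-style text selects CeleryExecutor with a SQLAlchemy broker."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    # fallback="" also covers a missing section, so partial configs are fine.
    executor = parser.get("core", "executor", fallback="").strip()
    broker_url = parser.get("celery", "broker_url", fallback="").strip()
    return executor == "CeleryExecutor" and broker_url.startswith(SQLA_BROKER_PREFIXES)


# Made-up placeholder URLs: the report does not show the real connection strings.
SAMPLE_CFG = """
[core]
executor = CeleryExecutor

[celery]
worker_concurrency = 4
broker_url = sqla+mysql://airflow:secret@mysql-host:3306/airflow
result_backend = db+mysql://airflow:secret@mysql-host:3306/airflow
pool = prefork
worker_precheck = False
"""

if __name__ == "__main__":
    print(uses_sqlalchemy_broker(SAMPLE_CFG))
```

Interpolation is disabled because connection strings often contain % characters that configparser would otherwise try to expand.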

Operating System

Fedora EL7

Versions of Apache Airflow Providers

apache-airflow==2.2.5
apache-airflow-providers-celery==2.1.0
apache-airflow-providers-ftp==2.1.0
apache-airflow-providers-http==2.1.0
apache-airflow-providers-imap==2.2.1
apache-airflow-providers-mysql==2.2.1
apache-airflow-providers-sqlite==2.1.1

Deployment

Other Docker-based deployment

Deployment details

No response

Anything else

No response


boring-cyborg[bot] commented 2 years ago

Thanks for opening your first issue here! Be sure to follow the issue template!