apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

StaleDataError when running DAG with dynamic task mapping #42006

Open ykanagarajapandiyan opened 1 month ago

ykanagarajapandiyan commented 1 month ago

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.9.1

What happened?

We are noticing our scheduler and executors going down at random whenever a DAG that uses dynamic task mapping is being processed. In the logs we see a DB stale data error on the task_instance table when the scheduler loop tries to process the expanded task instances of the DAG. We use the LocalKubernetesExecutor, and the logs show all components (scheduler, Local executor, and Kubernetes executor) going down:

[2024-09-04T16:27:39.420+0000] {scheduler_job_runner.py:860} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 843, in _execute
    self._run_scheduler_loop()
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 975, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 1057, in _do_scheduling
    callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/retries.py", line 89, in wrapped_function
    for attempt in run_with_db_retries(max_retries=retries, logger=logger, **retry_kwargs):
  File "/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 347, in __iter__
    do = self.iter(retry_state=retry_state)
  File "/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 314, in iter
    return fut.result()
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/retries.py", line 98, in wrapped_function
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 1401, in _schedule_all_dag_runs
    callback_tuples = [(run, self._schedule_dag_run(run, session=session)) for run in dag_runs]
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 1401, in <listcomp>
    callback_tuples = [(run, self._schedule_dag_run(run, session=session)) for run in dag_runs]
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 1469, in _schedule_dag_run
    schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/dagrun.py", line 798, in update_state
    info = self.task_instance_scheduling_decisions(session)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/dagrun.py", line 954, in task_instance_scheduling_decisions
    schedulable_tis, changed_tis, expansion_happened = self._get_ready_tis(
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/dagrun.py", line 1063, in _get_ready_tis
    new_tis = _expand_mapped_task_if_needed(schedulable)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/dagrun.py", line 1037, in _expand_mapped_task_if_needed
    expanded_tis, _ = ti.task.expand_mapped_task(self.run_id, session=session)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/abstractoperator.py", line 608, in expand_mapped_task
    session.flush()
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
    self._flush(objects)
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
    with util.safe_reraise():
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
    flush_context.execute()
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
    rec.execute(self)
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
    util.preloaded.orm_persistence.save_obj(
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/persistence.py", line 237, in save_obj
    _emit_update_statements(
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/persistence.py", line 1035, in _emit_update_statements
    raise orm_exc.StaleDataError(
sqlalchemy.orm.exc.StaleDataError: UPDATE statement on table 'task_instance' expected to update 1 row(s); 0 were matched.
[2024-09-04T16:27:39.425+0000] {local_executor.py:403} INFO - Shutting down LocalExecutor; waiting for running tasks to finish. Signal again if you don't want to wait.
[2024-09-04T16:27:39.703+0000] {kubernetes_executor.py:762} INFO - Shutting down Kubernetes executor
[2024-09-04T16:27:39.853+0000] {scheduler_job_runner.py:872} INFO - Exited execute loop
Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/__main__.py", line 58, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/cli.py", line 114, in wrapper
[2024-09-04 16:27:39 +0000] [410] [INFO] Handling signal: term
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/scheduler_command.py", line 58, in scheduler
    run_command_with_daemon_option(
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/daemon_utils.py", line 85, in run_command_with_daemon_option
    callback()
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/scheduler_command.py", line 61, in <lambda>
[2024-09-04 16:27:39 +0000] [417] [INFO] Worker exiting (pid: 417)
[2024-09-04 16:27:39 +0000] [670] [INFO] Worker exiting (pid: 670)
    callback=lambda: _run_scheduler_job(args),
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/scheduler_command.py", line 49, in _run_scheduler_job
    run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py", line 79, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/job.py", line 402, in run_job
    return execute_job(job, execute_callable=execute_callable)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/job.py", line 431, in execute_job
    ret = execute_callable()
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 843, in _execute
    self._run_scheduler_loop()
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 975, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 1057, in _do_scheduling
    callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/retries.py", line 89, in wrapped_function
    for attempt in run_with_db_retries(max_retries=retries, logger=logger, **retry_kwargs):
  File "/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 347, in __iter__
    do = self.iter(retry_state=retry_state)
  File "/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 314, in iter
    return fut.result()
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/retries.py", line 98, in wrapped_function
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 1401, in _schedule_all_dag_runs
    callback_tuples = [(run, self._schedule_dag_run(run, session=session)) for run in dag_runs]
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 1401, in <listcomp>
    callback_tuples = [(run, self._schedule_dag_run(run, session=session)) for run in dag_runs]
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 1469, in _schedule_dag_run
    schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/dagrun.py", line 798, in update_state
    info = self.task_instance_scheduling_decisions(session)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/dagrun.py", line 954, in task_instance_scheduling_decisions
    schedulable_tis, changed_tis, expansion_happened = self._get_ready_tis(
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/dagrun.py", line 1063, in _get_ready_tis
    new_tis = _expand_mapped_task_if_needed(schedulable)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/dagrun.py", line 1037, in _expand_mapped_task_if_needed
    expanded_tis, _ = ti.task.expand_mapped_task(self.run_id, session=session)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/abstractoperator.py", line 608, in expand_mapped_task
    session.flush()
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
    self._flush(objects)
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
    with util.safe_reraise():
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
    flush_context.execute()
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
    rec.execute(self)
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
    util.preloaded.orm_persistence.save_obj(
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/persistence.py", line 237, in save_obj
    _emit_update_statements(
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/persistence.py", line 1035, in _emit_update_statements
    raise orm_exc.StaleDataError(
sqlalchemy.orm.exc.StaleDataError: UPDATE statement on table 'task_instance' expected to update 1 row(s); 0 were matched.
[2024-09-04 16:27:39 +0000] [410] [INFO] Shutting down: Master

What you think should happen instead?

DAGs with dynamic task mapping should proceed as expected, with no restarts of the scheduler or the executors.

How to reproduce

Create a DAG that uses dynamic task mapping to expand into 100+ tasks and run it on a 30-minute schedule, as in the sketch below.
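
A minimal sketch of a DAG with that shape of workload (the DAG id, task names, and the mapped values are illustrative assumptions, not taken from the report):

```python
# reproduce_mapped_tasks.py - illustrative repro sketch, not the reporter's DAG.
from datetime import datetime, timedelta

from airflow.decorators import dag, task


@dag(
    dag_id="mapped_tasks_repro",     # hypothetical DAG id
    start_date=datetime(2024, 9, 1),
    schedule=timedelta(minutes=30),  # run every 30 minutes
    catchup=False,
)
def mapped_tasks_repro():
    @task
    def make_inputs() -> list[int]:
        # Return 100+ elements so the mapped task expands into 100+ task instances.
        return list(range(120))

    @task
    def process(value: int) -> int:
        # Placeholder for the real per-element work.
        return value * 2

    # Dynamic task mapping: one task instance per element of make_inputs().
    process.expand(value=make_inputs())


mapped_tasks_repro()
```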

Operating System

PRETTY_NAME="Debian GNU/Linux 12 (bookworm)"
NAME="Debian GNU/Linux"
VERSION_ID="12"
VERSION="12 (bookworm)"
VERSION_CODENAME=bookworm
ID=debian

Versions of Apache Airflow Providers

apache-airflow 2.9.1
apache-airflow-providers-alibaba 2.7.3
apache-airflow-providers-amazon 8.20.0
apache-airflow-providers-celery 3.6.2
apache-airflow-providers-cncf-kubernetes 8.1.1
apache-airflow-providers-common-io 1.3.1
apache-airflow-providers-common-sql 1.12.0
apache-airflow-providers-databricks 6.3.0
apache-airflow-providers-docker 3.10.0
apache-airflow-providers-elasticsearch 5.3.4
apache-airflow-providers-fab 1.0.4
apache-airflow-providers-ftp 3.8.0
apache-airflow-providers-google 10.17.0
apache-airflow-providers-grpc 3.4.1
apache-airflow-providers-hashicorp 3.6.4
apache-airflow-providers-http 4.10.1
apache-airflow-providers-imap 3.5.0
apache-airflow-providers-jenkins 3.5.1
apache-airflow-providers-microsoft-azure 10.0.0
apache-airflow-providers-mysql 5.5.4
apache-airflow-providers-odbc 4.5.0
apache-airflow-providers-openlineage 1.7.0
apache-airflow-providers-oracle 3.9.2
apache-airflow-providers-postgres 5.10.2
apache-airflow-providers-redis 3.6.1
apache-airflow-providers-sendgrid 3.4.0
apache-airflow-providers-sftp 4.9.1
apache-airflow-providers-slack 8.6.2
apache-airflow-providers-smtp 1.6.1
apache-airflow-providers-snowflake 5.4.0
apache-airflow-providers-sqlite 3.7.1
apache-airflow-providers-ssh 3.10.1

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else?

Issue occurs once or twice a day.

Are you willing to submit PR?

boring-cyborg[bot] commented 1 month ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

mro-dmoura commented 2 weeks ago

I'm experiencing the same issue, using the Celery executor on EKS with PostgreSQL on RDS as the metadata database. The dynamically generated tasks affect other tasks and DAGs, causing tasks to be killed at random without logs:

[2024-10-14, 05:14:48 CDT] {scheduler_job_runner.py:843} ERROR - Executor reports task instance <TaskInstance: taskname [queued]> finished (failed) although the task says it's queued. (Info: None) Was the task killed externally?

And in the scheduler logs, the StaleDataError:

sqlalchemy.orm.exc.StaleDataError: UPDATE statement on table 'task_instance' expected to update 1 row(s); 0 were matched.

ykanagarajapandiyan commented 2 weeks ago

Setting the config option use_row_level_locking to True prevents these DB errors. Even though the docs recommend setting it to True only when multiple schedulers are used, I think this conflicts with the schedule_after_task_execution option, where a mini-scheduler runs after each task and tries to update the same rows the scheduler is working on.

But changing use_row_level_locking to True affects task start times. I have noticed that even when a task completes, the scheduler sometimes takes about 5-10 minutes to terminate the pod (we use the LocalKubernetesExecutor) and start the next tasks.
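
For anyone who wants to experiment with these two options, here is a sketch of the relevant airflow.cfg settings; both keys live in the [scheduler] section, and the equivalent environment variables are AIRFLOW__SCHEDULER__USE_ROW_LEVEL_LOCKING and AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION. Neither is presented in this thread as a definitive fix:

```ini
[scheduler]
# Have the scheduler take SELECT ... FOR UPDATE row locks, so concurrent
# writers (such as the mini-scheduler) block instead of racing.
use_row_level_locking = True

# The mini-scheduler runs in the task process after each task finishes;
# disabling it removes the other writer suspected of touching the same rows.
schedule_after_task_execution = False
```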

potiuk commented 2 weeks ago

Quite likely https://github.com/apache/airflow/pull/39745 from 2.9.2 improves the mini-scheduler behaviour here.

I recommend upgrading to the latest Airflow version and checking whether you still have similar issues.

ykanagarajapandiyan commented 1 week ago

Upgrading Airflow to the latest version (2.10.2) has resolved the stale data errors with a single scheduler and use_row_level_locking set to False.