apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Airflow Scheduler restart results in ObjectDeletedError #30709

Closed adammarchewka closed 1 year ago

adammarchewka commented 1 year ago

Apache Airflow version

2.5.3

What happened

The Airflow Scheduler started having issues when restarted (either manually or forcefully): some task instances remain stuck in the running/queued state after the restart, and the Scheduler somehow loses its reference to them (or fails to re-adopt them), resulting in a critical error about a missing TaskInstance.

The error requires manual intervention in the Airflow database (manually setting the stuck tasks to the failed state).
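For readers unfamiliar with the workaround, the manual intervention described above might look roughly like the sketch below. It is only an illustration, not the exact statement used in this deployment: it runs against an in-memory SQLite database with a simplified, hypothetical `task_instance` table (the real Airflow table has more columns and usually lives in Postgres or MySQL), and the function name `fail_stuck_task_instances` is made up for this example.

```python
import sqlite3

# States the report describes as "stuck" after a scheduler restart.
STUCK_STATES = ("running", "queued")


def fail_stuck_task_instances(conn):
    """Set every running/queued task instance to 'failed'; return the row count."""
    placeholders = ", ".join("?" for _ in STUCK_STATES)
    cur = conn.execute(
        f"UPDATE task_instance SET state = 'failed' WHERE state IN ({placeholders})",
        STUCK_STATES,
    )
    conn.commit()
    return cur.rowcount


def demo():
    # Simplified, hypothetical schema: the real table has many more columns.
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE task_instance (dag_id TEXT, task_id TEXT, run_id TEXT, state TEXT)"
    )
    conn.executemany(
        "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
        [
            ("example_dag", "a", "run_1", "running"),
            ("example_dag", "b", "run_1", "queued"),
            ("example_dag", "c", "run_1", "success"),
        ],
    )
    changed = fail_stuck_task_instances(conn)
    states = [
        row[0]
        for row in conn.execute("SELECT state FROM task_instance ORDER BY task_id")
    ]
    conn.close()
    return changed, states
```

Calling `demo()` updates only the two stuck rows and leaves the successful one untouched. On a real deployment the database should be backed up before running a statement like this.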

What you think should happen instead

The Scheduler should shut down gracefully within the given time and restart properly afterward without raising ObjectDeletedError.

How to reproduce

Restart airflow-scheduler, or redeploy the whole Airflow installation, while tasks are running (that is, while they are being processed by the Scheduler/Workers).

We encounter the issue with every restart/redeploy. Not sure if it is reproducible outside our system.

Operating System

Debian GNU/Linux 11 (bullseye)

Versions of Apache Airflow Providers

apache-airflow-providers-celery==3.1.0
apache-airflow-providers-cncf-kubernetes==5.2.2
apache-airflow-providers-common-sql==1.3.4
apache-airflow-providers-docker==3.5.1
apache-airflow-providers-elasticsearch==4.4.0
apache-airflow-providers-ftp==3.3.1
apache-airflow-providers-google==9.0.0
apache-airflow-providers-grpc==3.1.0
apache-airflow-providers-hashicorp==3.3.0
apache-airflow-providers-http==4.2.0
apache-airflow-providers-imap==3.1.1
apache-airflow-providers-mysql==4.0.2
apache-airflow-providers-odbc==3.2.1
apache-airflow-providers-postgres==5.4.0
apache-airflow-providers-redis==3.1.0
apache-airflow-providers-sendgrid==3.1.0
apache-airflow-providers-sftp==4.2.4
apache-airflow-providers-slack==7.2.0
apache-airflow-providers-snowflake==4.0.4
apache-airflow-providers-sqlite==3.3.1
apache-airflow-providers-ssh==3.5.0

Deployment

Other 3rd-party Helm chart

Deployment details

Kubernetes versions:

Client Version: version.Info{Major:"1", Minor:"25", GitVersion:"v1.25.3", GitCommit:"434bfd82814af038ad94d62ebe59b133fcb50506", GitTreeState:"clean", BuildDate:"2022-10-12T10:57:26Z", GoVersion:"go1.19.2", Compiler:"gc", Platform:"linux/amd64"}
Kustomize Version: v4.5.7
Server Version: version.Info{Major:"1", Minor:"24", GitVersion:"v1.24.10-gke.2300", GitCommit:"1d7ae0799b40b0cd95502e3a5e698db62572e341", GitTreeState:"clean", BuildDate:"2023-02-22T09:28:49Z", GoVersion:"go1.19.6 X:boringcrypto", Compiler:"gc", Platform:"linux/amd64"}
Helm version: 3.11.2-1

Deployment via: https://github.com/airflow-helm/charts

Anything else

Scheduler error log:

Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/__main__.py", line 48, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/cli_parser.py", line 52, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/cli.py", line 108, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py", line 73, in scheduler
    _run_scheduler_job(args=args)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py", line 43, in _run_scheduler_job
    job.run()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/base_job.py", line 258, in run
    self._execute()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 759, in _execute
    self._run_scheduler_loop()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 840, in _run_scheduler_loop
    self.adopt_or_reset_orphaned_tasks()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 75, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 1413, in adopt_or_reset_orphaned_tasks
    for attempt in run_with_db_retries(logger=self.log):
  File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 347, in __iter__
    do = self.iter(retry_state=retry_state)
  File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 314, in iter
    return fut.result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 1458, in adopt_or_reset_orphaned_tasks
    to_reset = self.executor.try_adopt_task_instances(tis_to_reset_or_adopt)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/executors/celery_executor.py", line 503, in try_adopt_task_instances
    if ti.external_executor_id is not None:
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 482, in __get__
    return self.impl.get(state, dict_)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 942, in get
    value = self._fire_loader_callables(state, key, passive)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 976, in _fire_loader_callables
    return callable_(state, passive)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/strategies.py", line 561, in __call__
    return strategy._load_for_state(state, passive)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/strategies.py", line 530, in _load_for_state
    raise orm_exc.ObjectDeletedError(state)
sqlalchemy.orm.exc.ObjectDeletedError: Instance '<TaskInstance at 0x7f29c778f130>' has been deleted, or its row is otherwise not present.
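The traceback ends in an attribute access (`ti.external_executor_id`) that triggers a lazy load, which then finds no row. A plausible reading, not confirmed in this report, is that the scheduler holds in-memory TaskInstance objects whose rows were removed by something else before the attribute was loaded. The toy model below mimics that pattern with plain `sqlite3`; it is not SQLAlchemy's implementation, and `LazyTaskInstance`, `RowGoneError`, and the table layout are all hypothetical names used only for illustration.

```python
import sqlite3


class RowGoneError(LookupError):
    """Toy stand-in for sqlalchemy.orm.exc.ObjectDeletedError."""


class LazyTaskInstance:
    """Keeps only a primary key; the column value is fetched on each access."""

    def __init__(self, conn, ti_id):
        self._conn = conn
        self._id = ti_id

    @property
    def external_executor_id(self):
        row = self._conn.execute(
            "SELECT external_executor_id FROM task_instance WHERE id = ?",
            (self._id,),
        ).fetchone()
        if row is None:
            # The object still exists in memory, but its row does not.
            raise RowGoneError(f"task_instance {self._id} has been deleted")
        return row[0]


def demo():
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE task_instance (id INTEGER PRIMARY KEY, external_executor_id TEXT)"
    )
    conn.executemany(
        "INSERT INTO task_instance VALUES (?, ?)",
        [(1, "celery-123"), (2, "celery-456")],
    )
    # Objects are created first, as when a query result is held in memory.
    survivor = LazyTaskInstance(conn, 1)
    orphan = LazyTaskInstance(conn, 2)

    # Something else removes the second row before it is read.
    conn.execute("DELETE FROM task_instance WHERE id = 2")

    raised = False
    try:
        orphan.external_executor_id
    except RowGoneError:
        raised = True
    value = survivor.external_executor_id
    conn.close()
    return value, raised
```

In this model the surviving object still loads its value, while the orphaned one fails only at the moment its attribute is read, which matches the shape of the failure in `try_adopt_task_instances`.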

Custom Helm Values:

airflow:
  config:
    AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: "360"
    AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL: "60"
    AIRFLOW__CELERY__WORKER_CONCURRENCY: 5


boring-cyborg[bot] commented 1 year ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.