apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

AttributeError: 'CeleryKubernetesExecutor' object has no attribute 'slots_occupied' #41525

Closed LipuFei closed 1 month ago

LipuFei commented 1 month ago

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.10.0, Python 3.12

What happened?

UPDATE: I think the CeleryKubernetesExecutor class should inherit from BaseExecutor instead of LoggingMixin, because BaseExecutor implements slots_occupied.

I get an error in the scheduler when I use CeleryKubernetesExecutor. My Airflow installation is on Kubernetes with the official Helm chart.

/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:143 FutureWarning: The config section [kubernetes] has been renamed to [kubernetes_executor]. Please update your `conf.get*` call to use the new name
[2024-08-16T11:55:08.527+0200] {executor_loader.py:254} INFO - Loaded executor: CeleryKubernetesExecutor
[2024-08-16T11:55:08.826+0200] {scheduler_job_runner.py:935} INFO - Starting the scheduler
[2024-08-16T11:55:08.827+0200] {scheduler_job_runner.py:942} INFO - Processing each file at most -1 times
[2024-08-16T11:55:08.828+0200] {ecs_executor.py:181} INFO - Loading Connection information
[2024-08-16T11:55:09.724+0200] {base.py:84} INFO - Retrieving connection 'aws_default'
[2024-08-16T11:55:11.322+0200] {executor_loader.py:254} INFO - Loaded executor: airflow.providers.amazon.aws.executors.ecs.AwsEcsExecutor
[2024-08-16T11:55:11.323+0200] {kubernetes_executor.py:287} INFO - Start Kubernetes executor
[2024-08-16T11:55:11.435+0200] {kubernetes_executor_utils.py:140} INFO - Event: and now my watch begins starting at resource_version: 0
[2024-08-16T11:55:11.623+0200] {kubernetes_executor.py:208} INFO - Found 0 queued task instances
[2024-08-16T11:55:11.625+0200] {ecs_executor.py:125} INFO - Starting ECS Executor and determining health...
[2024-08-16T11:55:12.119+0200] {ecs_executor.py:173} INFO - ECS Executor health check has succeeded.
[2024-08-16T11:55:12.120+0200] {scheduler_job_runner.py:1843} INFO - Adopting or resetting orphaned tasks for active dag runs
[2024-08-16T11:55:12.246+0200] {scheduler_job_runner.py:1001} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 984, in _execute
    self._run_scheduler_loop()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 1121, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 1264, in _do_scheduling
    num_queued_tis = self._critical_section_enqueue_task_instances(session=session)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 718, in _critical_section_enqueue_task_instances
    num_occupied_slots = sum([executor.slots_occupied for executor in self.job.executors])
                              ^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'CeleryKubernetesExecutor' object has no attribute 'slots_occupied'
[2024-08-16T11:55:12.330+0200] {kubernetes_executor.py:752} INFO - Shutting down Kubernetes executor
[2024-08-16T11:55:12.529+0200] {scheduler_job_runner.py:1014} INFO - Exited execute loop
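
The failure in the traceback above can be reproduced in isolation. The following is a minimal sketch that uses stand-in classes rather than the real Airflow ones: `HybridExecutor`, `FixedHybridExecutor`, and `count_occupied_slots` are hypothetical names, and the way the stand-in `BaseExecutor` computes `slots_occupied` (running plus queued tasks) is an assumption. It only illustrates why a wrapper that derives from LoggingMixin alone fails when the scheduler sums `slots_occupied`, and one possible remedy (delegating to the wrapped executors).

```python
import logging


class LoggingMixin:
    """Stand-in for Airflow's LoggingMixin: provides only a logger."""

    @property
    def log(self):
        return logging.getLogger(type(self).__name__)


class BaseExecutor(LoggingMixin):
    """Stand-in for BaseExecutor: tracks tasks and reports occupied slots."""

    def __init__(self):
        self.queued_tasks = {}
        self.running = set()

    @property
    def slots_occupied(self):
        # Assumption: occupied slots = running tasks + queued tasks.
        return len(self.running) + len(self.queued_tasks)


class HybridExecutor(LoggingMixin):
    """Hypothetical hybrid wrapper that derives from LoggingMixin only."""

    def __init__(self, primary, secondary):
        self.primary = primary
        self.secondary = secondary


class FixedHybridExecutor(HybridExecutor):
    """Same wrapper, with slots_occupied delegated to both wrapped executors."""

    @property
    def slots_occupied(self):
        return self.primary.slots_occupied + self.secondary.slots_occupied


def count_occupied_slots(executors):
    # Mirrors the expression shown in the traceback.
    return sum(executor.slots_occupied for executor in executors)


if __name__ == "__main__":
    celery, kube = BaseExecutor(), BaseExecutor()
    celery.running.add("task_a")
    kube.queued_tasks["task_b"] = object()

    try:
        count_occupied_slots([HybridExecutor(celery, kube)])
    except AttributeError as err:
        print(f"broken wrapper: {err}")

    print("fixed wrapper:", count_occupied_slots([FixedHybridExecutor(celery, kube)]))
```

Delegation keeps the wrapper's own structure intact. Inheriting from the base class, as suggested in the update above, would be another way to get the attribute, but a hybrid wrapper would still have to decide how to combine the counts of its two executors.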

What you think should happen instead?

No response

How to reproduce

Install Airflow using the official Helm chart with the executor set to CeleryKubernetesExecutor. The scheduler container then raises this exception.

Operating System

Kubernetes, Linux

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else?

No response

salam-abdul commented 1 month ago

I'm getting a related error message on Airflow 2.10 and Python 3.11: AttributeError: 'LocalKubernetesExecutor' object has no attribute 'slots_occupied'

potiuk commented 1 month ago

CC: @o-nikolas -> this seems to be a result of #40017

o-nikolas commented 1 month ago

I'll look into fixing this issue. But I encourage folks to use Multiple Executor Configuration instead of these old hybrid executors (which should be deprecated). These are exactly the types of issues that arise from these statically combined executors, since they're full of bespoke logic and hard coupling.

@potiuk what steps need to be followed for us to deprecate these old hybrid executors for airflow 3.0? Or is that not possible because they are in provider packages?

potiuk commented 1 month ago

> @potiuk what steps need to be followed for us to deprecate these old hybrid executors for airflow 3.0? Or is that not possible because they are in provider packages

We can deprecate those with warnings and remove them from the providers when the time comes (which might or might not happen sooner than dropping Airflow 2 support from those providers).

zoid-w commented 1 month ago

On a related note: the current Helm chart (1.15.0) does not yet support the hybrid executor approach introduced in Airflow 2.10.0.

The Helm chart constrains the executor value to an enum of the "old" executors. I'm not sure what the best way to validate the executor input would be (if validation is possible at all), given that users can specify their own custom executor.
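
One conceivable validation rule is sketched below in Python, purely as an illustration and not as how the Helm chart works: accept a comma-separated list in which every entry is either a known short name or something that looks like a dotted import path to a custom executor class. The function name `validate_executor_setting`, the contents of `KNOWN_EXECUTORS`, and the dotted-path pattern are all assumptions made for this sketch.

```python
import re

# Assumption: an illustrative set of built-in short names, not the chart's actual enum.
KNOWN_EXECUTORS = {
    "LocalExecutor",
    "SequentialExecutor",
    "CeleryExecutor",
    "KubernetesExecutor",
    "CeleryKubernetesExecutor",
    "LocalKubernetesExecutor",
}

# Assumption: a custom executor is given as a dotted path such as "package.module.ClassName".
DOTTED_PATH = re.compile(r"^[A-Za-z_]\w*(\.[A-Za-z_]\w*)+$")


def validate_executor_setting(value):
    """Return the list of executor names in `value`, or raise ValueError."""
    names = [part.strip() for part in value.split(",")]
    invalid = [
        name
        for name in names
        if name not in KNOWN_EXECUTORS and not DOTTED_PATH.match(name)
    ]
    if invalid:
        raise ValueError(f"Unrecognized executor value(s): {invalid}")
    return names


if __name__ == "__main__":
    # The second entry is the custom executor path that appears in the scheduler log above.
    print(
        validate_executor_setting(
            "CeleryExecutor, airflow.providers.amazon.aws.executors.ecs.AwsEcsExecutor"
        )
    )
```

A check like this only verifies the shape of the value. It cannot confirm that a custom class is importable in the scheduler image, which is probably the harder part of the question.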

KulykDmytro commented 1 week ago

@potiuk issue still exists on

  • Python 3.11
  • Airflow 2.10.1

potiuk commented 1 week ago

Did you try with the latest provider?

potiuk commented 1 week ago

cc: @o-nikolas

eladkal commented 1 week ago

> @potiuk issue still exists on
>
>   • Python 3.11
>   • Airflow 2.10.1

Please specify the version of the Celery provider that you are using.