apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Airflow Hybrid Executor has an issue where tasks are rescheduled while actually running #42151

Closed iw-pavan closed 3 weeks ago

iw-pavan commented 2 months ago

Apache Airflow version

2.10.0

If "Other Airflow 2 version" selected, which one?

No response

What happened?

There is an intermittent issue with the hybrid executor where a task is queued multiple times, which kills the original execution and puts the workflow run into a failed state.

The logs below show the task being queued to the Celery executor, which is the correct behaviour, but a few seconds later this line appears:

default-orchestrator-scheduler-56cb74fc7f-ks76d-scheduler.log:[2024-09-11T07:14:18.791+0000] {kubernetes_executor.py:273} INFO - TaskInstance: <TaskInstance: 66cdfab5b3453e031689fbae.PB_VK39 66e1407de57be85bc875c6b0 [queued]> found in queued state but was not launched, rescheduling

This seems wrong: the executor for the above task is CeleryExecutor, but the "clear_not_launched_queued_tasks" function was executed on the Kubernetes executor.

default-orchestrator-scheduler-56cb74fc7f-ks76d-scheduler.log:[2024-09-11T07:14:07.760+0000] {scheduler_job_runner.py:736} INFO - Trying to enqueue tasks: [<TaskInstance: 66cdf9e9b3453e031689fa09.PB_VK39 66e13e1a9b94be0771f7ff20 [scheduled]>, <TaskInstance: 66cee622d8d3b11ea97d23ea.PB_VK39 66e13eb79b94be0771f7ff92 [scheduled]>, <TaskInstance: 66cef62d21a3cc6d397f816d.SI_Y9VG 66e1407de57be85bc875c6ac [scheduled]>, <TaskInstance: 66cdfab5b3453e031689fbae.SI_MDXZ 66e1407de57be85bc875c6b0 [scheduled]>] for executor: CeleryExecutor(parallelism=200)
default-orchestrator-scheduler-56cb74fc7f-ks76d-scheduler.log: <TaskInstance: 66cdfab5b3453e031689fbae.PB_VK39 66e1407de57be85bc875c6b0 [scheduled]>
default-orchestrator-scheduler-56cb74fc7f-ks76d-scheduler.log: <TaskInstance: 66cdfab5b3453e031689fbae.PB_VK39 66e1407de57be85bc875c6b0 [scheduled]>
default-orchestrator-scheduler-56cb74fc7f-ks76d-scheduler.log:[2024-09-11T07:14:15.300+0000] {scheduler_job_runner.py:736} INFO - Trying to enqueue tasks: [<TaskInstance: 66cdfa3c407cee140c3ce50a.SI_XAFG 66e13f1de57be85bc875c52f [scheduled]>, <TaskInstance: 66cdfab5b3453e031689fbae.PB_VK39 66e1407de57be85bc875c6b0 [scheduled]>, <TaskInstance: 66cdfab84750a813ec928598.SI_KXFL 66e14089e57be85bc875c6bd [scheduled]>] for executor: CeleryExecutor(parallelism=200)
default-orchestrator-scheduler-56cb74fc7f-ks76d-scheduler.log:[2024-09-11T07:14:15.301+0000] {scheduler_job_runner.py:680} INFO - Sending TaskInstanceKey(dag_id='66cdfab5b3453e031689fbae', task_id='PB_VK39', run_id='66e1407de57be85bc875c6b0', try_number=1, map_index=-1) to CeleryExecutor with priority 1 and queue default
default-orchestrator-scheduler-56cb74fc7f-ks76d-scheduler.log:[2024-09-11T07:14:15.301+0000] {base_executor.py:168} INFO - Adding to queue: ['airflow', 'tasks', 'run', '66cdfab5b3453e031689fbae', 'PB_VK39', '66e1407de57be85bc875c6b0', '--local', '--subdir', 'DAGS_FOLDER/66cdfab5b3453e031689fbae.py']
default-orchestrator-scheduler-56cb74fc7f-ks76d-scheduler.log:[2024-09-11T07:14:16.921+0000] {scheduler_job_runner.py:764} INFO - Received executor event with state queued for task instance TaskInstanceKey(dag_id='66cdfab5b3453e031689fbae', task_id='PB_VK39', run_id='66e1407de57be85bc875c6b0', try_number=1, map_index=-1)
default-orchestrator-scheduler-56cb74fc7f-ks76d-scheduler.log:[2024-09-11T07:14:16.950+0000] {scheduler_job_runner.py:791} INFO - Setting external_id for <TaskInstance: 66cdfab5b3453e031689fbae.PB_VK39 66e1407de57be85bc875c6b0 [queued]> to 2f92c5d8-923d-43b7-811c-17b5986411d8
default-orchestrator-scheduler-56cb74fc7f-ks76d-scheduler.log:[2024-09-11T07:14:18.791+0000] {kubernetes_executor.py:273} INFO - TaskInstance: <TaskInstance: 66cdfab5b3453e031689fbae.PB_VK39 66e1407de57be85bc875c6b0 [queued]> found in queued state but was not launched, rescheduling
default-orchestrator-scheduler-56cb74fc7f-ks76d-scheduler.log: <TaskInstance: 66cdfab5b3453e031689fbae.PB_VK39 66e1407de57be85bc875c6b0 [scheduled]>
default-orchestrator-scheduler-56cb74fc7f-ks76d-scheduler.log: <TaskInstance: 66cdfab5b3453e031689fbae.PB_VK39 66e1407de57be85bc875c6b0 [scheduled]>
default-orchestrator-scheduler-56cb74fc7f-ks76d-scheduler.log:[2024-09-11T07:14:19.827+0000] {scheduler_job_runner.py:736} INFO - Trying to enqueue tasks: [<TaskInstance: 66cef56ed8d3b11ea97d269b.PB_VK39 66e13dede57be85bc875c3e1 [scheduled]>, <TaskInstance: 66cdfa3c407cee140c3ce50a.SI_XAFG 66e13f1de57be85bc875c52f [scheduled]>, <TaskInstance: 66cef631fe5e403e994538d8.PB_VK39 66e140649b94be0771f801f5 [scheduled]>, <TaskInstance: 66cefe18fe5e403e99453aee.SI_CUY5 66e1406fbbe1e2215bc5ef41 [scheduled]>, <TaskInstance: 66cdfab5b3453e031689fbae.PB_VK39 66e1407de57be85bc875c6b0 [scheduled]>] for executor: CeleryExecutor(parallelism=200)

What you think should happen instead?

No response

How to reproduce

I am using the hybrid executor with the Celery and Kubernetes executors.

Operating System

Linux

Versions of Apache Airflow Providers

Celery, Kubernetes

Deployment

Other Docker-based deployment

Deployment details

Kubernetes setup.

Anything else?

No executor is specified for this task, so None is passed as the executor value and the first configured executor, CeleryExecutor, is used.
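The fallback described above can be sketched roughly as follows. This is a simplified model for illustration only, not Airflow's scheduler code: the helper name `resolve_executor` and the `configured` list are hypothetical.

```python
from typing import Optional, Sequence


def resolve_executor(task_executor: Optional[str], configured: Sequence[str]) -> str:
    """Return the name of the executor that should run a task.

    Hypothetical helper: a task with no explicit executor (None)
    falls back to the first configured executor.
    """
    if not configured:
        raise ValueError("at least one executor must be configured")
    if task_executor is None:
        return configured[0]
    if task_executor not in configured:
        raise ValueError(f"unknown executor: {task_executor}")
    return task_executor


# Executor order as described in this report: Celery first, then Kubernetes.
configured = ["CeleryExecutor", "KubernetesExecutor"]
resolved = resolve_executor(None, configured)
```

Under this model the task in the logs resolves to CeleryExecutor, so the Kubernetes executor should have no reason to treat it as one of its own.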

Are you willing to submit PR?

Code of Conduct

iw-pavan commented 2 months ago

It seems the query in the Kubernetes executor that checks for tasks that were not launched needs to be updated to also filter by executor:

query = select(TaskInstance).where(
    TaskInstance.state == TaskInstanceState.QUEUED,
    TaskInstance.queued_by_job_id == self.job_id,
)
if self.kubernetes_queue:
    query = query.where(TaskInstance.queue == self.kubernetes_queue)
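To illustrate the kind of filter being suggested, here is a minimal sketch in plain Python rather than SQLAlchemy. It is not the actual Airflow fix: `TaskRow`, `not_launched_candidates`, `is_default_executor`, and the task id `k8s_task` are hypothetical names, and the sketch assumes each task instance records which executor it was assigned to, with None meaning the default (first) executor.

```python
from dataclasses import dataclass
from typing import List, Optional

QUEUED = "queued"


@dataclass
class TaskRow:
    """Hypothetical stand-in for a task instance row."""
    task_id: str
    state: str
    queued_by_job_id: int
    executor: Optional[str]  # None means "use the default executor"


def not_launched_candidates(
    rows: List[TaskRow],
    job_id: int,
    executor_name: str,
    is_default_executor: bool,
) -> List[TaskRow]:
    """Return queued rows that belong to the given executor only."""
    selected = []
    for row in rows:
        # Same conditions as the existing query: queued, and queued by this job.
        if row.state != QUEUED or row.queued_by_job_id != job_id:
            continue
        # Extra condition: the row must be owned by this executor.
        # Rows without an explicit executor belong to the default executor.
        owned = row.executor == executor_name or (
            row.executor is None and is_default_executor
        )
        if owned:
            selected.append(row)
    return selected


rows = [
    TaskRow("PB_VK39", QUEUED, 1, None),                   # runs on the default executor
    TaskRow("k8s_task", QUEUED, 1, "KubernetesExecutor"),  # hypothetical task
]
k8s_view = not_launched_candidates(rows, 1, "KubernetesExecutor", is_default_executor=False)
celery_view = not_launched_candidates(rows, 1, "CeleryExecutor", is_default_executor=True)
```

With the extra ownership condition, the Kubernetes executor no longer sees PB_VK39, so it would not reschedule a task that Celery is already running.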