apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Task policy is not applied for retries #41405

Open ruffury opened 1 month ago

ruffury commented 1 month ago

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.9.2

What happened?

We moved our cluster from KubernetesExecutor to CeleryKubernetesExecutor. We have a lot of DAGs that set the executor_config parameter; those need the KubernetesExecutor because they use a custom image. The rest of the DAGs don't set this parameter, so they should execute on the Celery workers.

To achieve this, we created a simple task-level cluster policy:

# registered through Airflow's pluggy-based policy plugin interface (hence @hookimpl)
from airflow.models.baseoperator import BaseOperator
from airflow.policies import hookimpl


@hookimpl
def task_policy(task: BaseOperator):
    use_kubernetes_if_config(task=task)


def use_kubernetes_if_config(task: BaseOperator):
    # any task that carries an executor_config (i.e. needs the custom image)
    # is routed to the kubernetes queue of the CeleryKubernetesExecutor
    if task.executor_config:
        task.queue = 'kubernetes'

But we have a problem: this policy is not applied to retries. The first run of a task executes in Kubernetes, but retries execute on Celery and fail because the required libraries are missing there (we need the custom image for them).
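For now we are considering pinning the queue directly in the DAG file as a workaround. A minimal sketch (the dag_id, task and command are illustrative, and it assumes [celery_kubernetes_executor] kubernetes_queue is left at its default value of "kubernetes"):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="needs_custom_image",  # illustrative name
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
):
    BashOperator(
        task_id="run_in_pod",
        bash_command="echo run-on-kubernetes",  # placeholder command
        # pinning the queue on the task itself (instead of relying on task_policy)
        # routes the task, including its retries, to the KubernetesExecutor side;
        # the executor_config with the pod_override stays as in our existing DAGs
        queue="kubernetes",
    )

That works, but it defeats the purpose of a central cluster policy, which is why we would like task_policy to be honoured for retries as well.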

What do you think should happen instead?

Retries of the task should also execute in a Kubernetes pod, using the kubernetes queue.

How to reproduce

Apply the cluster policy from the description together with a DAG containing a task whose executor_config uses a custom image.
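A minimal sketch of such a DAG (the image name is a placeholder; the task only needs to fail once so that a retry gets scheduled):

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from kubernetes.client import models as k8s


def _fail_once():
    # fail deliberately so the scheduler schedules a retry
    raise RuntimeError("force a retry")


with DAG(
    dag_id="policy_retry_repro",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
):
    PythonOperator(
        task_id="fail_and_retry",
        python_callable=_fail_once,
        retries=1,
        # the task_policy above should route this task to the kubernetes queue;
        # the bug is that the retry runs on the Celery workers instead
        executor_config={
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            image="my-registry/custom-image:latest",  # placeholder
                        )
                    ]
                )
            )
        },
    )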

Operating System

Debian GNU/Linux 12

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==8.19.0
apache-airflow-providers-celery==3.7.2
apache-airflow-providers-cncf-kubernetes==7.13.0
apache-airflow-providers-common-io==1.3.2
apache-airflow-providers-common-sql==1.14.0
apache-airflow-providers-docker==3.12.0
apache-airflow-providers-elasticsearch==5.4.1
apache-airflow-providers-fab==1.1.1
apache-airflow-providers-ftp==3.9.1
apache-airflow-providers-google==10.19.0
apache-airflow-providers-grpc==3.5.1
apache-airflow-providers-hashicorp==3.7.1
apache-airflow-providers-http==4.11.1
apache-airflow-providers-imap==3.6.1
apache-airflow-providers-microsoft-azure==10.1.1
apache-airflow-providers-mysql==5.6.1
apache-airflow-providers-odbc==4.6.1
apache-airflow-providers-openlineage==1.8.0
apache-airflow-providers-pagerduty==3.7.2
apache-airflow-providers-postgres==5.11.1
apache-airflow-providers-redis==3.7.1
apache-airflow-providers-sendgrid==3.5.1
apache-airflow-providers-sftp==4.10.1
apache-airflow-providers-slack==8.7.1
apache-airflow-providers-smtp==1.7.1
apache-airflow-providers-snowflake==5.5.1
apache-airflow-providers-sqlite==3.8.1
apache-airflow-providers-ssh==3.11.1
apache-airflow-providers-telegram==3.1.1
apache-airflow-providers-trino==5.7.1

Deployment

Official Apache Airflow Helm Chart

Deployment details

Executor: CeleryKubernetesExecutor
k8s_version = "1.27"

Anything else?

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 1 month ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

gopidesupavan commented 2 weeks ago

@RNHTTR I'm going to look at this. It will take a bit of time since it's my first time in the Kubernetes area of Airflow, but I'll try to crack it :).

RNHTTR commented 1 week ago

> @RNHTTR I'm going to look at this. It will take a bit of time since it's my first time in the Kubernetes area of Airflow, but I'll try to crack it :).

Awesome! I've assigned you. I don't think this actually needs to be running on a Kubernetes cluster to reproduce. You could probably reproduce using a very simple cluster policy.
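Something along these lines should be enough (a sketch; the queue name is just a marker and nothing has to consume it, since the point is only to compare the queue recorded for try 1 vs. try 2):

from airflow.models.baseoperator import BaseOperator
from airflow.policies import hookimpl


@hookimpl
def task_policy(task: BaseOperator):
    # mark every task; if the policy is skipped on retries, the retried task
    # instance will show the executor's default queue instead of this marker
    task.queue = "policy_marker"

Then let any task fail and retry, and compare the queue recorded for the first try with the one recorded for the retry.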