apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Airflow not retrying Zombie even after detection #42135

Open kand617 opened 1 week ago

kand617 commented 1 week ago

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.8.3

What happened?

We have upgraded to 2.8.3 and have been noticing a lot more zombie jobs. I have not upgraded to the latest version, as I don't see anything relevant in the changelogs. One observation is that the worker pods that were working on these tasks are no longer present (likely scaled down).

AIRFLOW__SCHEDULER__SCHEDULER_ZOMBIE_TASK_THRESHOLD = 600 <-- Set via environment variable

config: 
    core:
      dag_file_processor_timeout: 360
      dagbag_import_timeout: 240
      check_slas: false
      default_task_execution_timeout: 3600
    api:
      auth_backends: airflow.api.auth.backend.basic_auth
    scheduler:
      dag_dir_list_interval: 120
      min_file_process_interval: 30
      parsing_processes: 20
      task_queued_timeout: 10800
      parsing_cleanup_interval: 1000

    celery:
      worker_concurrency: 10 
    logging:
      logging_level: DEBUG
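
For debugging, this is roughly the kind of check I have in mind (a simplified sketch, not our production code, and only an approximation of the scheduler's own zombie query; it assumes it is run somewhere with access to the Airflow metadata DB):

# Rough diagnostic sketch (assumes the Airflow metadata DB is reachable).
# It approximates the zombie condition: a RUNNING task instance whose
# supervising job stopped heartbeating for longer than the threshold.
from datetime import timedelta

from airflow.jobs.job import Job
from airflow.models import TaskInstance
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.state import TaskInstanceState

ZOMBIE_THRESHOLD = timedelta(seconds=600)  # matches AIRFLOW__SCHEDULER__SCHEDULER_ZOMBIE_TASK_THRESHOLD

with create_session() as session:
    limit = timezone.utcnow() - ZOMBIE_THRESHOLD
    stale = (
        session.query(TaskInstance)
        .join(Job, TaskInstance.job_id == Job.id)
        .filter(
            TaskInstance.state == TaskInstanceState.RUNNING,
            Job.latest_heartbeat < limit,
        )
        .all()
    )
    for ti in stale:
        print(ti.dag_id, ti.task_id, ti.run_id, ti.state)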

Sample screenshot from the UI:

(screenshot of the affected task in the UI)

Even after a long time, the task is still not stopped.

Sample Pod

# Imports for the snippet; dag_name, dag_env, ETL_DOCKER_IMAGE, on_failure,
# secrets and configmaps are defined elsewhere in our DAG files.
from datetime import timedelta

from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes.client import models as k8s

task = KubernetesPodOperator(
    namespace="airflow",
    task_id=f"{dag_name}_{dag_env}_task",
    name=f"{dag_name}_{dag_env}_task",
    image=ETL_DOCKER_IMAGE,
    image_pull_secrets=[k8s.V1LocalObjectReference("airflow-docker-secret")],
    cmds=["/workspace/golang_cli", "--name", "myapp"],
    is_delete_operator_pod=True,
    in_cluster=True,
    get_logs=True,
    image_pull_policy="Always",
    on_failure_callback=on_failure,
    container_resources=k8s.V1ResourceRequirements(
        limits={"memory": "2Gi", "cpu": "1000m"},
        requests={"memory": "2Gi", "cpu": "500m"},
    ),
    sla=timedelta(hours=2),
    retry_delay=timedelta(seconds=60),
    secrets=secrets,
    env_from=configmaps,
)

What you think should happen instead?

I would have expected the tasks to be marked for retry. Even after waiting 4 hours, nothing happens. What's even stranger is that default_task_execution_timeout is set but not respected.
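
As a sanity check, a throwaway DAG like the sketch below (not one of our production DAGs; the dag_id and task_id are placeholders) should confirm whether an explicit per-task execution_timeout is honoured, independently of the [core] default_task_execution_timeout default:

# Hypothetical sanity-check DAG: the task should fail after 2 minutes because of
# the explicit execution_timeout, regardless of default_task_execution_timeout.
from datetime import timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id="execution_timeout_check",  # placeholder name
    start_date=days_ago(1),
    schedule="@daily",
    default_args={"retries": 2},
):
    BashOperator(
        task_id="sleep_past_timeout",
        bash_command="sleep 600",
        execution_timeout=timedelta(seconds=120),  # should fail the task after 2 minutes
    )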

How to reproduce

I was unable to reproduce this. However, I was also unable to deliberately produce a zombie DAG; I tried the following:

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id="sleep_dag",
    start_date=days_ago(1),
    schedule="@daily",
    default_args={"retries": 2},
):
    t1 = BashOperator(
        task_id="sleep_10_minutes",
        bash_command="sleep 1200",  # 1200 s is actually 20 minutes, despite the task_id
    )

Ideally this should have caused the task to time out and be picked up as a zombie.


However it ran without any issues.
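
A more direct way to try to force a zombie (a sketch, assuming the sleep task is running on a Celery worker pod and that cluster access is available; the pod name is a placeholder that has to be looked up first) would be to delete the worker pod mid-task, so its heartbeating supervisor dies with it:

# Sketch: delete the Celery worker pod that is currently running the sleep task.
# "airflow-worker-0" is a placeholder; find the real pod with `kubectl get pods -n airflow`.
from kubernetes import client, config

config.load_kube_config()  # use config.load_incluster_config() when run inside the cluster
core = client.CoreV1Api()
core.delete_namespaced_pod(
    name="airflow-worker-0",  # placeholder pod name
    namespace="airflow",
    grace_period_seconds=0,   # skip graceful termination so the task's heartbeat stops abruptly
)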

Operating System

Kubernetes (K8s)

Versions of Apache Airflow Providers

FROM apache/airflow:2.8.3-python3.11

USER airflow

RUN pip install apache-airflow-providers-cncf-kubernetes==7.14.0
RUN pip install apache-airflow-providers-databricks==6.2.0
RUN pip install apache-airflow-providers-google==10.21.0
RUN pip install apache-airflow-providers-microsoft-mssql==3.6.1
RUN pip install datadog
RUN pip install 'apache-airflow[statsd]'
RUN pip install 'apache-airflow[otel]'
RUN pip install 'apache-airflow[jdbc]'
RUN pip install apache-airflow-providers-mongo
RUN pip install apache-airflow-providers-sendgrid==3.6.0

Deployment

Official Apache Airflow Helm Chart

Deployment details

Running on Kubernetes (K8s)

#default values file : https://github.com/apache/airflow/blob/2.4.1/chart/values.yaml#L520
airflow:
  extraEnv: |
    - name: AIRFLOW__SCHEDULER__SCHEDULER_ZOMBIE_TASK_THRESHOLD
      value: '600'
  redis:
    safeToEvict: false
    tolerations: 
    - key: "airflowcore"
      operator: "Equal"
      value: "true"
      effect: "NoSchedule" 
    nodeSelector:
      agentpool: airflowcore 
    password: ------
  # https://github.com/apache/airflow/blob/2.4.1/chart/values.yaml
  # Default airflow repository -- overrides all the specific images below
  defaultAirflowRepository: "sharedservicesacrnpd.azurecr.io/airflowcustomimage"
  # Default airflow tag to deploy
  defaultAirflowTag: "260978"
  createUserJob:
    useHelmHooks: false
  migrateDatabaseJob:
    useHelmHooks: false
  defaultNodeSelector:
    pool: airflowcore
  fernetKeySecretName: airflow-fernet-key  
  webserverSecretKeySecretName: airflow-webserver-key
  scheduler:
    resources:
      limits:
        cpu: 2000m
        memory: 1Gi
      requests:
        cpu: 1000m
        memory: 512Mi
    tolerations: 
    - key: "airflowcore"
      operator: "Equal"
      value: "true"
      effect: "NoSchedule"     
    nodeSelector:
      agentpool: airflowcore   
    replicas: 8    
  web:
    resources:
      limits:
        cpu: 500m
        memory: 1Gi
      requests:
        cpu: 250m
        memory: 256Mi
    tolerations: 
    - key: "airflowcore"
      operator: "Equal"
      value: "true"
      effect: "NoSchedule"     
    nodeSelector:
      agentpool: airflowcore      
  triggerer:
    resources:
      limits:
        cpu: 500m
        memory: 1Gi
      requests:
        cpu: 250m
        memory: 256Mi
    tolerations: 
    - key: "airflowcore"
      operator: "Equal"
      value: "true"
      effect: "NoSchedule"     
    nodeSelector:
      agentpool: airflowcore                 
  dagProcessor:
    enabled: true
    resources:
      # limits:
      #   cpu: 1000m
      #   memory: 2Gi
      requests:
        cpu: 2000m
        memory: 2Gi
    tolerations: 
    - key: "airflowcore"
      operator: "Equal"
      value: "true"
      effect: "NoSchedule"     
    nodeSelector:
      agentpool: airflowcore   
  statsd:  
    nodeSelector:
      agentpool: airflow        
  dags:
    persistence:
    # Enable persistent volume for storing dags
      enabled: true
    # Volume size for dags
      size: 50Gi
    # If using a custom storageClass, pass name here
      storageClassName:
    # access mode of the persistent volume
      accessMode: ReadWriteMany
    ## the name of an existing PVC to use
      existingClaim: airflow-dag-pvc
  logs:
    persistence:
    # Enable persistent volume for storing logs
      enabled: true
    # Volume size for logs
      size: 50Gi
    # If using a custom storageClass, pass name here
      storageClassName:
    ## the name of an existing PVC to use
      existingClaim: airflow-log-pvc    
  executor: "CeleryKubernetesExecutor"    
  workers:
    resources:
      requests:
        memory: 5Gi
    keda:
      enabled: true
      cooldownPeriod: 300
      minReplicaCount: 10
    persistence:
      enabled: false
    tolerations: 
    - key: "airflowcore"
      operator: "Equal"
      value: "true"
      effect: "NoSchedule"     
    nodeSelector:
      agentpool: airflowcore 
    safeToEvict: false   
  ingress:
    web:
      enabled: true
      annotations:
        nginx.ingress.kubernetes.io/rewrite-target: "/"
        nginx.ingress.kubernetes.io/ssl-redirect: "true"
      hosts:
        - name: airflow.------.com
          tls: 
            enabled: true
            secretName: "airflow-secret-ingress-tls"
      ingressClassName: "nginx"
  postgresql:
    enabled: false
  pgbouncer:
    enabled: false
  data:  
    metadataSecretName: airflow-postgres-secret 
  config: 
    core:
      dag_file_processor_timeout: 360
      dagbag_import_timeout: 240
      check_slas: false
      default_task_execution_timeout: 3600
    api:
      auth_backends: airflow.api.auth.backend.basic_auth
    scheduler:
      dag_dir_list_interval: 120
      min_file_process_interval: 30
      parsing_processes: 20
      task_queued_timeout: 10800
      parsing_cleanup_interval: 1000

    celery:
      worker_concurrency: 10 
    logging:
      logging_level: DEBUG
    metrics: 
      statsd_on: false
      statsd_port: 8125
      statsd_host: datadog-agent.observability      
secrettls:
  name: airflow-secret-ingress-tls
  data:
    tls.crt: ----
    tls.key: ----
secret:
  name: airflow-secret-storageacct
  data: 
    azurestorageaccountname: -----
    azurestorageaccountkey: ------
secretfernet:
  name: airflow-fernet-key
  data: 
    fernet-key: ---
acrsecret:
  name: airflow-docker-secret
  data: 
    username: ---
    password: ---
secretwebserverkey:
  name: airflow-webserver-key
  data:
    webserver-secret-key: ---
connectionsecret:
  name: airflow-postgres-secret
  data:
    connection: ---
storageClass: 
  name: airflowstorageclass
  provisioner: file.csi.azure.com

dagvolumes: 
  name: airflow-dag-pv
  size: 50Gi
  accessModes: ReadWriteMany
  reclaimPolicy: Retain
  volumeHandle: airflowdag
  shareName: airflowdag
  secretName: airflow-secret-storageacct

dagvolumeClaim:
  name: airflow-dag-pvc
  accessModes: ReadWriteMany
  size: 50Gi

storageClassLog: 
  name: airflowstorageclasslog
  provisioner: file.csi.azure.com

logvolumes: 
  name: airflow-log-pv
  size: 50Gi
  accessModes: ReadWriteMany
  reclaimPolicy: Retain
  volumeHandle: airflowlog
  shareName: airflowlog
  secretName: airflow-secret-storageacct

logvolumeClaim:
  name: airflow-log-pvc
  accessModes: ReadWriteMany
  size: 50Gi

Anything else?

The issue happens frequently. Our setup, roughly:

  1. About 350 DAGs.
  2. Most DAGs use the KubernetesPodOperator; a few use GCP operators.
  3. The Airflow components run in their own node pool.
  4. The KubernetesPodOperator pods run in a different node pool.

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 1 week ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

vaibhavnsingh commented 6 days ago

+1

In our current setup, we run Airflow tasks on Celery workers, which have a memory limit applied per our DevOps guidelines. When a Celery worker exceeds its memory limit, it hits an Out-Of-Memory (OOM) error and restarts. Tasks that were running at the time become zombie tasks, which the Airflow scheduler detects.

However, we have observed that despite the scheduler detecting these tasks as zombie tasks, Airflow does not mark them as failed.