
Airflow Kubernetes EFS ReadWriteMany volume mount not working if the pod count is around 100 #33445

Closed. jayon-niravel closed this issue 1 year ago

jayon-niravel commented 1 year ago

Official Helm Chart version

1.10.0 (latest released)

Apache Airflow version

2.6.0

Kubernetes Version

Client Version: v1.25.2 Kustomize Version: v4.5.7 Server Version: v1.23.17+16bcd69

Helm Chart configuration

fullnameOverride: ""

# Provide a name to substitute for the name of the chart
nameOverride: ""

# Max number of old replicasets to retain. Can be overridden by each deployment's revisionHistoryLimit
revisionHistoryLimit: ~

# User and group of airflow user
uid: 50000
gid: 0

# Default security context for airflow (deprecated, use `securityContexts` instead)
securityContext: {}
#  runAsUser: 50000
#  fsGroup: 0
#  runAsGroup: 0

# Detailed default security context for airflow deployments
securityContexts:
  pod:
    fsGroupChangePolicy: "OnRootMismatch"
  containers: {}

# Airflow home directory
# Used for mount paths
airflowHome: /opt/airflow

# Default airflow repository -- overridden by all the specific images below
defaultAirflowRepository: apache/airflow

# Default airflow tag to deploy
defaultAirflowTag: "2.6.2"

# Default airflow digest. If specified, it takes precedence over tag
defaultAirflowDigest: ~

# Airflow version (Used to make some decisions based on Airflow Version being deployed)
airflowVersion: "2.6.2"

# Images
images:
  airflow:
    repository: XXXX
    tag: XXX
    # Specifying digest takes precedence over tag.
    digest: ~
    pullPolicy: IfNotPresent
  useDefaultImageForMigration: false
  # timeout (in seconds) for airflow-migrations to complete
  migrationsWaitTimeout: 60
  pod_template:
    repository: ~
    tag: ~
    pullPolicy: IfNotPresent
  flower:
    repository: ~
    tag: ~
    pullPolicy: IfNotPresent
  statsd:
    repository: quay.io/prometheus/statsd-exporter
    tag: v0.22.8
    pullPolicy: IfNotPresent
  redis:
    repository: redis
    tag: 7-bullseye
    pullPolicy: IfNotPresent
  pgbouncer:
    repository: apache/airflow
    tag: airflow-pgbouncer-2023.02.24-1.16.1
    pullPolicy: IfNotPresent
  pgbouncerExporter:
    repository: apache/airflow
    tag: airflow-pgbouncer-exporter-2023.02.21-0.14.0
    pullPolicy: IfNotPresent
  gitSync:
    repository: registry.k8s.io/git-sync/git-sync
    tag: v3.6.3
    pullPolicy: IfNotPresent

# Select certain nodes for airflow pods.
nodeSelector: {}
affinity: {}
tolerations: []
topologySpreadConstraints: []

# Add common labels to all objects and pods defined in this chart.
labels: {}

# Network policy configuration
networkPolicies:
  # Enabled network policies
  enabled: false

# Extra annotations to apply to all
# Airflow pods
airflowPodAnnotations: {}

# Extra annotations to apply to
# main Airflow configmap
airflowConfigAnnotations: {}

# Enable RBAC (default on most clusters these days)
rbac:
  # Specifies whether RBAC resources should be created
  create: true
  createSCCRoleBinding: false

# Airflow executor
# One of: LocalExecutor, LocalKubernetesExecutor, CeleryExecutor, KubernetesExecutor, CeleryKubernetesExecutor
executor: "KubernetesExecutor"

allowPodLaunching: true

# Environment variables for all airflow containers
env:
  - name: AIRFLOW__CORE__DEFAULT_POOL_TASK_SLOT_COUNT
    value: "500"
  - name: AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT
    value: "360.0"
  - name: AIRFLOW__DATABASE__SQL_ALCHEMY_POOL_SIZE
    value: "-1"
  - name: AIRFLOW__CORE__PARALLELISM
    value: "500"
  - name: AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG
    value: "500"
  - name: AIRFLOW__SCHEDULER__PARSING_PROCESSES
    value: "32"
  - name: AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD
    value: "60"
  - name: AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG
    value: "500"
  - name: AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT
    value: "360"
  - name: AIRFLOW__KUBERNETES_EXECUTOR__WORKER_PODS_CREATION_BATCH_SIZE
    value: "25"
  - name: AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL
    value: "600"
  - name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
    value: "600"
  - name: AIRFLOW__SCHEDULER__MAX_DAGRUNS_TO_CREATE_PER_LOOP
    value: "500"
  - name: AIRFLOW__SCHEDULER__MAX_DAGRUNS_PER_LOOP_TO_SCHEDULE
    value: "500"
  - name: AIRFLOW__SCHEDULER__SCHEDULER_ZOMBIE_TASK_THRESHOLD
    value: "600"
  - name: parallel_test_count
    value: "50"
  - name: parallel_test_sleep
    value: "60"

# Volumes for all airflow containers
volumes: []

# VolumeMounts for all airflow containers
volumeMounts: []

# Secrets for all airflow containers
secret: []
# - envName: ""
#   secretName: ""
#   secretKey: ""

# Enables selected built-in secrets that are set via environment variables by default.
# Those secrets are provided by the Helm Chart secrets by default but in some cases you
# might want to provide some of those variables with _CMD or _SECRET variable, and you should
# in this case disable setting of those variables by setting the relevant configuration to false.
enableBuiltInSecretEnvVars:
  AIRFLOW__CORE__FERNET_KEY: true
  # For Airflow <2.3, backward compatibility; moved to [database] in 2.3
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: true
  AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: true
  AIRFLOW_CONN_AIRFLOW_DB: true
  AIRFLOW__WEBSERVER__SECRET_KEY: true
  AIRFLOW__CELERY__CELERY_RESULT_BACKEND: true
  AIRFLOW__CELERY__RESULT_BACKEND: true
  AIRFLOW__CELERY__BROKER_URL: true
  AIRFLOW__ELASTICSEARCH__HOST: true
  AIRFLOW__ELASTICSEARCH__ELASTICSEARCH_HOST: true

extraConfigMaps: {}

data:

  # Otherwise pass connection values in
  metadataConnection:
    user: xxx
    pass: xxxxx
    protocol: postgresql
    host: xxxxx
    port: 5432
    db: xxxx
    sslmode: disable
  # resultBackendConnection defaults to the same database as metadataConnection

fernetKey: "XXX"
fernetKeySecretName: ~

# Flask secret key for Airflow Webserver: `[webserver] secret_key` in airflow.cfg
webserverSecretKey: "XX"
webserverSecretKeySecretName: ~

# Airflow Worker Config
workers:
  # Number of airflow celery workers in StatefulSet
  replicas: 1
  # Max number of old replicasets to retain
  revisionHistoryLimit: ~

  # Command to use when running Airflow workers (templated).
  command: ~
  # Args to use when running Airflow workers (templated).
  args:
    - "bash"
    - "-c"
    # The format below is necessary to get `helm lint` happy
    - |-
      exec \
      airflow {{ semverCompare ">=2.0.0" .Values.airflowVersion | ternary "celery worker" "worker" }}

  # If the worker stops responding for 5 minutes (5*60s) kill the
  # worker and let Kubernetes restart it
  livenessProbe:
    enabled: true
    initialDelaySeconds: 10
    timeoutSeconds: 20
    failureThreshold: 5
    periodSeconds: 60
    command: ~

  # Update Strategy when worker is deployed as a StatefulSet
  updateStrategy: ~
  # Update Strategy when worker is deployed as a Deployment
  strategy:
    rollingUpdate:
      maxSurge: "100%"
      maxUnavailable: "50%"

  # When not set, the values defined in the global securityContext will be used
  securityContext: {}
  #  runAsUser: 50000
  #  fsGroup: 0
  #  runAsGroup: 0

  # Detailed default security context for worker deployments for container and pod level
  securityContexts:
    pod: {}
    container: {}

  # Create ServiceAccount
  serviceAccount:
    # Specifies whether a ServiceAccount should be created
    create: false
    # The name of the ServiceAccount to use.
    # If not set and create is true, a name is generated using the release name
    name: "airflow-helm"

    # Annotations to add to worker kubernetes service account.
    annotations: {}

  # Allow KEDA autoscaling.
  # Persistence.enabled must be set to false to use KEDA.
  keda:
    enabled: false
    namespaceLabels: {}

    # How often KEDA polls the airflow DB to report new scale requests to the HPA
    pollingInterval: 5

    # How many seconds KEDA will wait before scaling to zero.
    # Note that HPA has a separate cooldown period for scale-downs
    cooldownPeriod: 30

    # Minimum number of workers created by keda
    minReplicaCount: 0

    # Maximum number of workers created by keda
    maxReplicaCount: 10

    # Specify HPA related options
    advanced: {}
    # horizontalPodAutoscalerConfig:
    #   behavior:
    #     scaleDown:
    #       stabilizationWindowSeconds: 300
    #       policies:
    #         - type: Percent
    #           value: 100
    #           periodSeconds: 15

  persistence:
    # Enable persistent volumes
    enabled: true
    # Volume size for worker StatefulSet
    size: 100Gi
    # If using a custom storageClass, pass name ref to all statefulSets here
    storageClassName:
    # Execute init container to chown log directory.
    # This is currently only needed in kind, due to usage
    # of local-path provisioner.
    fixPermissions: false
    # Annotations to add to worker volumes
    annotations: {}
    # Detailed default security context for persistence for container level
    securityContexts:
      container: {}

  resources: {}
  #  limits:
  #   cpu: 100m
  #   memory: 128Mi
  #  requests:
  #   cpu: 100m
  #   memory: 128Mi

  # Grace period for tasks to finish after SIGTERM is sent from kubernetes
  terminationGracePeriodSeconds: 600

  # This setting tells Kubernetes that it's OK to evict this pod
  # when it wants to scale a node down.
  safeToEvict: true

  # Launch additional containers into worker.
  # Note: If used with KubernetesExecutor, you are responsible for signaling sidecars to exit when the main
  # container finishes so Airflow can continue the worker shutdown process!
  extraContainers: []
  # Add additional init containers into workers.
  extraInitContainers: []

  # Mount additional volumes into worker. It can be templated like in the following example:
  #   extraVolumes:
  #     - name: my-templated-extra-volume
  #       secret:
  #          secretName: '{{ include "my_secret_template" . }}'
  #          defaultMode: 0640
  #          optional: true
  #
  #   extraVolumeMounts:
  #     - name: my-templated-extra-volume
  #       mountPath: "{{ .Values.my_custom_path }}"
  #       readOnly: true
  extraVolumes: []
  extraVolumeMounts: []

  # Select certain nodes for airflow worker pods.
  nodeSelector: {}
  priorityClassName: ~
  affinity: {}
  # default worker affinity is:
  #  podAntiAffinity:
  #    preferredDuringSchedulingIgnoredDuringExecution:
  #    - podAffinityTerm:
  #        labelSelector:
  #          matchLabels:
  #            component: worker
  #        topologyKey: kubernetes.io/hostname
  #      weight: 100
  tolerations: []
  topologySpreadConstraints: []
  # hostAliases to use in worker pods.
  # See:
  # https://kubernetes.io/docs/concepts/services-networking/add-entries-to-pod-etc-hosts-with-host-aliases/
  hostAliases: []
  # - ip: "127.0.0.2"
  #   hostnames:
  #   - "test.hostname.one"
  # - ip: "127.0.0.3"
  #   hostnames:
  #   - "test.hostname.two"

  # annotations for the worker resource
  annotations: {}

  podAnnotations: {}

  # Labels specific to workers objects and pods
  labels: {}

# Airflow scheduler settings
scheduler:
  #  hostAliases for the scheduler pod
  hostAliases: []
  #  - ip: "127.0.0.1"
  #    hostnames:
  #      - "foo.local"
  #  - ip: "10.1.2.3"
  #    hostnames:
  #      - "foo.remote"

  # If the scheduler stops heartbeating for 5 minutes (5*60s) kill the
  # scheduler and let Kubernetes restart it
  livenessProbe:
    initialDelaySeconds: 10
    timeoutSeconds: 20
    failureThreshold: 5
    periodSeconds: 60
    command: ~
  # Airflow 2.0 allows users to run multiple schedulers,
  # However this feature is only recommended for MySQL 8+ and Postgres
  replicas: 1
  # Max number of old replicasets to retain
  revisionHistoryLimit: ~

  # Command to use when running the Airflow scheduler (templated).
  command: ~
  # Args to use when running the Airflow scheduler (templated).
  args: ["bash", "-c", "exec airflow scheduler"]

  # Update Strategy when scheduler is deployed as a StatefulSet
  # (when using LocalExecutor and workers.persistence)
  updateStrategy: ~
  # Update Strategy when scheduler is deployed as a Deployment
  # (when not using LocalExecutor and workers.persistence)
  strategy: ~

  # When not set, the values defined in the global securityContext will be used
  # (deprecated, use `securityContexts` instead)
  securityContext: {}
  #  runAsUser: 50000
  #  fsGroup: 0
  #  runAsGroup: 0

  # Detailed default security context for scheduler deployments for container and pod level
  securityContexts:
    pod:
      fsGroupChangePolicy: "OnRootMismatch"
    container: {}

  # Create ServiceAccount
  serviceAccount:
    # Specifies whether a ServiceAccount should be created
    create: false
    # The name of the ServiceAccount to use.
    # If not set and create is true, a name is generated using the release name
    name: "airflow-helm"

    # Annotations to add to scheduler kubernetes service account.
    annotations: {}

  # Scheduler pod disruption budget
  podDisruptionBudget:
    enabled: false

    # PDB configuration
    config:
      # minAvailable and maxUnavailable are mutually exclusive
      maxUnavailable: 1
      # minAvailable: 1

  resources: {}
  #  limits:
  #   cpu: 100m
  #   memory: 128Mi
  #  requests:
  #   cpu: 100m
  #   memory: 128Mi

  # This setting tells Kubernetes that it's OK to evict this pod
  # when it wants to scale a node down.
  safeToEvict: true

  # Launch additional containers into scheduler.
  extraContainers: []
  # Add additional init containers into scheduler.
  extraInitContainers: []

  extraVolumes: []
  extraVolumeMounts: []

  # Select certain nodes for airflow scheduler pods.
  nodeSelector: {}
  affinity: {}
  # default scheduler affinity is:
  #  podAntiAffinity:
  #    preferredDuringSchedulingIgnoredDuringExecution:
  #    - podAffinityTerm:
  #        labelSelector:
  #          matchLabels:
  #            component: scheduler
  #        topologyKey: kubernetes.io/hostname
  #      weight: 100
  tolerations: []
  topologySpreadConstraints: []

  priorityClassName: ~

  # annotations for scheduler deployment
  annotations: {}

  podAnnotations: {}

  # Labels specific to scheduler objects and pods
  labels: {}

  logGroomerSidecar:
    # Whether to deploy the Airflow scheduler log groomer sidecar.
    enabled: true
    # Command to use when running the Airflow scheduler log groomer sidecar (templated).
    command: ~
    # Args to use when running the Airflow scheduler log groomer sidecar (templated).
    args: ["bash", "/clean-logs"]
    # Number of days to retain logs
    retentionDays: 15
    resources: {}
    #  limits:
    #   cpu: 100m
    #   memory: 128Mi
    #  requests:
    #   cpu: 100m
    #   memory: 128Mi
    # Detailed default security context for logGroomerSidecar for container level
    securityContexts:
      container: {}

  waitForMigrations:
    # Whether to create init container to wait for db migrations
    enabled: true
    env: []
    # Detailed default security context for waitForMigrations for container level
    securityContexts:
      container: {}

  env: []

# Airflow database migration job settings
migrateDatabaseJob:
  enabled: true
  # Limit the lifetime of the job object after it finished execution.
  ttlSecondsAfterFinished: 300
  # Command to use when running the migrate database job (templated).
  command: ~
  # Args to use when running the migrate database job (templated).
  args:
    - "bash"
    - "-c"
    # The format below is necessary to get `helm lint` happy
    - |-
      exec \
      airflow {{ semverCompare ">=2.0.0" .Values.airflowVersion | ternary "db upgrade" "upgradedb" }}

  # Annotations on the database migration pod
  annotations: {}
  # jobAnnotations are annotations on the database migration job
  jobAnnotations: {}

  # When not set, the values defined in the global securityContext will be used
  securityContext: {}
  #  runAsUser: 50000
  #  fsGroup: 0
  #  runAsGroup: 0

  # Detailed default security context for migrateDatabaseJob for container and pod level
  securityContexts:
    pod: {}
    container: {}

  # Create ServiceAccount
  serviceAccount:
    # Specifies whether a ServiceAccount should be created
    create: false
    # The name of the ServiceAccount to use.
    # If not set and create is true, a name is generated using the release name
    name: "airflow-helm"

    # Annotations to add to migrate database job kubernetes service account.
    annotations: {}

  resources: {}
  #  limits:
  #   cpu: 100m
  #   memory: 128Mi
  #  requests:
  #   cpu: 100m
  #   memory: 128Mi

  # Launch additional containers into database migration job
  extraContainers: []

  # Mount additional volumes into database migration job. It can be templated like in the following example:
  #   extraVolumes:
  #     - name: my-templated-extra-volume
  #       secret:
  #          secretName: '{{ include "my_secret_template" . }}'
  #          defaultMode: 0640
  #          optional: true
  #
  #   extraVolumeMounts:
  #     - name: my-templated-extra-volume
  #       mountPath: "{{ .Values.my_custom_path }}"
  #       readOnly: true
  extraVolumes: []
  extraVolumeMounts: []

  nodeSelector: {}
  affinity: {}
  tolerations: []
  topologySpreadConstraints: []
  # In case you need to disable the helm hooks that create the jobs after install.
  # Disable this if you are using ArgoCD for example
  useHelmHooks: true
  applyCustomEnv: true

# Airflow webserver settings
webserver:
  #  hostAliases for the webserver pod
  hostAliases: []
  #  - ip: "127.0.0.1"
  #    hostnames:
  #      - "foo.local"
  #  - ip: "10.1.2.3"
  #    hostnames:
  #      - "foo.remote"
  allowPodLogReading: true
  livenessProbe:
    initialDelaySeconds: 15
    timeoutSeconds: 5
    failureThreshold: 5
    periodSeconds: 10
    scheme: HTTP

  readinessProbe:
    initialDelaySeconds: 15
    timeoutSeconds: 5
    failureThreshold: 5
    periodSeconds: 10
    scheme: HTTP

  # Number of webservers
  replicas: 1
  # Max number of old replicasets to retain
  revisionHistoryLimit: ~

  # Command to use when running the Airflow webserver (templated).
  command: ~
  # Args to use when running the Airflow webserver (templated).
  args: ["bash", "-c", "exec airflow webserver"]

  # Create ServiceAccount
  serviceAccount:
    # Specifies whether a ServiceAccount should be created
    create: false
    # The name of the ServiceAccount to use.
    # If not set and create is true, a name is generated using the release name
    name: "airflow-helm"

    # Annotations to add to webserver kubernetes service account.
    annotations: {}

  # Webserver pod disruption budget
  podDisruptionBudget:
    enabled: false

    # PDB configuration
    config:
      # minAvailable and maxUnavailable are mutually exclusive
      maxUnavailable: 1
      # minAvailable: 1

  # Allow overriding Update Strategy for Webserver
  strategy: ~

  # When not set, the values defined in the global securityContext will be used
  # (deprecated, use `securityContexts` instead)
  securityContext: {}
  #  runAsUser: 50000
  #  fsGroup: 0
  #  runAsGroup: 0

  # Detailed default security contexts for webserver deployments for container and pod level
  securityContexts:
    pod: {}
    container: {}

  # Additional network policies as needed (Deprecated - renamed to `webserver.networkPolicy.ingress.from`)
  extraNetworkPolicies: []
  networkPolicy:
    ingress:
      # Peers for webserver NetworkPolicy ingress
      from: []
      # Ports for webserver NetworkPolicy ingress (if `from` is set)
      ports:
        - port: "{{ .Values.ports.airflowUI }}"

  resources: {}
  #   limits:
  #     cpu: 100m
  #     memory: 128Mi
  #   requests:
  #     cpu: 100m
  #     memory: 128Mi

  # Create initial user.
  defaultUser:
    enabled: true
    role: Admin
    username: admin
    email: admin@example.com
    firstName: admin
    lastName: user
    password: admin

  # Launch additional containers into webserver.
  extraContainers: []
  # Add additional init containers into webserver.
  extraInitContainers: []

  # Mount additional volumes into webserver. It can be templated like in the following example:
  #   extraVolumes:
  #     - name: my-templated-extra-volume
  #       secret:
  #          secretName: '{{ include "my_secret_template" . }}'
  #          defaultMode: 0640
  #          optional: true
  #
  #   extraVolumeMounts:
  #     - name: my-templated-extra-volume
  #       mountPath: "{{ .Values.my_custom_path }}"
  #       readOnly: true
  extraVolumes: []
  extraVolumeMounts: []

  # This string (can be templated) will be mounted into the Airflow Webserver
  # as a custom webserver_config.py. You can bake a webserver_config.py in to
  # your image instead or specify a configmap containing the
  # webserver_config.py.
  webserverConfig: ~
  # webserverConfig: |
  #   from airflow import configuration as conf

  #   # The SQLAlchemy connection string.
  #   SQLALCHEMY_DATABASE_URI = conf.get('database', 'SQL_ALCHEMY_CONN')

  #   # Flask-WTF flag for CSRF
  #   CSRF_ENABLED = True
  webserverConfigConfigMapName: ~

  service:
    type: ClusterIP
    ## service annotations
    annotations: {}
    ports:
      - name: airflow-ui
        port: "{{ .Values.ports.airflowUI }}"
    # To change the port used to access the webserver:
    # ports:
    #   - name: airflow-ui
    #     port: 80
    #     targetPort: airflow-ui
    # To only expose a sidecar, not the webserver directly:
    # ports:
    #   - name: only_sidecar
    #     port: 80
    #     targetPort: 8888
    # If you have a public IP, set NodePort to set an external port.
    # Service type must be 'NodePort':
    # ports:
    #   - name: airflow-ui
    #     port: 8080
    #     targetPort: 8080
    #     nodePort: 31151
    loadBalancerIP: ~
    ## Limit load balancer source ips to list of CIDRs
    # loadBalancerSourceRanges:
    #   - "10.123.0.0/16"
    loadBalancerSourceRanges: []

  # Select certain nodes for airflow webserver pods.
  nodeSelector: {}
  priorityClassName: ~
  affinity: {}
  # default webserver affinity is:
  #  podAntiAffinity:
  #    preferredDuringSchedulingIgnoredDuringExecution:
  #    - podAffinityTerm:
  #        labelSelector:
  #          matchLabels:
  #            component: webserver
  #        topologyKey: kubernetes.io/hostname
  #      weight: 100
  tolerations: []
  topologySpreadConstraints: []

  # annotations for webserver deployment
  annotations: {}

  podAnnotations: {}

  # Labels specific webserver app
  labels: {}

  waitForMigrations:
    # Whether to create init container to wait for db migrations
    enabled: true
    env: []
    # Detailed default security context for waitForMigrations for container level
    securityContexts:
      container: {}

  env: []

# Airflow Triggerer Config
triggerer:
  enabled: true
  # Number of airflow triggerers in the deployment
  replicas: 1
  # Max number of old replicasets to retain
  revisionHistoryLimit: ~

  # Command to use when running Airflow triggerers (templated).
  command: ~
  # Args to use when running Airflow triggerer (templated).
  args: ["bash", "-c", "exec airflow triggerer"]

  # Update Strategy when triggerer is deployed as a StatefulSet
  updateStrategy: ~
  # Update Strategy when triggerer is deployed as a Deployment
  strategy:
    rollingUpdate:
      maxSurge: "100%"
      maxUnavailable: "50%"

  # If the triggerer stops heartbeating for 5 minutes (5*60s) kill the
  # triggerer and let Kubernetes restart it
  livenessProbe:
    initialDelaySeconds: 10
    timeoutSeconds: 20
    failureThreshold: 5
    periodSeconds: 60
    command: ~

  # Create ServiceAccount
  serviceAccount:
    # Specifies whether a ServiceAccount should be created
    create: false
    # The name of the ServiceAccount to use.
    # If not set and create is true, a name is generated using the release name
    name: "airflow-helm"

    # Annotations to add to triggerer kubernetes service account.
    annotations: {}

  # When not set, the values defined in the global securityContext will be used
  securityContext: {}
  #  runAsUser: 50000
  #  fsGroup: 0
  #  runAsGroup: 0

  # Detailed default security context for triggerer for container and pod level
  securityContexts:
    pod: {}
    container: {}
  persistence:
    # Enable persistent volumes
    enabled: true
    # Volume size for triggerer StatefulSet
    size: 100Gi
    # If using a custom storageClass, pass name ref to all statefulSets here
    storageClassName:
    # Execute init container to chown log directory.
    # This is currently only needed in kind, due to usage
    # of local-path provisioner.
    fixPermissions: false
    # Annotations to add to triggerer volumes
    annotations: {}

  resources: {}
  #  limits:
  #   cpu: 100m
  #   memory: 128Mi
  #  requests:
  #   cpu: 100m
  #   memory: 128Mi

  # Grace period for triggerer to finish after SIGTERM is sent from kubernetes
  terminationGracePeriodSeconds: 60

  # This setting tells Kubernetes that it's OK to evict this pod
  # when it wants to scale a node down.
  safeToEvict: true

  # Launch additional containers into triggerer.
  extraContainers: []
  # Add additional init containers into triggerers.
  extraInitContainers: []

  # Mount additional volumes into triggerer. It can be templated like in the following example:
  #   extraVolumes:
  #     - name: my-templated-extra-volume
  #       secret:
  #          secretName: '{{ include "my_secret_template" . }}'
  #          defaultMode: 0640
  #          optional: true
  #
  #   extraVolumeMounts:
  #     - name: my-templated-extra-volume
  #       mountPath: "{{ .Values.my_custom_path }}"
  #       readOnly: true
  extraVolumes: []
  extraVolumeMounts: []

  # Select certain nodes for airflow triggerer pods.
  nodeSelector: {}
  affinity: {}
  # default triggerer affinity is:
  #  podAntiAffinity:
  #    preferredDuringSchedulingIgnoredDuringExecution:
  #    - podAffinityTerm:
  #        labelSelector:
  #          matchLabels:
  #            component: triggerer
  #        topologyKey: kubernetes.io/hostname
  #      weight: 100
  tolerations: []
  topologySpreadConstraints: []

  priorityClassName: ~

  # annotations for the triggerer deployment
  annotations: {}

  podAnnotations: {}

  # Labels specific to triggerer objects and pods
  labels: {}

  logGroomerSidecar:
    # Whether to deploy the Airflow triggerer log groomer sidecar.
    enabled: true
    # Command to use when running the Airflow triggerer log groomer sidecar (templated).
    command: ~
    # Args to use when running the Airflow triggerer log groomer sidecar (templated).
    args: ["bash", "/clean-logs"]
    # Number of days to retain logs
    retentionDays: 15
    resources: {}
    #  limits:
    #   cpu: 100m
    #   memory: 128Mi
    #  requests:
    #   cpu: 100m
    #   memory: 128Mi
    # Detailed default security context for logGroomerSidecar for container level
    securityContexts:
      container: {}

  waitForMigrations:
    # Whether to create init container to wait for db migrations
    enabled: true
    env: []
    # Detailed default security context for waitForMigrations for container level
    securityContexts:
      container: {}

  env: []

# Airflow Dag Processor Config
dagProcessor:
  enabled: false
  # Number of airflow dag processors in the deployment
  replicas: 1
  # Max number of old replicasets to retain
  revisionHistoryLimit: ~

  # Command to use when running Airflow dag processors (templated).
  command: ~
  # Args to use when running Airflow dag processor (templated).
  args: ["bash", "-c", "exec airflow dag-processor"]

  # Update Strategy for dag processors
  strategy:
    rollingUpdate:
      maxSurge: "100%"
      maxUnavailable: "50%"

  # If the dag processor stops heartbeating for 5 minutes (5*60s) kill the
  # dag processor and let Kubernetes restart it
  livenessProbe:
    initialDelaySeconds: 10
    timeoutSeconds: 20
    failureThreshold: 5
    periodSeconds: 60
    command: ~

  # Create ServiceAccount
  serviceAccount:
    # Specifies whether a ServiceAccount should be created
    create: false
    # The name of the ServiceAccount to use.
    # If not set and create is true, a name is generated using the release name
    name: "airflow-helm"

    # Annotations to add to dag processor kubernetes service account.
    annotations: {}

  # When not set, the values defined in the global securityContext will be used
  securityContext: {}
  #  runAsUser: 50000
  #  fsGroup: 0
  #  runAsGroup: 0

  # Detailed default security context for dagProcessor for container and pod level
  securityContexts:
    pod: {}
    container: {}

  resources: {}
  #  limits:
  #   cpu: 100m
  #   memory: 128Mi
  #  requests:
  #   cpu: 100m
  #   memory: 128Mi

  # Grace period for dag processor to finish after SIGTERM is sent from kubernetes
  terminationGracePeriodSeconds: 60

  # This setting tells Kubernetes that it's OK to evict this pod
  # when it wants to scale a node down.
  safeToEvict: true

  # Launch additional containers into dag processor.
  extraContainers: []
  # Add additional init containers into dag processors.
  extraInitContainers: []

  # Mount additional volumes into dag processor. It can be templated like in the following example:
  #   extraVolumes:
  #     - name: my-templated-extra-volume
  #       secret:
  #          secretName: '{{ include "my_secret_template" . }}'
  #          defaultMode: 0640
  #          optional: true
  #
  #   extraVolumeMounts:
  #     - name: my-templated-extra-volume
  #       mountPath: "{{ .Values.my_custom_path }}"
  #       readOnly: true
  extraVolumes: []
  extraVolumeMounts: []

  # Select certain nodes for airflow dag processor pods.
  nodeSelector: {}
  affinity: {}
  # default dag processor affinity is:
  #  podAntiAffinity:
  #    preferredDuringSchedulingIgnoredDuringExecution:
  #    - podAffinityTerm:
  #        labelSelector:
  #          matchLabels:
  #            component: dag-processor
  #        topologyKey: kubernetes.io/hostname
  #      weight: 100
  tolerations: []
  topologySpreadConstraints: []

  priorityClassName: ~

  # annotations for the dag processor deployment
  annotations: {}

  podAnnotations: {}

  logGroomerSidecar:
    # Whether to deploy the Airflow dag processor log groomer sidecar.
    enabled: true
    # Command to use when running the Airflow dag processor log groomer sidecar (templated).
    command: ~
    # Args to use when running the Airflow dag processor log groomer sidecar (templated).
    args: ["bash", "/clean-logs"]
    # Number of days to retain logs
    retentionDays: 15
    resources: {}
    #  limits:
    #   cpu: 100m
    #   memory: 128Mi
    #  requests:
    #   cpu: 100m
    #   memory: 128Mi

  waitForMigrations:
    # Whether to create init container to wait for db migrations
    enabled: true
    env: []

  env: []

# PgBouncer settings
pgbouncer:
  # Enable PgBouncer
  enabled: true
  # Number of PgBouncer replicas to run in Deployment
  replicas: 1
  # Max number of old replicasets to retain
  revisionHistoryLimit: ~
  # Command to use for PgBouncer (templated).
  command: ["pgbouncer", "-u", "nobody", "/etc/pgbouncer/pgbouncer.ini"]
  # Args to use for PgBouncer (templated).
  args: ~
  auth_type: md5
  auth_file: /etc/pgbouncer/users.txt

  # annotations to be added to the PgBouncer deployment
  annotations: {}

  podAnnotations: {}

  # Create ServiceAccount
  serviceAccount:
    # Specifies whether a ServiceAccount should be created
    create: false
    # The name of the ServiceAccount to use.
    # If not set and create is true, a name is generated using the release name
    name: "airflow-helm"

    # Annotations to add to pgbouncer kubernetes service account.
    annotations: {}

  # Additional network policies as needed
  extraNetworkPolicies: []

  # Pool sizes
  metadataPoolSize: 10
  resultBackendPoolSize: 5

  # Maximum clients that can connect to PgBouncer (higher = more file descriptors)
  maxClientConn: 100

  # supply the name of existing secret with pgbouncer.ini and users.txt defined
  # you can load them to a k8s secret like the one below
  #  apiVersion: v1
  #  kind: Secret
  #  metadata:
  #    name: pgbouncer-config-secret
  #  data:
  #     pgbouncer.ini: <base64_encoded pgbouncer.ini file content>
  #     users.txt: <base64_encoded users.txt file content>
  #  type: Opaque
  #
  #  configSecretName: pgbouncer-config-secret
  #
  configSecretName: ~

  # PgBouncer pod disruption budget
  podDisruptionBudget:
    enabled: false

    # PDB configuration
    config:
      # minAvailable and maxUnavailable are mutually exclusive
      maxUnavailable: 1
      # minAvailable: 1

  # Limit the resources to PgBouncer.
  # When you specify the resource request the k8s scheduler uses this information to decide which node to
  # place the Pod on. When you specify a resource limit for a Container, the kubelet enforces those limits so
  # that the running container is not allowed to use more of that resource than the limit you set.
  # See: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/
  # Example:
  #
  # resource:
  #   limits:
  #     cpu: 100m
  #     memory: 128Mi
  #   requests:
  #     cpu: 100m
  #     memory: 128Mi
  resources: {}

  service:
    extraAnnotations: {}

  # https://www.pgbouncer.org/config.html
  verbose: 0
  logDisconnections: 0
  logConnections: 0

  sslmode: "prefer"
  ciphers: "normal"

  ssl:
    ca: ~
    cert: ~
    key: ~

  # Add extra PgBouncer ini configuration in the databases section:
  # https://www.pgbouncer.org/config.html#section-databases
  extraIniMetadata: ~
  extraIniResultBackend: ~
  # Add extra general PgBouncer ini configuration: https://www.pgbouncer.org/config.html
  extraIni: ~

  # Mount additional volumes into pgbouncer. It can be templated like in the following example:
  #   extraVolumes:
  #     - name: my-templated-extra-volume
  #       secret:
  #          secretName: '{{ include "my_secret_template" . }}'
  #          defaultMode: 0640
  #          optional: true
  #
  #   extraVolumeMounts:
  #     - name: my-templated-extra-volume
  #       mountPath: "{{ .Values.my_custom_path }}"
  #       readOnly: true
  extraVolumes: []
  extraVolumeMounts: []

  # Select certain nodes for PgBouncer pods.
  nodeSelector: {}
  affinity: {}
  tolerations: []
  topologySpreadConstraints: []

  priorityClassName: ~

  uid: 65534

  # Detailed default security context for pgbouncer for container level
  securityContexts:
    container: {}

  metricsExporterSidecar:
    resources: {}
    #  limits:
    #   cpu: 100m
    #   memory: 128Mi
    #  requests:
    #   cpu: 100m
    #   memory: 128Mi
    sslmode: "disable"

    # Detailed default security context for metricsExporterSidecar for container level
    securityContexts:
      container: {}

    livenessProbe:
      initialDelaySeconds: 10
      periodSeconds: 10
      timeoutSeconds: 1

    readinessProbe:
      initialDelaySeconds: 10
      periodSeconds: 10
      timeoutSeconds: 1

# Configuration for the redis provisioned by the chart
redis:
  enabled: true
  terminationGracePeriodSeconds: 600

  # Create ServiceAccount
  serviceAccount:
    # Specifies whether a ServiceAccount should be created
    create: false
    # The name of the ServiceAccount to use.
    # If not set and create is true, a name is generated using the release name
    name: "airflow-helm"

    # Annotations to add to redis kubernetes service account.
    annotations: {}

  persistence:
    # Enable persistent volumes
    enabled: true
    # Volume size for worker StatefulSet
    size: 1Gi
    # If using a custom storageClass, pass name ref to all statefulSets here
    storageClassName:
    # Annotations to add to redis volumes
    annotations: {}

  resources: {}
  #  limits:
  #   cpu: 100m
  #   memory: 128Mi
  #  requests:
  #   cpu: 100m
  #   memory: 128Mi

  # If set use as redis secret. Make sure to also set data.brokerUrlSecretName value.
  passwordSecretName: ~

  # Else, if password is set, create secret with it,
  # Otherwise a new password will be generated on install
  # Note: password can only be set during install, not upgrade.
  password: ~

  # This setting tells Kubernetes that it's OK to evict this pod
  # when it wants to scale a node down.
  safeToEvict: true

  # Select certain nodes for redis pods.
  nodeSelector: {}
  affinity: {}
  tolerations: []
  topologySpreadConstraints: []

  # Set to 0 for backwards-compatibility
  uid: 0
  # If not set, `redis.uid` will be used
  securityContext: {}
  #  runAsUser: 999
  #  runAsGroup: 0

  # Detailed default security context for redis for container and pod level
  securityContexts:
    pod: {}
    container: {}

  podAnnotations: {}
# Auth secret for a private registry
# This is used if pulling airflow images from a private registry
registry:
  secretName: ~

  # Example:
  connection:
   user: xxxx
   pass: xxx
   host: xxx
   email: xxxx
  #connection: {}

# All ports used by chart
ports:
  flowerUI: 5555
  airflowUI: 8080
  workerLogs: 8793
  triggererLogs: 8794
  redisDB: 6379
  statsdIngest: 9125
  statsdScrape: 9102
  pgbouncer: 6543
  pgbouncerScrape: 9127

# Define any ResourceQuotas for namespace
quotas: {}

# Define default/max/min values for pods and containers in namespace
limits: []

# This runs as a CronJob to cleanup old pods.
cleanup:
  enabled: false
  # Run every 15 minutes
  schedule: "*/15 * * * *"
  # Command to use when running the cleanup cronjob (templated).
  command: ~
  # Args to use when running the cleanup cronjob (templated).
  args: ["bash", "-c", "exec airflow kubernetes cleanup-pods --namespace={{ .Release.Namespace }}"]

  # jobAnnotations are annotations on the cleanup CronJob
  jobAnnotations: {}

  # Select certain nodes for airflow cleanup pods.
  nodeSelector: {}
  affinity: {}
  tolerations: []
  topologySpreadConstraints: []

  podAnnotations: {}

  # Labels specific to cleanup objects and pods
  labels: {}

  resources: {}
  #  limits:
  #   cpu: 100m
  #   memory: 128Mi
  #  requests:
  #   cpu: 100m
  #   memory: 128Mi

  # Create ServiceAccount
  serviceAccount:
    # Specifies whether a ServiceAccount should be created
    create: false
    # The name of the ServiceAccount to use.
    # If not set and create is true, a name is generated using the release name
    name: "airflow-helm"

    # Annotations to add to cleanup cronjob kubernetes service account.
    annotations: {}

  # When not set, the values defined in the global securityContext will be used
  securityContext: {}
  #  runAsUser: 50000
  #  runAsGroup: 0
  env: []

  # Detailed default security context for cleanup for container level
  securityContexts:
    container: {}

  # Specify history limit
  # When set, overwrite the default k8s number of successful and failed CronJob executions that are saved.
  failedJobsHistoryLimit: ~
  successfulJobsHistoryLimit: ~

# Configuration for postgresql subchart
# Not recommended for production
postgresql:
  enabled: false
  image:
    tag: "11"
  auth:
    enablePostgresUser: true
    postgresPassword: postgres
    username: ""
    password: ""

# Config settings to go into the mounted airflow.cfg
#
# Please note that these values are passed through the `tpl` function, so are
# all subject to being rendered as go templates. If you need to include a
# literal `{{` in a value, it must be expressed like this:
#
#    a: '{{ "{{ not a template }}" }}'
#
# Do not set config containing secrets via plain text values, use Env Var or k8s secret object
# yamllint disable rule:line-length
config:
  core:
    dags_folder: '{{ include "airflow_dags" . }}'
    # This is ignored when used with the official Docker image
    load_examples: 'False'
    executor: '{{ .Values.executor }}'
    # For Airflow 1.10, backward compatibility; moved to [logging] in 2.0
    colored_console_log: 'False'
    remote_logging: '{{- ternary "True" "False" .Values.elasticsearch.enabled }}'
  logging:
    remote_logging: '{{- ternary "True" "False" .Values.elasticsearch.enabled }}'
    colored_console_log: 'False'
  metrics:
    statsd_on: '{{ ternary "True" "False" .Values.statsd.enabled }}'
    statsd_port: 9125
    statsd_prefix: airflow
    statsd_host: '{{ printf "%s-statsd" .Release.Name }}'
  webserver:
    enable_proxy_fix: 'True'
    # For Airflow 1.10
    rbac: 'True'
  celery:
    flower_url_prefix: '{{ .Values.ingress.flower.path }}'
    worker_concurrency: 16
  scheduler:
    standalone_dag_processor: '{{ ternary "True" "False" .Values.dagProcessor.enabled }}'
    # statsd params included for Airflow 1.10 backward compatibility; moved to [metrics] in 2.0
    statsd_on: '{{ ternary "True" "False" .Values.statsd.enabled }}'
    statsd_port: 9125
    statsd_prefix: airflow
    statsd_host: '{{ printf "%s-statsd" .Release.Name }}'
    # `run_duration` included for Airflow 1.10 backward compatibility; removed in 2.0.
    run_duration: 41460
  elasticsearch:
    json_format: 'True'
    log_id_template: "{dag_id}_{task_id}_{execution_date}_{try_number}"
  elasticsearch_configs:
    max_retries: 3
    timeout: 30
    retry_timeout: 'True'
  kerberos:
    keytab: '{{ .Values.kerberos.keytabPath }}'
    reinit_frequency: '{{ .Values.kerberos.reinitFrequency }}'
    principal: '{{ .Values.kerberos.principal }}'
    ccache: '{{ .Values.kerberos.ccacheMountPath }}/{{ .Values.kerberos.ccacheFileName }}'
  celery_kubernetes_executor:
    kubernetes_queue: 'kubernetes'
  # The `kubernetes` section is deprecated in Airflow >= 2.5.0 due to an airflow.cfg schema change.
  # The `kubernetes` section can be removed once the helm chart no longer supports Airflow < 2.5.0.
  kubernetes:
    namespace: '{{ .Release.Namespace }}'
    # The following `airflow_` entries are for Airflow 1, and can be removed when it is no longer supported.
    airflow_configmap: '{{ include "airflow_config" . }}'
    airflow_local_settings_configmap: '{{ include "airflow_config" . }}'
    pod_template_file: '{{ include "airflow_pod_template_file" . }}/pod_template_file.yaml'
    worker_container_repository: '{{ .Values.images.airflow.repository | default .Values.defaultAirflowRepository }}'
    worker_container_tag: '{{ .Values.images.airflow.tag | default .Values.defaultAirflowTag }}'
    multi_namespace_mode: '{{ ternary "True" "False" .Values.multiNamespaceMode }}'
  # The `kubernetes_executor` section duplicates the `kubernetes` section in Airflow >= 2.5.0 due to an airflow.cfg schema change.
  kubernetes_executor:
    namespace: '{{ .Release.Namespace }}'
    pod_template_file: '{{ include "airflow_pod_template_file" . }}/pod_template_file.yaml'
    worker_container_repository: '{{ .Values.images.airflow.repository | default .Values.defaultAirflowRepository }}'
    worker_container_tag: '{{ .Values.images.airflow.tag | default .Values.defaultAirflowTag }}'
    multi_namespace_mode: '{{ ternary "True" "False" .Values.multiNamespaceMode }}'
# yamllint enable rule:line-length

# Whether Airflow can launch workers and/or pods in multiple namespaces
# If true, it creates ClusterRole/ClusterRolebinding (with access to entire cluster)
multiNamespaceMode: false

# `podTemplate` is a templated string containing the contents of `pod_template_file.yaml` used for
# KubernetesExecutor workers. The default `podTemplate` will use normal `workers` configuration parameters
# (e.g. `workers.resources`). As such, you normally won't need to override this directly, however,
# you can still provide a completely custom `pod_template_file.yaml` if desired.
# If not set, a default one is created using `files/pod-template-file.kubernetes-helm-yaml`.
podTemplate: ~
# The following example is NOT functional, but meant to be illustrative of how you can provide a custom
# `pod_template_file`. You're better off starting with the default in
# `files/pod-template-file.kubernetes-helm-yaml` and modifying from there.
# We will set `priorityClassName` in this example:
# podTemplate: |
#   apiVersion: v1
#   kind: Pod
#   metadata:
#     name: placeholder-name
#     labels:
#       tier: airflow
#       component: worker
#       release: {{ .Release.Name }}
#   spec:
#     priorityClassName: high-priority
#     containers:
#       - name: base
#         ...

# Git sync
dags:
  persistence:
    # Annotations for dags PVC
    annotations: {}
    # Enable persistent volume for storing dags
    enabled: false
    # Volume size for dags
    size: 1Gi
    # If using a custom storageClass, pass name here
    storageClassName:
    # access mode of the persistent volume
    accessMode: ReadWriteOnce
    ## the name of an existing PVC to use
    existingClaim:
    ## optional subpath for dag volume mount
    subPath: ~
  gitSync:
    enabled: false

    # git repo clone url
    # ssh example: git@github.com:apache/airflow.git
    # https example: https://github.com/apache/airflow.git
    repo: https://github.com/apache/airflow.git
    branch: v2-2-stable
    rev: HEAD
    depth: 1
    # the number of consecutive failures allowed before aborting
    maxFailures: 0
    # subpath within the repo where dags are located
    # should be "" if dags are at repo root
    subPath: "tests/dags"
    # if your repo needs a user name password
    # you can load them to a k8s secret like the one below
    #   ---
    #   apiVersion: v1
    #   kind: Secret
    #   metadata:
    #     name: git-credentials
    #   data:
    #     GIT_SYNC_USERNAME: <base64_encoded_git_username>
    #     GIT_SYNC_PASSWORD: <base64_encoded_git_password>
    # and specify the name of the secret below
    #
    # credentialsSecret: git-credentials
    #
    #
    # If you are using an ssh clone url, you can load
    # the ssh private key to a k8s secret like the one below
    #   ---
    #   apiVersion: v1
    #   kind: Secret
    #   metadata:
    #     name: airflow-ssh-secret
    #   data:
    #     # key needs to be gitSshKey
    #     gitSshKey: <base64_encoded_data>
    # and specify the name of the secret below
    # sshKeySecret: airflow-ssh-secret
    #
    # If you are using an ssh private key, you can additionally
    # specify the content of your known_hosts file, example:
    #
    # knownHosts: |
    #    <host1>,<ip1> <key1>
    #    <host2>,<ip2> <key2>

    # interval between git sync attempts in seconds
    # high values are more likely to cause DAGs to become out of sync between different components
    # low values cause more traffic to the remote git repository
    wait: 5
    containerName: git-sync
    uid: 65533

    # When not set, the values defined in the global securityContext will be used
    securityContext: {}
    #  runAsUser: 65533
    #  runAsGroup: 0

    securityContexts:
      container: {}

    # Mount additional volumes into git-sync. It can be templated like in the following example:
    #   extraVolumeMounts:
    #     - name: my-templated-extra-volume
    #       mountPath: "{{ .Values.my_custom_path }}"
    #       readOnly: true
    extraVolumeMounts: []
    env: []
    # Supported env vars for gitsync can be found at https://github.com/kubernetes/git-sync
    # - name: ""
    #   value: ""

    resources: {}
    #  limits:
    #   cpu: 100m
    #   memory: 128Mi
    #  requests:
    #   cpu: 100m
    #   memory: 128Mi

logs:
  persistence:
    # Enable persistent volume for storing logs
    enabled: true
    # Volume size for logs
    size: 14Gi
    # Annotations for the logs PVC
    annotations: {}
    # If using a custom storageClass, pass name here
    storageClassName: "efs-sc"
    ## the name of an existing PVC to use
    existingClaim: "airflow-logs"

Docker Image customizations

FROM apache/airflow:2.6.0

USER root

RUN apt-get update \
    && apt-get install -y unzip

# Copy DAG files
COPY ./dags/ /opt/airflow/dags/

USER airflow

What happened


If I run the DAG below (Dags/test_parallelism.py) with the AWS EFS volume mounted, it works with no issues for pod counts up to 25. But if I increase the pod count to around 100, I start getting timeout errors:

Unable to attach or mount volumes: unmounted volumes=[logs], unattached volumes=[logs config backups kube-api-access-jxz9w]: timed out waiting for the condition

Unable to attach or mount volumes: unmounted volumes=[logs], unattached volumes=[backups kube-api-access-q6b8x logs config]: timed out waiting for the condition

Dags/test_parallelism.py

import time
import logging
import os
from datetime import datetime
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from kubernetes.client import models as k8s

def test(**context):
    """
    Tests whether the volume has been mounted.
    """
    time.sleep(int(os.environ["parallel_test_sleep"]))

default_args = {
    "owner": 'Airflow',
    "start_date": datetime(2021, 1, 1),
}

dag = DAG(
    dag_id='test_1000_task_1',
    schedule_interval="0 * * * *",
    default_args=default_args,
    catchup=False
)

with dag:
    for i in range(int(os.environ["parallel_test_count"])):
        task = PythonOperator(
            task_id=f"task_{i}",
            python_callable=test,
            provide_context=True,
            executor_config={
                "pod_override": k8s.V1Pod(
                    spec=k8s.V1PodSpec(
                        containers=[
                            k8s.V1Container(
                                name="base",
                                volume_mounts=[
                                    k8s.V1VolumeMount(
                                        mount_path="/opt/airflow/backups/", name="backups", read_only=False
                                    )
                                ],
                            )
                        ],
                        volumes=[
                            k8s.V1Volume(
                                name="backups",
                                persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="airflow-s3-pvc"),
                            )
                        ],
                    )
                ),
            }
        )

What you think should happen instead

The EFS volume should be mounted on all the Kubernetes pods, since the access mode is set to ReadWriteMany.

How to reproduce

Use the Helm chart and Airflow versions mentioned above with the custom values.yaml shown. Mount an AWS EFS volume for persistent logs, and also mount one custom EFS-backed volume into every Kubernetes worker pod. The two volume definitions are shown below, each followed by a short sketch of the assumptions it relies on.

Volume mount 1 - airflow-logs

logs:
  persistence:
    # Enable persistent volume for storing logs
    enabled: true
    # Volume size for logs
    size: 14Gi
    # Annotations for the logs PVC
    annotations: {}
    # If using a custom storageClass, pass name here
    storageClassName: "efs-sc"
    ## the name of an existing PVC to use
    existingClaim: "airflow-logs"
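
With `existingClaim: airflow-logs`, the chart mounts a pre-created PVC rather than provisioning one. That claim and the `efs-sc` storage class are not included in this issue; a minimal sketch of what they might look like with the AWS EFS CSI driver (the fileSystemId and size are placeholders, not values from the issue):

kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
  name: efs-sc
provisioner: efs.csi.aws.com
parameters:
  provisioningMode: efs-ap
  fileSystemId: fs-0123456789abcdef0   # placeholder, use your EFS filesystem ID
  directoryPerms: "700"
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-logs
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: efs-sc
  resources:
    requests:
      storage: 14Gi   # EFS ignores the requested size, but the field is required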

Volume mount 2 - backups via the pod_override

with dag:
    for i in range(int(os.environ["parallel_test_count"])):
        task = PythonOperator(
            task_id=f"task_{i}",
            python_callable=test,
            provide_context=True,
            executor_config={
                "pod_override": k8s.V1Pod(
                    spec=k8s.V1PodSpec(
                        containers=[
                            k8s.V1Container(
                                name="base",
                                volume_mounts=[
                                    k8s.V1VolumeMount(
                                        mount_path="/opt/airflow/backups/", name="backups", read_only=False
                                    )
                                ],
                            )
                        ],
                        volumes=[
                            k8s.V1Volume(
                                name="backups",
                                persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="airflow-s3-pvc"),
                            )
                        ],
                    )
                ),
            }
        )
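
Note that `airflow-s3-pvc` similarly has to be a pre-created ReadWriteMany claim (its definition is not shown here). As an aside, since the default pod template for KubernetesExecutor is built from the `workers` section of the values, roughly the same mount could instead be declared once at chart level via `workers.extraVolumes` / `workers.extraVolumeMounts` rather than a per-task `pod_override`; a minimal sketch, with the mount path assumed to match the DAG above:

workers:
  extraVolumes:
    - name: backups
      persistentVolumeClaim:
        claimName: airflow-s3-pvc
  extraVolumeMounts:
    - name: backups
      mountPath: /opt/airflow/backups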

Anything else

kubectl describe output for one of the unmounted pods:


Name:         test-1000-task-1-task-44-ff046add566c46bdb78ead1aa72d4e6c
Namespace:    sb-jniravel
Priority:     0
Node:         ip-10-0-133-146.ec2.internal/10.0.133.146
Start Time:   Wed, 16 Aug 2023 09:21:57 -0500
Labels:       airflow-worker=1188
              airflow_version=2.6.0
              component=worker
              dag_id=test_1000_task_1
              kubernetes_executor=True
              release=airflow
              run_id=manual__2023-08-16T142155.7297460000-c3a08be2d
              task_id=task_44
              tier=airflow
              try_number=1
Annotations:  dag_id: test_1000_task_1
              openshift.io/scc: airflow-cluster-scc
              run_id: manual__2023-08-16T14:21:55.729746+00:00
              seccomp.security.alpha.kubernetes.io/pod: runtime/default
              task_id: task_44
              try_number: 1
Status:       Pending
IP:
IPs:          <none>
Containers:
  base:
    Container ID:
    Image:         truu.jfrog.io/airflow-etl-repo/airflow:v37
    Image ID:
    Port:          <none>
    Host Port:     <none>
    Args:
      airflow
      tasks
      run
      test_1000_task_1
      task_44
      manual__2023-08-16T14:21:55.729746+00:00
      --local
      --subdir
      DAGS_FOLDER/test_parallelism.py
    State:          Waiting
      Reason:       ContainerCreating
    Ready:          False
    Restart Count:  0
    Environment:
      AIRFLOW__CORE__EXECUTOR:                                                                                   LocalExecutor
      AIRFLOW__CORE__FERNET_KEY:                                                                                 <set to the key 'fernet-key' in secret 'airflow-fernet-key'>                      Optional: false
      AIRFLOW__CORE__SQL_ALCHEMY_CONN:                                                                           <set to the key 'connection' in secret 'airflow-airflow-metadata'>                Optional: false
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN:                                                                       <set to the key 'connection' in secret 'airflow-airflow-metadata'>                Optional: false
      AIRFLOW_CONN_AIRFLOW_DB:                                                                                   <set to the key 'connection' in secret 'airflow-airflow-metadata'>                Optional: false
      AIRFLOW__WEBSERVER__SECRET_KEY:                                                                            <set to the key 'webserver-secret-key' in secret 'airflow-webserver-secret-key'>  Optional: false
      AIRFLOW__CORE__DEFAULT_POOL_TASK_SLOT_COUNT:                                                               500
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__DEFAULT_POOL_TASK_SLOT_COUNT:                    500
      AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT:                                                                      360.0
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT:                           360.0
      AIRFLOW__DATABASE__SQL_ALCHEMY_POOL_SIZE:                                                                  -1
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__DATABASE__SQL_ALCHEMY_POOL_SIZE:                       -1
      AIRFLOW__CORE__PARALLELISM:                                                                                500
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__PARALLELISM:                                     500
      AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG:                                                                   500
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG:                        500
      AIRFLOW__SCHEDULER__PARSING_PROCESSES:                                                                     32
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__SCHEDULER__PARSING_PROCESSES:                          32
      AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD:                                                      60
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD:           60
      AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG:                                                                    500
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG:                         500
      AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT:                                                                 360
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT:                      360
      AIRFLOW__KUBERNETES_EXECUTOR__WORKER_PODS_CREATION_BATCH_SIZE:                                             25
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__KUBERNETES_EXECUTOR__WORKER_PODS_CREATION_BATCH_SIZE:  25
      AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL:                                                             600
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL:                  600
      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL:                                                                 600
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL:                      600
      AIRFLOW__SCHEDULER__MAX_DAGRUNS_TO_CREATE_PER_LOOP:                                                        500
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__SCHEDULER__MAX_DAGRUNS_TO_CREATE_PER_LOOP:             500
      AIRFLOW__SCHEDULER__MAX_DAGRUNS_PER_LOOP_TO_SCHEDULE:                                                      500
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__SCHEDULER__MAX_DAGRUNS_PER_LOOP_TO_SCHEDULE:           500
      AIRFLOW__SCHEDULER__SCHEDULER_ZOMBIE_TASK_THRESHOLD:                                                       600
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__SCHEDULER__SCHEDULER_ZOMBIE_TASK_THRESHOLD:            600
      parallel_test_count:                                                                                       50
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__parallel_test_count:                                            50
      parallel_test_sleep:                                                                                       60
      AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__parallel_test_sleep:                                            60
      AIRFLOW_IS_K8S_EXECUTOR_POD:                                                                               True
    Mounts:
      /opt/airflow/airflow.cfg from config (ro,path="airflow.cfg")
      /opt/airflow/backups/ from backups (rw)
      /opt/airflow/config/airflow_local_settings.py from config (ro,path="airflow_local_settings.py")
      /opt/airflow/logs from logs (rw)
      /var/run/secrets/kubernetes.io/serviceaccount from kube-api-access-k76b5 (ro)
Conditions:
  Type              Status
  Initialized       True
  Ready             False
  ContainersReady   False
  PodScheduled      True
Volumes:
  logs:
    Type:       PersistentVolumeClaim (a reference to a PersistentVolumeClaim in the same namespace)
    ClaimName:  airflow-logs
    ReadOnly:   false
  config:
    Type:      ConfigMap (a volume populated by a ConfigMap)
    Name:      airflow-airflow-config
    Optional:  false
  backups:
    Type:       PersistentVolumeClaim (a reference to a PersistentVolumeClaim in the same namespace)
    ClaimName:  airflow-s3-pvc
    ReadOnly:   false
  kube-api-access-k76b5:
    Type:                    Projected (a volume that contains injected data from multiple sources)
    TokenExpirationSeconds:  3607
    ConfigMapName:           kube-root-ca.crt
    ConfigMapOptional:       <nil>
    DownwardAPI:             true
    ConfigMapName:           openshift-service-ca.crt
    ConfigMapOptional:       <nil>
QoS Class:                   BestEffort
Node-Selectors:              <none>
Tolerations:                 node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
                             node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
Events:
  Type     Reason       Age        From               Message
  ----     ------       ----       ----               -------
  Normal   Scheduled    111s       default-scheduler  Successfully assigned sb-jniravel/test-1000-task-1-task-44-ff046add566c46bdb78ead1aa72d4e6c to ip-10-0-133-146.ec2.internal
  Warning  FailedMount  <invalid>  kubelet            Unable to attach or mount volumes: unmounted volumes=[logs backups], unattached volumes=[kube-api-access-k76b5 logs config backups]: timed out waiting for the condition


potiuk commented 1 year ago

This sounds like an EFS/EKS problem, not an Airflow one. I think you should look for similar issues (I saw there are plenty of Airflow-unrelated issues raised with EFS/EKS where other apps hit the same behaviour). From a quick look it's likely the networking configuration or the EFS resources that need to be able to handle that many mounts, but I think it's best if you search for similar issues or raise the issue with AWS support.

Converting to a discussion in case more discussion is needed, as this seems to be deployment-specific troubleshooting rather than an Airflow issue.

BTW, a comment for the future: it would be great to explain how your configuration differs from the standard one, or to put your specific code in a [collapsible section of markdown](https://gist.github.com/pierrejoubert73/902cc94d79424356a8d20be2b382e1ab). The easier you make it for someone trying to help you to understand your issue, the better (remember, people here help in their free time when they can), so by making it easier to see what the problem is, you increase the chances that someone will help you solve it. In this case it took me quite some time to scroll through many pages of configuration (which may have been useful, but is impossible for a human to analyse in full) in order to find what the problem is.