apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

ValueError: Unable to configure handler 'task' #30863

Closed: ahmadfarhan97 closed this issue 1 year ago

ahmadfarhan97 commented 1 year ago

Official Helm Chart version

1.9.0 (latest released)

Apache Airflow version

2.5.3

Kubernetes Version

1.26.3

Helm Chart configuration


fullnameOverride: ""

nameOverride: ""

kubeVersionOverride: ""

revisionHistoryLimit: ~

# User and group of airflow user
uid: 50000
gid: 0

# Default security context for airflow
securityContext: {}
#  runAsUser: 50000
#  fsGroup: 0
#  runAsGroup: 0

# Airflow home directory
# Used for mount paths
airflowHome: /opt/airflow

# Default airflow repository -- overridden by all the specific images below
defaultAirflowRepository: airflow-custom

# Default airflow tag to deploy
defaultAirflowTag: "1.1.0"

# Airflow version (Used to make some decisions based on Airflow Version being deployed)
airflowVersion: "2.5.3"

# Images
images:
  airflow:
    repository: ~
    tag: ~
    pullPolicy: IfNotPresent

  useDefaultImageForMigration: false
  # timeout (in seconds) for airflow-migrations to complete
  migrationsWaitTimeout: 60
  pod_template:
    repository: ~
    tag: ~
    pullPolicy: IfNotPresent
  flower:
    repository: ~
    tag: ~
    pullPolicy: IfNotPresent
  statsd:
    repository: quay.io/prometheus/statsd-exporter
    tag: v0.22.8
    pullPolicy: IfNotPresent
  redis:
    repository: redis
    tag: 7-bullseye
    pullPolicy: IfNotPresent
  pgbouncer:
    repository: apache/airflow
    tag: airflow-pgbouncer-2021.04.28-1.14.0
    pullPolicy: IfNotPresent
  pgbouncerExporter:
    repository: apache/airflow
    tag: airflow-pgbouncer-exporter-2021.09.22-0.12.0
    pullPolicy: IfNotPresent
  gitSync:
    repository: k8s.gcr.io/git-sync/git-sync
    tag: v3.6.3
    pullPolicy: IfNotPresent

# Select certain nodes for airflow pods.
nodeSelector: {}
affinity: {}
tolerations: []
topologySpreadConstraints: []

# Add common labels to all objects and pods defined in this chart.
labels: {}

# Ingress configuration
ingress:
  # Enable all ingress resources (deprecated - use ingress.web.enabled and ingress.flower.enabled)
  enabled: ~

  # Configs for the Ingress of the web Service
  web:
    # Enable web ingress resource
    enabled: false

    # Annotations for the web Ingress
    annotations: {}

    # The path for the web Ingress
    path: "/"

    # The pathType for the above path (used only with Kubernetes v1.19 and above)
    pathType: "ImplementationSpecific"

    # The hostname for the web Ingress (Deprecated - renamed to `ingress.web.hosts`)
    host: ""

    # The hostnames or hosts configuration for the web Ingress
    hosts: []

    # The Ingress Class for the web Ingress (used only with Kubernetes v1.19 and above)
    ingressClassName: ""

    # configs for web Ingress TLS (Deprecated - renamed to `ingress.web.hosts[*].tls`)
    tls:
      # Enable TLS termination for the web Ingress
      enabled: false
      # the name of a pre-created Secret containing a TLS private key and certificate
      secretName: ""

    # HTTP paths to add to the web Ingress before the default path
    precedingPaths: []

    # Http paths to add to the web Ingress after the default path
    succeedingPaths: []

  # Configs for the Ingress of the flower Service
  flower:
    # Enable flower ingress resource
    enabled: false

    # Annotations for the flower Ingress
    annotations: {}

    # The path for the flower Ingress
    path: "/"

    # The pathType for the above path (used only with Kubernetes v1.19 and above)
    pathType: "ImplementationSpecific"

    # The hostname for the flower Ingress (Deprecated - renamed to `ingress.flower.hosts`)
    host: ""

    # The hostnames or hosts configuration for the flower Ingress
    hosts: []

    # The Ingress Class for the flower Ingress (used only with Kubernetes v1.19 and above)
    ingressClassName: ""

    # configs for flower Ingress TLS (Deprecated - renamed to `ingress.flower.hosts[*].tls`)
    tls:
      # Enable TLS termination for the flower Ingress
      enabled: false
      # the name of a pre-created Secret containing a TLS private key and certificate
      secretName: ""

# Network policy configuration
networkPolicies:
  # Enabled network policies
  enabled: false

# Extra annotations to apply to all
# Airflow pods
airflowPodAnnotations: {}

# Extra annotations to apply to
# main Airflow configmap
airflowConfigAnnotations: {}

# `airflow_local_settings` file as a string (can be templated).
airflowLocalSettings: |-
  {{- if semverCompare ">=2.2.0" .Values.airflowVersion }}
  {{- if not (or .Values.webserverSecretKey .Values.webserverSecretKeySecretName) }}
  from airflow.www.utils import UIAlert

  DASHBOARD_UIALERTS = [
    UIAlert(
      'Usage of a dynamic webserver secret key detected. We recommend a static webserver secret key instead.'
      ' See the <a href='
      '"https://airflow.apache.org/docs/helm-chart/stable/production-guide.html#webserver-secret-key">'
      'Helm Chart Production Guide</a> for more details.',
      category="warning",
      roles=["Admin"],
      html=True,
    )
  ]
  {{- end }}
  {{- end }}

# Enable RBAC (default on most clusters these days)
rbac:
  # Specifies whether RBAC resources should be created
  create: true
  createSCCRoleBinding: false

# Airflow executor
# One of: LocalExecutor, LocalKubernetesExecutor, CeleryExecutor, KubernetesExecutor, CeleryKubernetesExecutor
executor: "KubernetesExecutor"

allowPodLaunching: true

# Environment variables for all airflow containers
env: []
# - name: ""
#   value: ""

# Volumes for all airflow containers
volumes: []

# VolumeMounts for all airflow containers
volumeMounts: []

# Secrets for all airflow containers
secret:
- envName: "AIRFLOW_CONN_SNOWFLAKE"
  secretName: "airflow-snowflake-connection"
  secretKey: "AIRFLOW_CONN_SNOWFLAKE"
- envName: "AIRFLOW_CONN_ADLS"
  secretName: "azure-blob-storage-secret"
  secretKey: "AIRFLOW_CONN_ADLS"

enableBuiltInSecretEnvVars:
  AIRFLOW__CORE__FERNET_KEY: true
  # For Airflow <2.3, backward compatibility; moved to [database] in 2.3
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: true
  AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: true
  AIRFLOW_CONN_AIRFLOW_DB: true
  AIRFLOW__WEBSERVER__SECRET_KEY: true
  AIRFLOW__CELERY__CELERY_RESULT_BACKEND: true
  AIRFLOW__CELERY__RESULT_BACKEND: true
  AIRFLOW__CELERY__BROKER_URL: true
  AIRFLOW__ELASTICSEARCH__HOST: true
  AIRFLOW__ELASTICSEARCH__ELASTICSEARCH_HOST: true

extraSecrets: {}

extraConfigMaps: {}

extraEnv: ~

# Airflow database & redis config
data:

  metadataSecretName: custom-airflow-metadata-secret

  resultBackendSecretName: ~
  brokerUrlSecretName: ~

  # Otherwise pass connection values in
  metadataConnection: ~

  resultBackendConnection: ~

  brokerUrl: ~

# Fernet key settings
# Note: fernetKey can only be set during install, not upgrade
fernetKey: ~
fernetKeySecretName: ~

# Flask secret key for Airflow Webserver: `[webserver] secret_key` in airflow.cfg
webserverSecretKey: ~
webserverSecretKeySecretName: ~

kerberos:
  enabled: false
  ccacheMountPath: /var/kerberos-ccache
  ccacheFileName: cache
  configPath: /etc/krb5.conf
  keytabBase64Content: ~
  keytabPath: /etc/airflow.keytab
  principal: airflow@FOO.COM
  reinitFrequency: 3600
  config: |

    [logging]
    default = "FILE:{{ template "airflow_logs_no_quote" . }}/kerberos_libs.log"
    kdc = "FILE:{{ template "airflow_logs_no_quote" . }}/kerberos_kdc.log"
    admin_server = "FILE:{{ template "airflow_logs_no_quote" . }}/kadmind.log"

    [libdefaults]
    default_realm = FOO.COM
    ticket_lifetime = 10h
    renew_lifetime = 7d
    forwardable = true

    [realms]
    FOO.COM = {
      kdc = kdc-server.foo.com
      admin_server = admin_server.foo.com
    }

# Airflow Worker Config
workers:
  # Number of airflow celery workers in StatefulSet
  replicas: 1
  # Max number of old replicasets to retain
  revisionHistoryLimit: ~

  # Command to use when running Airflow workers (templated).
  command: ~
  # Args to use when running Airflow workers (templated).
  args:
    - "bash"
    - "-c"
    # The format below is necessary to get `helm lint` happy
    - |-
      exec \
      airflow {{ semverCompare ">=2.0.0" .Values.airflowVersion | ternary "celery worker" "worker" }}

  # If the worker stops responding for 5 minutes (5*60s) kill the
  # worker and let Kubernetes restart it
  livenessProbe:
    enabled: true
    initialDelaySeconds: 10
    timeoutSeconds: 20
    failureThreshold: 5
    periodSeconds: 60
    command: ~

  # Update Strategy when worker is deployed as a StatefulSet
  updateStrategy: ~
  # Update Strategy when worker is deployed as a Deployment
  strategy:
    rollingUpdate:
      maxSurge: "100%"
      maxUnavailable: "50%"

  # When not set, the values defined in the global securityContext will be used
  securityContext: {}

  # Create ServiceAccount
  serviceAccount:
    # Specifies whether a ServiceAccount should be created
    create: true
    # The name of the ServiceAccount to use.
    # If not set and create is true, a name is generated using the release name
    name: ~

    # Annotations to add to worker kubernetes service account.
    annotations: {}

  # Allow KEDA autoscaling.
  # Persistence.enabled must be set to false to use KEDA.
  keda:
    enabled: false
    namespaceLabels: {}

    # How often KEDA polls the airflow DB to report new scale requests to the HPA
    pollingInterval: 5

    # How many seconds KEDA will wait before scaling to zero.
    # Note that HPA has a separate cooldown period for scale-downs
    cooldownPeriod: 30

    # Minimum number of workers created by keda
    minReplicaCount: 0

    # Maximum number of workers created by keda
    maxReplicaCount: 10

    # Specify HPA related options
    advanced: {}
    # horizontalPodAutoscalerConfig:
    #   behavior:
    #     scaleDown:
    #       stabilizationWindowSeconds: 300
    #       policies:
    #         - type: Percent
    #           value: 100
    #           periodSeconds: 15

  persistence:
    # Enable persistent volumes
    enabled: true
    # Volume size for worker StatefulSet
    size: 100Gi
    # If using a custom storageClass, pass name ref to all statefulSets here
    storageClassName:
    # Execute init container to chown log directory.
    # This is currently only needed in kind, due to usage
    # of local-path provisioner.
    fixPermissions: false
    # Annotations to add to worker volumes
    annotations: {}

  kerberosSidecar:
    # Enable kerberos sidecar
    enabled: false
    resources: {}
    #  limits:
    #   cpu: 100m
    #   memory: 128Mi
    #  requests:
    #   cpu: 100m
    #   memory: 128Mi

  resources: {}
  #  limits:
  #   cpu: 100m
  #   memory: 128Mi
  #  requests:
  #   cpu: 100m
  #   memory: 128Mi

  # Grace period for tasks to finish after SIGTERM is sent from kubernetes
  terminationGracePeriodSeconds: 600

  # This setting tells kubernetes that it's ok to evict
  # when it wants to scale a node down.
  safeToEvict: true

  # Launch additional containers into worker.
  # Note: If used with KubernetesExecutor, you are responsible for signaling sidecars to exit when the main
  # container finishes so Airflow can continue the worker shutdown process!
  extraContainers: []
  # Add additional init containers into workers.
  extraInitContainers: []

  # Mount additional volumes into worker.
  extraVolumes: []
  extraVolumeMounts: []

  # Select certain nodes for airflow worker pods.
  nodeSelector: {}
  priorityClassName: ~
  affinity: {}

  tolerations: []
  topologySpreadConstraints: []
  # hostAliases to use in worker pods.
  # See:
  # https://kubernetes.io/docs/concepts/services-networking/add-entries-to-pod-etc-hosts-with-host-aliases/
  hostAliases: []
  # - ip: "127.0.0.2"
  #   hostnames:
  #   - "test.hostname.one"
  # - ip: "127.0.0.3"
  #   hostnames:
  #   - "test.hostname.two"

  # annotations for the worker resource
  annotations: {}

  podAnnotations: {}

  # Labels specific to workers objects and pods
  labels: {}

  logGroomerSidecar:
    # Whether to deploy the Airflow worker log groomer sidecar.
    enabled: true
    # Command to use when running the Airflow worker log groomer sidecar (templated).
    command: ~
    # Args to use when running the Airflow worker log groomer sidecar (templated).
    args: ["bash", "/clean-logs"]
    # Number of days to retain logs
    retentionDays: 15
    resources: {}
    #  limits:
    #   cpu: 100m
    #   memory: 128Mi
    #  requests:
    #   cpu: 100m
    #   memory: 128Mi

  waitForMigrations:
    env: []

  env: []

# Airflow scheduler settings
scheduler:
  # If the scheduler stops heartbeating for 5 minutes (5*60s) kill the
  # scheduler and let Kubernetes restart it
  livenessProbe:
    initialDelaySeconds: 10
    timeoutSeconds: 20
    failureThreshold: 5
    periodSeconds: 60
    command: ~
  # Airflow 2.0 allows users to run multiple schedulers,
  # However this feature is only recommended for MySQL 8+ and Postgres
  replicas: 1
  # Max number of old replicasets to retain
  revisionHistoryLimit: ~

  # Command to use when running the Airflow scheduler (templated).
  command: ~
  # Args to use when running the Airflow scheduler (templated).
  args: ["bash", "-c", "exec airflow scheduler"]

  # Update Strategy when scheduler is deployed as a StatefulSet
  # (when using LocalExecutor and workers.persistence)
  updateStrategy: ~
  # Update Strategy when scheduler is deployed as a Deployment
  # (when not using LocalExecutor and workers.persistence)
  strategy: ~

  # When not set, the values defined in the global securityContext will be used
  securityContext: {}
  #  runAsUser: 50000
  #  fsGroup: 0
  #  runAsGroup: 0

  # Create ServiceAccount
  serviceAccount:
    # Specifies whether a ServiceAccount should be created
    create: true
    # The name of the ServiceAccount to use.
    # If not set and create is true, a name is generated using the release name
    name: ~

    # Annotations to add to scheduler kubernetes service account.
    annotations: {}

  # Scheduler pod disruption budget
  podDisruptionBudget:
    enabled: false

    # PDB configuration
    config:
      maxUnavailable: 1

  resources: {}
  #  limits:
  #   cpu: 100m
  #   memory: 128Mi
  #  requests:
  #   cpu: 100m
  #   memory: 128Mi

  # This setting tells kubernetes that it's ok to evict
  # when it wants to scale a node down.
  safeToEvict: true

  # Launch additional containers into scheduler.
  extraContainers: []
  # Add additional init containers into scheduler.
  extraInitContainers: []

  # Mount additional volumes into scheduler.
  extraVolumes: []
  extraVolumeMounts: []

  # Select certain nodes for airflow scheduler pods.
  nodeSelector: {}
  affinity: {}

  tolerations: []
  topologySpreadConstraints: []

  priorityClassName: ~

  # annotations for scheduler deployment
  annotations: {}

  podAnnotations: {}

  # Labels specific to scheduler objects and pods
  labels: {}

  logGroomerSidecar:
    # Whether to deploy the Airflow scheduler log groomer sidecar.
    enabled: true
    # Command to use when running the Airflow scheduler log groomer sidecar (templated).
    command: ~
    # Args to use when running the Airflow scheduler log groomer sidecar (templated).
    args: ["bash", "/clean-logs"]
    # Number of days to retain logs
    retentionDays: 15
    resources: {}
    #  limits:
    #   cpu: 100m
    #   memory: 128Mi
    #  requests:
    #   cpu: 100m
    #   memory: 128Mi

  waitForMigrations:
    # Whether to create init container to wait for db migrations
    enabled: true
    env: []

  env: []

# Airflow create user job settings
createUserJob:
  # Command to use when running the create user job (templated).
  command: ~
  # Args to use when running the create user job (templated).
  args:
    - "bash"
    - "-c"
    # The format below is necessary to get `helm lint` happy
    - |-
      exec \
      airflow {{ semverCompare ">=2.0.0" .Values.airflowVersion | ternary "users create" "create_user" }} "$@"
    - --
    - "-r"
    - "{{ .Values.webserver.defaultUser.role }}"
    - "-u"
    - "{{ .Values.webserver.defaultUser.username }}"
    - "-e"
    - "{{ .Values.webserver.defaultUser.email }}"
    - "-f"
    - "{{ .Values.webserver.defaultUser.firstName }}"
    - "-l"
    - "{{ .Values.webserver.defaultUser.lastName }}"
    - "-p"
    - "{{ .Values.webserver.defaultUser.password }}"

  # Annotations on the create user job pod
  annotations: {}
  # jobAnnotations are annotations on the create user job
  jobAnnotations: {}

  # Labels specific to createUserJob objects and pods
  labels: {}

  # When not set, the values defined in the global securityContext will be used
  securityContext: {}
  #  runAsUser: 50000
  #  fsGroup: 0
  #  runAsGroup: 0

  # Create ServiceAccount
  serviceAccount:
    # Specifies whether a ServiceAccount should be created
    create: true
    # The name of the ServiceAccount to use.
    # If not set and create is true, a name is generated using the release name
    name: ~

    # Annotations to add to create user kubernetes service account.
    annotations: {}

  # Launch additional containers into user creation job
  extraContainers: []

  # Mount additional volumes into user creation job
  extraVolumes: []
  extraVolumeMounts: []

  nodeSelector: {}
  affinity: {}
  tolerations: []
  topologySpreadConstraints: []
  # In case you need to disable the helm hooks that create the jobs after install.
  # Disable this if you are using ArgoCD for example
  useHelmHooks: true
  applyCustomEnv: true

  env: []

  resources: {}
  #  limits:
  #   cpu: 100m
  #   memory: 128Mi
  #  requests:
  #   cpu: 100m
  #   memory: 128Mi

# Airflow database migration job settings
migrateDatabaseJob:
  enabled: true
  # Command to use when running the migrate database job (templated).
  command: ~
  # Args to use when running the migrate database job (templated).
  args:
    - "bash"
    - "-c"
    # The format below is necessary to get `helm lint` happy
    - |-
      exec \
      airflow {{ semverCompare ">=2.0.0" .Values.airflowVersion | ternary "db upgrade" "upgradedb" }}

  # Annotations on the database migration pod
  annotations: {}
  # jobAnnotations are annotations on the database migration job
  jobAnnotations: {}

  # When not set, the values defined in the global securityContext will be used
  securityContext: {}
  #  runAsUser: 50000
  #  fsGroup: 0
  #  runAsGroup: 0

  # Create ServiceAccount
  serviceAccount:
    # Specifies whether a ServiceAccount should be created
    create: true
    # The name of the ServiceAccount to use.
    # If not set and create is true, a name is generated using the release name
    name: ~

    # Annotations to add to migrate database job kubernetes service account.
    annotations: {}

  resources: {}

  # Launch additional containers into database migration job
  extraContainers: []

  # Mount additional volumes into database migration job
  extraVolumes: []
  extraVolumeMounts: []

  nodeSelector: {}
  affinity: {}
  tolerations: []
  topologySpreadConstraints: []
  # In case you need to disable the helm hooks that create the jobs after install.
  # Disable this if you are using ArgoCD for example
  useHelmHooks: true
  applyCustomEnv: true

# Airflow webserver settings
webserver:
  allowPodLogReading: true
  livenessProbe:
    initialDelaySeconds: 15
    timeoutSeconds: 30
    failureThreshold: 20
    periodSeconds: 5
    scheme: HTTP

  readinessProbe:
    initialDelaySeconds: 15
    timeoutSeconds: 30
    failureThreshold: 20
    periodSeconds: 5
    scheme: HTTP

  # Number of webservers
  replicas: 1
  # Max number of old replicasets to retain
  revisionHistoryLimit: ~

  # Command to use when running the Airflow webserver (templated).
  command: ~
  # Args to use when running the Airflow webserver (templated).
  args: ["bash", "-c", "exec airflow webserver"]

  # Create ServiceAccount
  serviceAccount:
    # Specifies whether a ServiceAccount should be created
    create: true
    # The name of the ServiceAccount to use.
    # If not set and create is true, a name is generated using the release name
    name: ~

    # Annotations to add to webserver kubernetes service account.
    annotations: {}

  # Webserver pod disruption budget
  podDisruptionBudget:
    enabled: false

    # PDB configuration
    config:
      maxUnavailable: 1

  # Allow overriding Update Strategy for Webserver
  strategy: ~

  # When not set, the values defined in the global securityContext will be used
  securityContext: {}
  #  runAsUser: 50000
  #  fsGroup: 0
  #  runAsGroup: 0

  # Additional network policies as needed (Deprecated - renamed to `webserver.networkPolicy.ingress.from`)
  extraNetworkPolicies: []
  networkPolicy:
    ingress:
      # Peers for webserver NetworkPolicy ingress
      from: []
      # Ports for webserver NetworkPolicy ingress (if `from` is set)
      ports:
        - port: "{{ .Values.ports.airflowUI }}"

  resources: {}

  # Create initial user.
  defaultUser:
    enabled: true
    role: Admin
    username: admin
    email: admin@example.com
    firstName: admin
    lastName: user
    password: admin

  # Launch additional containers into webserver.
  extraContainers: []
  # Add additional init containers into webserver.
  extraInitContainers: []

  # Mount additional volumes into webserver.
  extraVolumes: []
  extraVolumeMounts: []

  # This string (can be templated) will be mounted into the Airflow Webserver
  # as a custom webserver_config.py. You can bake a webserver_config.py in to
  # your image instead or specify a configmap containing the
  # webserver_config.py.
  webserverConfig: ~
  # webserverConfig: |
  #   from airflow import configuration as conf

  #   # The SQLAlchemy connection string.
  #   SQLALCHEMY_DATABASE_URI = conf.get('database', 'SQL_ALCHEMY_CONN')

  #   # Flask-WTF flag for CSRF
  #   CSRF_ENABLED = True
  webserverConfigConfigMapName: ~

  service:
    type: ClusterIP
    ## service annotations
    annotations: {}
    ports:
      - name: airflow-ui
        port: "{{ .Values.ports.airflowUI }}"

    loadBalancerIP: ~
    ## Limit load balancer source ips to list of CIDRs
    # loadBalancerSourceRanges:
    #   - "10.123.0.0/16"
    loadBalancerSourceRanges: []

  # Select certain nodes for airflow webserver pods.
  nodeSelector: {}
  priorityClassName: ~
  affinity: {}

  tolerations: []
  topologySpreadConstraints: []

  # annotations for webserver deployment
  annotations: {}

  podAnnotations: {}

  # Labels specific webserver app
  labels: {}

  waitForMigrations:
    # Whether to create init container to wait for db migrations
    enabled: true
    env: []

  env: []

# Airflow Triggerer Config
triggerer:
  enabled: true
  # Number of airflow triggerers in the deployment
  replicas: 1
  # Max number of old replicasets to retain
  revisionHistoryLimit: ~

  # Command to use when running Airflow triggerers (templated).
  command: ~
  # Args to use when running Airflow triggerer (templated).
  args: ["bash", "-c", "exec airflow triggerer"]

  # Update Strategy for triggerers
  strategy:
    rollingUpdate:
      maxSurge: "100%"
      maxUnavailable: "50%"

  # If the triggerer stops heartbeating for 5 minutes (5*60s) kill the
  # triggerer and let Kubernetes restart it
  livenessProbe:
    initialDelaySeconds: 10
    timeoutSeconds: 20
    failureThreshold: 5
    periodSeconds: 60
    command: ~

  # Create ServiceAccount
  serviceAccount:
    # Specifies whether a ServiceAccount should be created
    create: true
    # The name of the ServiceAccount to use.
    # If not set and create is true, a name is generated using the release name
    name: ~

    # Annotations to add to triggerer kubernetes service account.
    annotations: {}

  # When not set, the values defined in the global securityContext will be used
  securityContext: {}
  #  runAsUser: 50000
  #  fsGroup: 0
  #  runAsGroup: 0

  resources: {}

  # Grace period for triggerer to finish after SIGTERM is sent from kubernetes
  terminationGracePeriodSeconds: 60

  # This setting tells kubernetes that it's ok to evict
  # when it wants to scale a node down.
  safeToEvict: true

  # Launch additional containers into triggerer.
  extraContainers: []
  # Add additional init containers into triggerers.
  extraInitContainers: []

  # Mount additional volumes into triggerer.
  extraVolumes: []
  extraVolumeMounts: []

  # Select certain nodes for airflow triggerer pods.
  nodeSelector: {}
  affinity: {}

  tolerations: []
  topologySpreadConstraints: []

  priorityClassName: ~

  # annotations for the triggerer deployment
  annotations: {}

  podAnnotations: {}

  # Labels specific to triggerer objects and pods
  labels: {}

  waitForMigrations:
    # Whether to create init container to wait for db migrations
    enabled: true
    env: []

  env: []

# Airflow Dag Processor Config
dagProcessor:
  enabled: false
  # Number of airflow dag processors in the deployment
  replicas: 1
  # Max number of old replicasets to retain
  revisionHistoryLimit: ~

  # Command to use when running Airflow dag processors (templated).
  command: ~
  # Args to use when running Airflow dag processor (templated).
  args: ["bash", "-c", "exec airflow dag-processor"]

  # Update Strategy for dag processors
  strategy:
    rollingUpdate:
      maxSurge: "100%"
      maxUnavailable: "50%"

  # If the dag processor stops heartbeating for 5 minutes (5*60s) kill the
  # dag processor and let Kubernetes restart it
  livenessProbe:
    initialDelaySeconds: 10
    timeoutSeconds: 20
    failureThreshold: 5
    periodSeconds: 60
    command: ~

  # Create ServiceAccount
  serviceAccount:
    # Specifies whether a ServiceAccount should be created
    create: true
    # The name of the ServiceAccount to use.
    # If not set and create is true, a name is generated using the release name
    name: ~

    # Annotations to add to dag processor kubernetes service account.
    annotations: {}

  # When not set, the values defined in the global securityContext will be used
  securityContext: {}
  #  runAsUser: 50000
  #  fsGroup: 0
  #  runAsGroup: 0

  resources: {}
  #  limits:
  #   cpu: 100m
  #   memory: 128Mi
  #  requests:
  #   cpu: 100m
  #   memory: 128Mi

  # Grace period for dag processor to finish after SIGTERM is sent from kubernetes
  terminationGracePeriodSeconds: 60

  # This setting tells kubernetes that it's ok to evict
  # when it wants to scale a node down.
  safeToEvict: true

  # Launch additional containers into dag processor.
  extraContainers: []
  # Add additional init containers into dag processors.
  extraInitContainers: []

  # Mount additional volumes into dag processor.
  extraVolumes: []
  extraVolumeMounts: []

  # Select certain nodes for airflow dag processor pods.
  nodeSelector: {}
  affinity: {}

  tolerations: []
  topologySpreadConstraints: []

  priorityClassName: ~

  # annotations for the dag processor deployment
  annotations: {}

  podAnnotations: {}

  waitForMigrations:
    # Whether to create init container to wait for db migrations
    enabled: true
    env: []

  env: []

# Flower settings
flower:
  # Enable flower.
  # If True, and using CeleryExecutor/CeleryKubernetesExecutor, will deploy flower app.
  enabled: false
  # Max number of old replicasets to retain
  revisionHistoryLimit: ~

  # Command to use when running flower (templated).
  command: ~
  # Args to use when running flower (templated).
  args:
    - "bash"
    - "-c"
    # The format below is necessary to get `helm lint` happy
    - |-
      exec \
      airflow {{ semverCompare ">=2.0.0" .Values.airflowVersion | ternary "celery flower" "flower" }}

  # Additional network policies as needed (Deprecated - renamed to `flower.networkPolicy.ingress.from`)
  extraNetworkPolicies: []
  networkPolicy:
    ingress:
      # Peers for flower NetworkPolicy ingress
      from: []
      # Ports for flower NetworkPolicy ingress (if ingressPeers is set)
      ports:
        - port: "{{ .Values.ports.flowerUI }}"

  resources: {}
  #   limits:
  #     cpu: 100m
  #     memory: 128Mi
  #   requests:
  #     cpu: 100m
  #     memory: 128Mi

  # When not set, the values defined in the global securityContext will be used
  securityContext: {}
  #  runAsUser: 50000
  #  fsGroup: 0
  #  runAsGroup: 0

  # Create ServiceAccount
  serviceAccount:
    # Specifies whether a ServiceAccount should be created
    create: true
    # The name of the ServiceAccount to use.
    # If not set and create is true, a name is generated using the release name
    name: ~

    # Annotations to add to worker kubernetes service account.
    annotations: {}

  # A secret containing the connection
  secretName: ~

  # Else, if username and password are set, create secret from username and password
  username: ~
  password: ~

  service:
    type: ClusterIP
    ## service annotations
    annotations: {}
    ports:
      - name: flower-ui
        port: "{{ .Values.ports.flowerUI }}"
    # To change the port used to access flower:
    # ports:
    #   - name: flower-ui
    #     port: 8080
    #     targetPort: flower-ui
    loadBalancerIP: ~
    ## Limit load balancer source ips to list of CIDRs
    # loadBalancerSourceRanges:
    #   - "10.123.0.0/16"
    loadBalancerSourceRanges: []

  # Launch additional containers into the flower pods.
  extraContainers: []
  # Mount additional volumes into the flower pods.
  extraVolumes: []
  extraVolumeMounts: []

  # Select certain nodes for airflow flower pods.
  nodeSelector: {}
  affinity: {}
  tolerations: []
  topologySpreadConstraints: []

  priorityClassName: ~

  # annotations for the flower deployment
  annotations: {}

  podAnnotations: {}

  # Labels specific to flower objects and pods
  labels: {}
  env: []

# StatsD settings
statsd:
  enabled: true
  # Max number of old replicasets to retain
  revisionHistoryLimit: ~

  # Arguments for StatsD exporter command.
  args: ["--statsd.mapping-config=/etc/statsd-exporter/mappings.yml"]

  # Create ServiceAccount
  serviceAccount:
    # Specifies whether a ServiceAccount should be created
    create: true
    # The name of the ServiceAccount to use.
    # If not set and create is true, a name is generated using the release name
    name: ~

    # Annotations to add to worker kubernetes service account.
    annotations: {}

  uid: 65534
  # When not set, `statsd.uid` will be used
  securityContext: {}
  #  runAsUser: 65534
  #  fsGroup: 0
  #  runAsGroup: 0

  # Additional network policies as needed
  extraNetworkPolicies: []
  resources: {}
  #   limits:
  #     cpu: 100m
  #     memory: 128Mi
  #   requests:
  #     cpu: 100m
  #     memory: 128Mi

  service:
    extraAnnotations: {}

  # Select certain nodes for StatsD pods.
  nodeSelector: {}
  affinity: {}
  tolerations: []
  topologySpreadConstraints: []

  priorityClassName: ~

  # Additional mappings for StatsD exporter.
  # If set, the default mapping and extra mappings are merged; the default mapping has higher priority.
  # So, if you want to change a default mapping, use `overrideMappings`.
  extraMappings: []

  # Override mappings for StatsD exporter.
  # If set, the default mappings and `extraMappings` are ignored entirely.
  # So, if you use it, make sure it contains every mapping item you need.
  overrideMappings: []

  podAnnotations: {}

# PgBouncer settings
pgbouncer:
  # Enable PgBouncer
  enabled: false
  # Number of PgBouncer replicas to run in Deployment
  replicas: 1
  # Max number of old replicasets to retain
  revisionHistoryLimit: ~
  # Command to use for PgBouncer (templated).
  command: ["pgbouncer", "-u", "nobody", "/etc/pgbouncer/pgbouncer.ini"]
  # Args to use for PgBouncer (templated).
  args: ~
  auth_type: md5
  auth_file: /etc/pgbouncer/users.txt

  # annotations to be added to the PgBouncer deployment
  annotations: {}

  # Create ServiceAccount
  serviceAccount:
    # Specifies whether a ServiceAccount should be created
    create: true
    # The name of the ServiceAccount to use.
    # If not set and create is true, a name is generated using the release name
    name: ~

    # Annotations to add to worker kubernetes service account.
    annotations: {}

  # Additional network policies as needed
  extraNetworkPolicies: []

  # Pool sizes
  metadataPoolSize: 10
  resultBackendPoolSize: 5

  # Maximum clients that can connect to PgBouncer (higher = more file descriptors)
  maxClientConn: 100

  configSecretName: ~

  # PgBouncer pod disruption budget
  podDisruptionBudget:
    enabled: false

    # PDB configuration
    config:
      maxUnavailable: 1

  resources: {}

  service:
    extraAnnotations: {}

  # https://www.pgbouncer.org/config.html
  verbose: 0
  logDisconnections: 0
  logConnections: 0

  sslmode: "prefer"
  ciphers: "normal"

  ssl:
    ca: ~
    cert: ~
    key: ~

  # Add extra PgBouncer ini configuration in the databases section:
  # https://www.pgbouncer.org/config.html#section-databases
  extraIniMetadata: ~
  extraIniResultBackend: ~
  # Add extra general PgBouncer ini configuration: https://www.pgbouncer.org/config.html
  extraIni: ~

  # Mount additional volumes into pgbouncer.
  extraVolumes: []
  extraVolumeMounts: []

  # Select certain nodes for PgBouncer pods.
  nodeSelector: {}
  affinity: {}
  tolerations: []
  topologySpreadConstraints: []

  priorityClassName: ~

  uid: 65534

  metricsExporterSidecar:
    resources: {}
    #  limits:
    #   cpu: 100m
    #   memory: 128Mi
    #  requests:
    #   cpu: 100m
    #   memory: 128Mi
    sslmode: "disable"

# Configuration for the redis provisioned by the chart
redis:
  enabled: true
  terminationGracePeriodSeconds: 600

  # Create ServiceAccount
  serviceAccount:
    # Specifies whether a ServiceAccount should be created
    create: true
    # The name of the ServiceAccount to use.
    # If not set and create is true, a name is generated using the release name
    name: ~

    # Annotations to add to worker kubernetes service account.
    annotations: {}

  persistence:
    # Enable persistent volumes
    enabled: true
    # Volume size for worker StatefulSet
    size: 1Gi
    # If using a custom storageClass, pass name ref to all statefulSets here
    storageClassName:
    # Annotations to add to redis volumes
    annotations: {}

  resources: {}
  #  limits:
  #   cpu: 100m
  #   memory: 128Mi
  #  requests:
  #   cpu: 100m
  #   memory: 128Mi

  # If set, use it as the redis password secret. Make sure to also set the data.brokerUrlSecretName value.
  passwordSecretName: ~

  # Else, if password is set, create secret with it,
  # Otherwise a new password will be generated on install
  # Note: password can only be set during install, not upgrade.
  password: ~

  # This setting tells kubernetes that it's ok to evict
  # when it wants to scale a node down.
  safeToEvict: true

  # Select certain nodes for redis pods.
  nodeSelector: {}
  affinity: {}
  tolerations: []
  topologySpreadConstraints: []

  # Set to 0 for backwards-compatibility
  uid: 0
  # If not set, `redis.uid` will be used
  securityContext: {}
  #  runAsUser: 999
  #  runAsGroup: 0

  podAnnotations: {}
# Auth secret for a private registry
# This is used if pulling airflow images from a private registry
registry:
  secretName: ~

  # Example:
  # connection:
  #   user: ~
  #   pass: ~
  #   host: ~
  #   email: ~
  connection: {}

# Elasticsearch logging configuration
elasticsearch:
  # Enable elasticsearch task logging
  enabled: false
  # A secret containing the connection
  secretName: ~
  # Or an object representing the connection
  # Example:
  # connection:
  #   user: ~
  #   pass: ~
  #   host: ~
  #   port: ~
  connection: {}

# All ports used by chart
ports:
  flowerUI: 5555
  airflowUI: 8080
  workerLogs: 8793
  redisDB: 6379
  statsdIngest: 9125
  statsdScrape: 9102
  pgbouncer: 6543
  pgbouncerScrape: 9127

# Define any ResourceQuotas for namespace
quotas: {}

# Define default/max/min values for pods and containers in namespace
limits: []

# This runs as a CronJob to cleanup old pods.
cleanup:
  enabled: false
  # Run every 15 minutes
  schedule: "*/15 * * * *"
  # Command to use when running the cleanup cronjob (templated).
  command: ~
  # Args to use when running the cleanup cronjob (templated).
  args: ["bash", "-c", "exec airflow kubernetes cleanup-pods --namespace={{ .Release.Namespace }}"]

  # Select certain nodes for airflow cleanup pods.
  nodeSelector: {}
  affinity: {}
  tolerations: []
  topologySpreadConstraints: []

  podAnnotations: {}

  # Labels specific to cleanup objects and pods
  labels: {}

  resources: {}
  #  limits:
  #   cpu: 100m
  #   memory: 128Mi
  #  requests:
  #   cpu: 100m
  #   memory: 128Mi

  # Create ServiceAccount
  serviceAccount:
    # Specifies whether a ServiceAccount should be created
    create: true
    # The name of the ServiceAccount to use.
    # If not set and create is true, a name is generated using the release name
    name: ~

    # Annotations to add to cleanup cronjob kubernetes service account.
    annotations: {}

  # When not set, the values defined in the global securityContext will be used
  securityContext: {}
  #  runAsUser: 50000
  #  runAsGroup: 0
  env: []

  # Specify history limit
  # When set, overwrite the default k8s number of successful and failed CronJob executions that are saved.
  failedJobsHistoryLimit: ~
  successfulJobsHistoryLimit: ~

# Configuration for postgresql subchart
# Not recommended for production
postgresql:
  enabled: false

config:
  core:
    dags_folder: '{{ include "airflow_dags" . }}'
    # This is ignored when used with the official Docker image
    load_examples: 'False'
    executor: '{{ .Values.executor }}'
    # For Airflow 1.10, backward compatibility; moved to [logging] in 2.0
    colored_console_log: 'False'
    remote_logging: '{{- ternary "True" "False" .Values.elasticsearch.enabled }}'
  logging:
    remote_logging: 'True'
    # colored_console_log: 'False'
    logging_config_class: log_config.LOGGING_CONFIG
    remote_log_conn_id: AIRFLOW_CONN_ADLS
  metrics:
    statsd_on: '{{ ternary "True" "False" .Values.statsd.enabled }}'
    statsd_port: 9125
    statsd_prefix: airflow
    statsd_host: '{{ printf "%s-statsd" .Release.Name }}'
  webserver:
    enable_proxy_fix: 'True'
    # For Airflow 1.10
    rbac: 'True'
  celery:
    flower_url_prefix: '{{ .Values.ingress.flower.path }}'
    worker_concurrency: 16
  scheduler:
    standalone_dag_processor: '{{ ternary "True" "False" .Values.dagProcessor.enabled }}'
    # statsd params included for Airflow 1.10 backward compatibility; moved to [metrics] in 2.0
    statsd_on: '{{ ternary "True" "False" .Values.statsd.enabled }}'
    statsd_port: 9125
    statsd_prefix: airflow
    statsd_host: '{{ printf "%s-statsd" .Release.Name }}'
    # `run_duration` included for Airflow 1.10 backward compatibility; removed in 2.0.
    run_duration: 41460
  elasticsearch:
    json_format: 'True'
    log_id_template: "{dag_id}_{task_id}_{execution_date}_{try_number}"
  elasticsearch_configs:
    max_retries: 3
    timeout: 30
    retry_timeout: 'True'
  kerberos:
    keytab: '{{ .Values.kerberos.keytabPath }}'
    reinit_frequency: '{{ .Values.kerberos.reinitFrequency }}'
    principal: '{{ .Values.kerberos.principal }}'
    ccache: '{{ .Values.kerberos.ccacheMountPath }}/{{ .Values.kerberos.ccacheFileName }}'
  celery_kubernetes_executor:
    kubernetes_queue: 'kubernetes'
  kubernetes:
    namespace: '{{ .Release.Namespace }}'
    airflow_configmap: '{{ include "airflow_config" . }}'
    airflow_local_settings_configmap: '{{ include "airflow_config" . }}'
    pod_template_file: '{{ include "airflow_pod_template_file" . }}/pod_template_file.yaml'
    worker_container_repository: '{{ .Values.images.airflow.repository | default .Values.defaultAirflowRepository }}'
    worker_container_tag: '{{ .Values.images.airflow.tag | default .Values.defaultAirflowTag }}'
    multi_namespace_mode: '{{ ternary "True" "False" .Values.multiNamespaceMode }}'

# Whether Airflow can launch workers and/or pods in multiple namespaces
# If true, it creates ClusterRole/ClusterRolebinding (with access to entire cluster)
multiNamespaceMode: false

podTemplate: ~

# Git sync
dags:
  persistence:
    # Enable persistent volume for storing dags
    enabled: false
    # Volume size for dags
    size: 1Gi
    # If using a custom storageClass, pass name here
    storageClassName: azureblob-fuse-premium
    # access mode of the persistent volume
    accessMode: ReadWriteOnce
    ## the name of an existing PVC to use
    existingClaim: airflow-dags
    ## optional subpath for dag volume mount
    subPath: ~
  gitSync:
    enabled: True

    # git repo clone url
    # ssh example: git@github.com:apache/airflow.git
    # https example: https://github.com/apache/airflow.git
    repo: https://github.com/ahmadfarhan97/example.git
    branch: main
    rev: HEAD
    depth: 1
    # the number of consecutive failures allowed before aborting
    maxFailures: 0
    # subpath within the repo where dags are located
    # should be "" if dags are at repo root
    subPath: "dags"

    credentialsSecret: git-credentials

    wait: 5
    containerName: git-sync
    uid: 65533

    # When not set, the values defined in the global securityContext will be used
    securityContext: {}
    #  runAsUser: 65533
    #  runAsGroup: 0

    extraVolumeMounts: []
    env: []
    resources: {}

logs:
  persistence:
    # Enable persistent volume for storing logs
    enabled: false
    # Volume size for logs
    size: 100Gi
    # If using a custom storageClass, pass name here
    storageClassName:
    ## the name of an existing PVC to use
    existingClaim: airflow-logs

Docker Image customizations

FROM apache/airflow:2.5.3-python3.8

ENV PYTHONPATH "${PYTHONPATH}:${AIRFLOW_HOME}"

COPY requirements.txt .

COPY __init__.py ${AIRFLOW_HOME}/config/__init__.py
COPY log_config.py ${AIRFLOW_HOME}/config/log_config.py

RUN pip install -r requirements.txt
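
For context, here is a rough sketch of how a custom logging_config_class is consumed at startup; it is an approximation, not the exact Airflow code. The dotted path set in the chart's config.logging section (log_config.LOGGING_CONFIG) is imported and the resulting dict is handed to logging.config.dictConfig, which is why log_config.py and the empty __init__.py are copied into ${AIRFLOW_HOME}/config above.

# Rough approximation (not the exact Airflow code) of how
# [logging] logging_config_class = log_config.LOGGING_CONFIG is consumed.
import importlib
from logging.config import dictConfig

logging_class_path = "log_config.LOGGING_CONFIG"  # value from the chart's config.logging section
module_path, attr_name = logging_class_path.rsplit(".", 1)

# log_config must be importable; the documented approach is to place it in
# $AIRFLOW_HOME/config together with an empty __init__.py, as in the Dockerfile above.
logging_config = getattr(importlib.import_module(module_path), attr_name)

# This is the call that fails with "Unable to configure handler 'task'"
# when a handler dict is missing a required argument.
dictConfig(logging_config)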

What happened

I was trying to connect to Azure Blob Storage for logs from my local machine. I followed the Airflow documentation to set up the configuration in values.yaml and to create and copy a log_config.py file into the Docker image (as shown in the Docker image customizations section). I keep getting the following error in the airflow-run-airflow-migrations-xxxxx pod:

....................
ERROR! Maximum number of retries (20) reached.

Last check result:
$ airflow db check
Unable to load the config, contains a configuration error.
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/logging/config.py", line 563, in configure
    handler = self.configure_handler(handlers[name])
  File "/usr/local/lib/python3.8/logging/config.py", line 744, in configure_handler
    result = factory(**kwargs)
TypeError: __init__() missing 1 required positional argument: 'delete_local_copy'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 5, in <module>
    from airflow.__main__ import main
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/__init__.py", line 64, in <module>
    settings.initialize()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/settings.py", line 570, in initialize
    LOGGING_CLASS_PATH = configure_logging()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/logging_config.py", line 74, in configure_logging
    raise e
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/logging_config.py", line 69, in configure_logging
    dictConfig(logging_config)
  File "/usr/local/lib/python3.8/logging/config.py", line 808, in dictConfig
    dictConfigClass(config).configure()
  File "/usr/local/lib/python3.8/logging/config.py", line 570, in configure
    raise ValueError('Unable to configure handler '
ValueError: Unable to configure handler 'task'
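
For context, the ValueError here is just logging.config.dictConfig wrapping the TypeError above: each handler dict is turned into keyword arguments for the handler class, and WasbTaskHandler now requires delete_local_copy. A minimal sketch outside Airflow reproduces the same wrapping (DemoHandler is a made-up stand-in, not an Airflow class):

# Minimal, self-contained reproduction of the failure mode; run as a script.
# DemoHandler is hypothetical and stands in for WasbTaskHandler, which now
# requires a delete_local_copy argument.
import logging
import logging.config


class DemoHandler(logging.Handler):
    def __init__(self, base_log_folder, delete_local_copy):  # extra required argument
        super().__init__()
        self.base_log_folder = base_log_folder
        self.delete_local_copy = delete_local_copy


CONFIG = {
    "version": 1,
    "handlers": {
        # delete_local_copy is deliberately missing, as in the pasted log_config.py
        "task": {
            "class": "__main__.DemoHandler",
            "base_log_folder": "/tmp/logs",
        },
    },
    "root": {"handlers": ["task"]},
}

# Raises ValueError: Unable to configure handler 'task'
# (caused by TypeError: __init__() missing 1 required positional argument: 'delete_local_copy')
logging.config.dictConfig(CONFIG)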

What you think should happen instead

The pods keep getting this error; they should start up without it.

How to reproduce

The following script is the log_config.py file that is copied into the image by the Dockerfile above:

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Airflow logging settings."""
from __future__ import annotations

import os
from pathlib import Path
from typing import Any
from urllib.parse import urlsplit

from airflow.configuration import conf
from airflow.exceptions import AirflowException

LOG_LEVEL: str = conf.get_mandatory_value("logging", "LOGGING_LEVEL").upper()

# Flask appbuilder's info level log is very verbose,
# so it's set to 'WARN' by default.
FAB_LOG_LEVEL: str = conf.get_mandatory_value("logging", "FAB_LOGGING_LEVEL").upper()

LOG_FORMAT: str = conf.get_mandatory_value("logging", "LOG_FORMAT")
DAG_PROCESSOR_LOG_FORMAT: str = conf.get_mandatory_value("logging", "DAG_PROCESSOR_LOG_FORMAT")

LOG_FORMATTER_CLASS: str = conf.get_mandatory_value(
    "logging", "LOG_FORMATTER_CLASS", fallback="airflow.utils.log.timezone_aware.TimezoneAware"
)

COLORED_LOG_FORMAT: str = conf.get_mandatory_value("logging", "COLORED_LOG_FORMAT")

COLORED_LOG: bool = conf.getboolean("logging", "COLORED_CONSOLE_LOG")

COLORED_FORMATTER_CLASS: str = conf.get_mandatory_value("logging", "COLORED_FORMATTER_CLASS")

DAG_PROCESSOR_LOG_TARGET: str = conf.get_mandatory_value("logging", "DAG_PROCESSOR_LOG_TARGET")

BASE_LOG_FOLDER: str = conf.get_mandatory_value("logging", "BASE_LOG_FOLDER")

PROCESSOR_LOG_FOLDER: str = conf.get_mandatory_value("scheduler", "CHILD_PROCESS_LOG_DIRECTORY")

DAG_PROCESSOR_MANAGER_LOG_LOCATION: str = conf.get_mandatory_value(
    "logging", "DAG_PROCESSOR_MANAGER_LOG_LOCATION"
)

# FILENAME_TEMPLATE is only used in remote logging handlers since Airflow 2.3.3.
# All of these handlers inherit from FileTaskHandler, and providing any value other than None
# would raise a deprecation warning.
FILENAME_TEMPLATE: str | None = None

PROCESSOR_FILENAME_TEMPLATE: str = conf.get_mandatory_value("logging", "LOG_PROCESSOR_FILENAME_TEMPLATE")

LOGGING_CONFIG: dict[str, Any] = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "airflow": {
            "format": LOG_FORMAT,
            "class": LOG_FORMATTER_CLASS,
        },
        "airflow_coloured": {
            "format": COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT,
            "class": COLORED_FORMATTER_CLASS if COLORED_LOG else LOG_FORMATTER_CLASS,
        },
        "source_processor": {
            "format": DAG_PROCESSOR_LOG_FORMAT,
            "class": LOG_FORMATTER_CLASS,
        },
    },
    "filters": {
        "mask_secrets": {
            "()": "airflow.utils.log.secrets_masker.SecretsMasker",
        },
    },
    "handlers": {
        "console": {
            "class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
            "formatter": "airflow_coloured",
            "stream": "sys.stdout",
            "filters": ["mask_secrets"],
        },
        "task": {
            "class": "airflow.utils.log.file_task_handler.FileTaskHandler",
            "formatter": "airflow",
            "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
            "filters": ["mask_secrets"],
        },
        "processor": {
            "class": "airflow.utils.log.file_processor_handler.FileProcessorHandler",
            "formatter": "airflow",
            "base_log_folder": os.path.expanduser(PROCESSOR_LOG_FOLDER),
            "filename_template": PROCESSOR_FILENAME_TEMPLATE,
            "filters": ["mask_secrets"],
        },
        "processor_to_stdout": {
            "class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
            "formatter": "source_processor",
            "stream": "sys.stdout",
            "filters": ["mask_secrets"],
        },
    },
    "loggers": {
        "airflow.processor": {
            "handlers": ["processor_to_stdout" if DAG_PROCESSOR_LOG_TARGET == "stdout" else "processor"],
            "level": LOG_LEVEL,
            # Set to true here (and reset via set_context) so that if no file is configured we still get logs!
            "propagate": True,
        },
        "airflow.task": {
            "handlers": ["task"],
            "level": LOG_LEVEL,
            # Set to true here (and reset via set_context) so that if no file is configured we still get logs!
            "propagate": True,
            "filters": ["mask_secrets"],
        },
        "flask_appbuilder": {
            "handlers": ["console"],
            "level": FAB_LOG_LEVEL,
            "propagate": True,
        },
    },
    "root": {
        "handlers": ["console"],
        "level": LOG_LEVEL,
        "filters": ["mask_secrets"],
    },
}

EXTRA_LOGGER_NAMES: str | None = conf.get("logging", "EXTRA_LOGGER_NAMES", fallback=None)
if EXTRA_LOGGER_NAMES:
    new_loggers = {
        logger_name.strip(): {
            "handlers": ["console"],
            "level": LOG_LEVEL,
            "propagate": True,
        }
        for logger_name in EXTRA_LOGGER_NAMES.split(",")
    }
    LOGGING_CONFIG["loggers"].update(new_loggers)

DEFAULT_DAG_PARSING_LOGGING_CONFIG: dict[str, dict[str, dict[str, Any]]] = {
    "handlers": {
        "processor_manager": {
            "class": "airflow.utils.log.non_caching_file_handler.NonCachingRotatingFileHandler",
            "formatter": "airflow",
            "filename": DAG_PROCESSOR_MANAGER_LOG_LOCATION,
            "mode": "a",
            "maxBytes": 104857600,  # 100MB
            "backupCount": 5,
        }
    },
    "loggers": {
        "airflow.processor_manager": {
            "handlers": ["processor_manager"],
            "level": LOG_LEVEL,
            "propagate": False,
        }
    },
}

# Only update the handlers and loggers when CONFIG_PROCESSOR_MANAGER_LOGGER is set.
# This is to avoid exceptions when initializing RotatingFileHandler multiple times
# in multiple processes.
if os.environ.get("CONFIG_PROCESSOR_MANAGER_LOGGER") == "True":
    LOGGING_CONFIG["handlers"].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"])
    LOGGING_CONFIG["loggers"].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG["loggers"])

    # Manually create log directory for processor_manager handler as RotatingFileHandler
    # will only create file but not the directory.
    processor_manager_handler_config: dict[str, Any] = DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"][
        "processor_manager"
    ]
    directory: str = os.path.dirname(processor_manager_handler_config["filename"])
    Path(directory).mkdir(parents=True, exist_ok=True, mode=0o755)

##################
# Remote logging #
##################

REMOTE_LOGGING: bool = conf.getboolean("logging", "remote_logging")

if REMOTE_LOGGING:

    ELASTICSEARCH_HOST: str | None = conf.get("elasticsearch", "HOST")

    # Storage bucket URL for remote logging
    # S3 buckets should start with "s3://"
    # Cloudwatch log groups should start with "cloudwatch://"
    # GCS buckets should start with "gs://"
    # WASB buckets should start with "wasb"
    # just to help Airflow select the correct handler
    REMOTE_BASE_LOG_FOLDER: str = 'wasb://testcontainerw@testblobstorage.blob.core.windows.net'
    REMOTE_TASK_HANDLER_KWARGS = conf.getjson("logging", "REMOTE_TASK_HANDLER_KWARGS", fallback={})

    if REMOTE_BASE_LOG_FOLDER.startswith("s3://"):
        S3_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
            "task": {
                "class": "airflow.providers.amazon.aws.log.s3_task_handler.S3TaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "s3_log_folder": REMOTE_BASE_LOG_FOLDER,
                "filename_template": FILENAME_TEMPLATE,
            },
        }

        LOGGING_CONFIG["handlers"].update(S3_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("cloudwatch://"):
        url_parts = urlsplit(REMOTE_BASE_LOG_FOLDER)
        CLOUDWATCH_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
            "task": {
                "class": "airflow.providers.amazon.aws.log.cloudwatch_task_handler.CloudwatchTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "log_group_arn": url_parts.netloc + url_parts.path,
                "filename_template": FILENAME_TEMPLATE,
            },
        }

        LOGGING_CONFIG["handlers"].update(CLOUDWATCH_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("gs://"):
        key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH", fallback=None)
        GCS_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
            "task": {
                "class": "airflow.providers.google.cloud.log.gcs_task_handler.GCSTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "gcs_log_folder": REMOTE_BASE_LOG_FOLDER,
                "filename_template": FILENAME_TEMPLATE,
                "gcp_key_path": key_path,
            },
        }

        LOGGING_CONFIG["handlers"].update(GCS_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("wasb"):
        WASB_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
            "task": {
                "class": "airflow.providers.microsoft.azure.log.wasb_task_handler.WasbTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "wasb_log_folder": REMOTE_BASE_LOG_FOLDER,
                "wasb_container": "airflow-logs",
                "filename_template": FILENAME_TEMPLATE,
            },
        }

        LOGGING_CONFIG["handlers"].update(WASB_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("stackdriver://"):
        key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH", fallback=None)
        # stackdriver:///airflow-tasks => airflow-tasks
        log_name = urlsplit(REMOTE_BASE_LOG_FOLDER).path[1:]
        STACKDRIVER_REMOTE_HANDLERS = {
            "task": {
                "class": "airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler",
                "formatter": "airflow",
                "name": log_name,
                "gcp_key_path": key_path,
            }
        }

        LOGGING_CONFIG["handlers"].update(STACKDRIVER_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("oss://"):
        OSS_REMOTE_HANDLERS = {
            "task": {
                "class": "airflow.providers.alibaba.cloud.log.oss_task_handler.OSSTaskHandler",
                "formatter": "airflow",
                "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
                "oss_log_folder": REMOTE_BASE_LOG_FOLDER,
                "filename_template": FILENAME_TEMPLATE,
            },
        }
        LOGGING_CONFIG["handlers"].update(OSS_REMOTE_HANDLERS)
    elif ELASTICSEARCH_HOST:
        ELASTICSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value("elasticsearch", "END_OF_LOG_MARK")
        ELASTICSEARCH_FRONTEND: str = conf.get_mandatory_value("elasticsearch", "frontend")
        ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean("elasticsearch", "WRITE_STDOUT")
        ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean("elasticsearch", "JSON_FORMAT")
        ELASTICSEARCH_JSON_FIELDS: str = conf.get_mandatory_value("elasticsearch", "JSON_FIELDS")
        ELASTICSEARCH_HOST_FIELD: str = conf.get_mandatory_value("elasticsearch", "HOST_FIELD")
        ELASTICSEARCH_OFFSET_FIELD: str = conf.get_mandatory_value("elasticsearch", "OFFSET_FIELD")

        ELASTIC_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
            "task": {
                "class": "airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "filename_template": FILENAME_TEMPLATE,
                "end_of_log_mark": ELASTICSEARCH_END_OF_LOG_MARK,
                "host": ELASTICSEARCH_HOST,
                "frontend": ELASTICSEARCH_FRONTEND,
                "write_stdout": ELASTICSEARCH_WRITE_STDOUT,
                "json_format": ELASTICSEARCH_JSON_FORMAT,
                "json_fields": ELASTICSEARCH_JSON_FIELDS,
                "host_field": ELASTICSEARCH_HOST_FIELD,
                "offset_field": ELASTICSEARCH_OFFSET_FIELD,
            },
        }

        LOGGING_CONFIG["handlers"].update(ELASTIC_REMOTE_HANDLERS)
    else:
        raise AirflowException(
            "Incorrect remote log configuration. Please check the configuration of option 'host' in "
            "section 'elasticsearch' if you are using Elasticsearch. In the other case, "
            "'remote_base_log_folder' option in the 'logging' section."
        )
    LOGGING_CONFIG["handlers"]["task"].update(REMOTE_TASK_HANDLER_KWARGS)

and the empty init.py file is copied into the image in the same directory.

Anything else

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 1 year ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

tirkarthi commented 1 year ago

You are using WasbTaskHandler, which needs delete_local_copy to be passed. You need to either add it to WASB_REMOTE_HANDLERS or pass delete_local_copy through REMOTE_TASK_HANDLER_KWARGS.

https://github.com/apache/airflow/blob/cb842dd8aa8749b4d0a474c6e2fda11505d9ba99/airflow/providers/microsoft/azure/log/wasb_task_handler.py#L30-L44

This was changed in the commit below to retrieve the value from configuration:

commit b6392ae5fd466fa06ca92c061a0f93272e27a26b
Author: Hussein Awala <houssein.awala.96@gmail.com>
Date:   Tue Mar 7 17:30:56 2023 +0100
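
For reference, a minimal sketch of the second option above, passing delete_local_copy through REMOTE_TASK_HANDLER_KWARGS inside the copied log_config.py. The variable name comes from the file quoted earlier in this issue; whether your copy already defines it (and how) depends on which Airflow version the file was taken from, so treat this as an assumption to verify against your own file:

# In the custom log_config.py, before the remote handlers are merged.
# Hypothetical: define (or extend) REMOTE_TASK_HANDLER_KWARGS so the task
# handler receives the delete_local_copy argument the WasbTaskHandler expects
# in this provider version.
REMOTE_TASK_HANDLER_KWARGS = {
    "delete_local_copy": False,
}

# The last line of the remote-logging block (shown above) then merges it in:
# LOGGING_CONFIG["handlers"]["task"].update(REMOTE_TASK_HANDLER_KWARGS)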

hussein-awala commented 1 year ago

You can fix the issue by updating the provided config file:

    elif REMOTE_BASE_LOG_FOLDER.startswith("wasb"):
        WASB_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
            "task": {
                "class": "airflow.providers.microsoft.azure.log.wasb_task_handler.WasbTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "wasb_log_folder": REMOTE_BASE_LOG_FOLDER,
                "wasb_container": "airflow-logs",
                "filename_template": FILENAME_TEMPLATE,
+               "delete_local_copy": False,
            },
        }

or by upgrading the provider to the latest version, which uses False as the default value; in Airflow 2.6.0 you will also be able to configure delete_local_copy through the Airflow config.

I'll check why you have this problem, but can you test whether this change fixes it?

ahmadfarhan97 commented 1 year ago

@hussein-awala I added the line you suggested, but now I'm getting a different error; this is what I get from the server pod:

{wasb_task_handler.py:133} ERROR - can't list blobs
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/microsoft/azure/log/wasb_task_handler.py", line 129, in _read_remote_logs
    blob_names = self.hook.get_blobs_list(container_name=self.wasb_container, prefix=prefix)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/microsoft/azure/hooks/wasb.py", line 276, in get_blobs_list
    for blob in blobs:
  File "/home/airflow/.local/lib/python3.8/site-packages/azure/core/paging.py", line 132, in __next__
    return next(self._page_iterator)
  File "/home/airflow/.local/lib/python3.8/site-packages/azure/core/paging.py", line 76, in __next__
    self._response = self._get_next(self.continuation_token)
  File "/home/airflow/.local/lib/python3.8/site-packages/azure/storage/blob/_list_blobs_helper.py", line 100, in _get_next_cb
    process_storage_error(error)
  File "/home/airflow/.local/lib/python3.8/site-packages/azure/storage/blob/_shared/response_handlers.py", line 97, in process_storage_error
    raise storage_error
  File "/home/airflow/.local/lib/python3.8/site-packages/azure/storage/blob/_list_blobs_helper.py", line 93, in _get_next_cb
    return self._command(
  File "/home/airflow/.local/lib/python3.8/site-packages/azure/core/tracing/decorator.py", line 78, in wrapper_use_tracer
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/azure/storage/blob/_generated/operations/_container_operations.py", line 2605, in list_blob_hierarchy_segment
    pipeline_response = self._client._pipeline.run(  # type: ignore # pylint: disable=protected-access
  File "/home/airflow/.local/lib/python3.8/site-packages/azure/core/pipeline/_base.py", line 205, in run
    return first_node.send(pipeline_request)  # type: ignore
  File "/home/airflow/.local/lib/python3.8/site-packages/azure/core/pipeline/_base.py", line 69, in send
    response = self.next.send(request)
  File "/home/airflow/.local/lib/python3.8/site-packages/azure/core/pipeline/_base.py", line 69, in send
    response = self.next.send(request)
  File "/home/airflow/.local/lib/python3.8/site-packages/azure/core/pipeline/_base.py", line 69, in send
    response = self.next.send(request)
  [Previous line repeated 2 more times]
  File "/home/airflow/.local/lib/python3.8/site-packages/azure/core/pipeline/policies/_redirect.py", line 160, in send
    response = self.next.send(request)
  File "/home/airflow/.local/lib/python3.8/site-packages/azure/core/pipeline/_base.py", line 69, in send
    response = self.next.send(request)
  File "/home/airflow/.local/lib/python3.8/site-packages/azure/storage/blob/_shared/policies.py", line 546, in send
    raise err
  File "/home/airflow/.local/lib/python3.8/site-packages/azure/storage/blob/_shared/policies.py", line 520, in send
    response = self.next.send(request)
  File "/home/airflow/.local/lib/python3.8/site-packages/azure/core/pipeline/_base.py", line 69, in send
    response = self.next.send(request)
  File "/home/airflow/.local/lib/python3.8/site-packages/azure/core/pipeline/_base.py", line 69, in send
    response = self.next.send(request)
  File "/home/airflow/.local/lib/python3.8/site-packages/azure/core/pipeline/policies/_authentication.py", line 115, in send
    self.on_request(request)
  File "/home/airflow/.local/lib/python3.8/site-packages/azure/core/pipeline/policies/_authentication.py", line 92, in on_request
    self._token = self._credential.get_token(*self._scopes)
  File "/home/airflow/.local/lib/python3.8/site-packages/azure/identity/_credentials/default.py", line 168, in get_token
    return super(DefaultAzureCredential, self).get_token(*scopes, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/azure/identity/_credentials/chained.py", line 101, in get_token
    raise ClientAuthenticationError(message=message)
azure.core.exceptions.ClientAuthenticationError: DefaultAzureCredential failed to retrieve a token from the included credentials.
Attempted credentials:
        EnvironmentCredential: EnvironmentCredential authentication unavailable. Environment variables are not fully configured.
Visit https://aka.ms/azsdk/python/identity/environmentcredential/troubleshoot to troubleshoot this issue.
        ManagedIdentityCredential: ManagedIdentityCredential authentication unavailable, no response from the IMDS endpoint.
        SharedTokenCacheCredential: SharedTokenCacheCredential authentication unavailable. No accounts were found in the cache.
        AzureCliCredential: Azure CLI not found on path
        AzurePowerShellCredential: PowerShell is not installed
To mitigate this issue, please refer to the troubleshooting guidelines here at https://aka.ms/azsdk/python/identity/defaultazurecredential/troubleshoot.

I passed the blob storage secret as a connection string.

And this is how values.yaml refers to the secret:

secret:
- envName: "AIRFLOW_CONN_ADLS"
  secretName: "azure-blob-storage-secret"
  secretKey: "AIRFLOW_CONN_ADLS"

config:
  logging:
    remote_logging: 'True'
    logging_config_class: log_config.LOGGING_CONFIG
    remote_log_conn_id: ADLS
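
A hedged sketch of what the AIRFLOW_CONN_ADLS value inside that secret could look like when the credential is a storage-account connection string. This assumes the JSON connection format (available since Airflow 2.3) and that the installed Microsoft Azure provider reads a "connection_string" extra on wasb connections; check the provider's connection documentation for the exact key your version expects before relying on this:

# Hypothetical value for the AIRFLOW_CONN_ADLS key inside the Kubernetes secret.
# Assumption: the provider reads a plain "connection_string" extra; older
# provider releases expected a prefixed form of this key instead.
import json

conn_adls = json.dumps(
    {
        "conn_type": "wasb",
        "extra": {
            "connection_string": "DefaultEndpointsProtocol=https;AccountName=<account>;AccountKey=<key>;EndpointSuffix=core.windows.net",
        },
    }
)
print(conn_adls)  # store this string as the secret's AIRFLOW_CONN_ADLS value

The traceback above shows the handler ending up on the DefaultAzureCredential path, which typically means the hook did not find usable credentials on the connection it resolved, so verifying the secret's contents in this shape is a reasonable next step.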