airflow-helm / charts

The User-Community Airflow Helm Chart is the standard way to deploy Apache Airflow on Kubernetes with Helm. Originally created in 2017, it has since helped thousands of companies create production-ready deployments of Airflow on Kubernetes.
https://github.com/airflow-helm/charts/tree/main/charts/airflow
Apache License 2.0

Worker nodes can't reach embedded Redis but other services can. #654

Closed: agconti closed this issue 1 year ago

agconti commented 1 year ago

Chart Version

8.6.1

Kubernetes Version

Client Version: version.Info{Major:"1", Minor:"25", GitVersion:"v1.25.0", GitCommit:"a866cbe2e5bbaa01cfd5e969aa3e033f3282a8a2", GitTreeState:"clean", BuildDate:"2022-08-23T17:44:59Z", GoVersion:"go1.19", Compiler:"gc", Platform:"darwin/amd64"}
Kustomize Version: v4.5.7
Server Version: version.Info{Major:"1", Minor:"21+", GitVersion:"v1.21.14-eks-18ef993", GitCommit:"ac73613dfd25370c18cbbbc6bfc65449397b35c7", GitTreeState:"clean", BuildDate:"2022-07-06T18:06:50Z", GoVersion:"go1.16.15", Compiler:"gc", Platform:"linux/amd64"}

Helm Version

version.BuildInfo{Version:"v3.10.0", GitCommit:"ce66412a723e4d89555dc67217607c6579ffcb21", GitTreeState:"clean", GoVersion:"go1.19.1"}

Description

I've deployed the Helm chart according to this project's guides and recommendations. While each pod comes up fine, tasks cannot run. Inspecting the worker pods, the logs reveal that they are unable to connect to the embedded Redis instance. After exec'ing into the redis-master pod and running PING, it's clear that Redis is reachable via the connection string the workers are using. However, the workers themselves cannot connect.

Is there a configuration step I'm missing or an incorrect configuration that would cause this?

Relevant Logs

Logs for Redis pod. Redis is working and ready:

12:28:31.99 INFO  ==> ** Starting Redis **
1:C 05 Oct 2022 12:28:31.999 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
1:C 05 Oct 2022 12:28:31.999 # Redis version=5.0.7, bits=64, commit=00000000, modified=0, pid=1, just started
1:C 05 Oct 2022 12:28:31.999 # Configuration loaded
1:M 05 Oct 2022 12:28:32.001 * Running mode=standalone, port=6379.
1:M 05 Oct 2022 12:28:32.001 # Server initialized
1:M 05 Oct 2022 12:28:32.001 # WARNING you have Transparent Huge Pages (THP) support enabled in your kernel. This will create latency and memory usage issues with Redis. To fix this issue run the command 'echo never > /sys/kernel/mm/transparent_hugepage/enabled' as root, and add it to your /etc/rc.local in order to retain the setting after a reboot. Redis must be restarted after THP is disabled.
1:M 05 Oct 2022 12:28:32.001 * Ready to accept connections

Logs from worker. It's not able to connect:

[2022-10-05 14:56:40 +0000] [65] [INFO] Starting gunicorn 20.1.0
[2022-10-05 14:56:40 +0000] [65] [INFO] Listening at: http://[::]:8793 (65)
[2022-10-05 14:56:40 +0000] [65] [INFO] Using worker: sync
[2022-10-05 14:56:40 +0000] [66] [INFO] Booting worker with pid: 66
[2022-10-05 14:56:40 +0000] [67] [INFO] Booting worker with pid: 67

 -------------- celery@airflow-worker-0 v5.2.3 (dawn-chorus)
--- ***** ----- 
-- ******* ---- Linux-5.4.209-116.367.amzn2.x86_64-x86_64-with-glibc2.2.5 2022-10-05 14:56:41
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         airflow.executors.celery_executor:0x7f01972f9ac0
- ** ---------- .> transport:   redis://:**@airflow-redis-master.default.svc.cluster.local:6379/1
- ** ---------- .> results:     postgresql://airflow:**@pmy-psql-db/airflow
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> default          exchange=default(direct) key=default

[tasks]
  . airflow.executors.celery_executor.execute_command

[2022-10-05 14:58:54,704: ERROR/MainProcess] consumer: Cannot connect to redis://:**@airflow-redis-master.default.svc.cluster.local:6379/1: Error 110 connecting to airflow-redis-master.default.svc.cluster.local:6379. Connection timed out..
Trying again in 2.00 seconds... (1/100)

[2022-10-05 15:01:07,824: ERROR/MainProcess] consumer: Cannot connect to redis://:**@airflow-redis-master.default.svc.cluster.local:6379/1: Error 110 connecting to airflow-redis-master.default.svc.cluster.local:6379. Connection timed out..
Trying again in 4.00 seconds... (2/100)

[2022-10-05 15:03:22,992: ERROR/MainProcess] consumer: Cannot connect to redis://:**@airflow-redis-master.default.svc.cluster.local:6379/1: Error 110 connecting to airflow-redis-master.default.svc.cluster.local:6379. Connection timed out..
Trying again in 6.00 seconds... (3/100)

[2022-10-05 15:05:40,208: ERROR/MainProcess] consumer: Cannot connect to redis://:**@airflow-redis-master.default.svc.cluster.local:6379/1: Error 110 connecting to airflow-redis-master.default.svc.cluster.local:6379. Connection timed out..
Trying again in 8.00 seconds... (4/100)

[2022-10-05 15:07:57,424: ERROR/MainProcess] consumer: Cannot connect to redis://:**@airflow-redis-master.default.svc.cluster.local:6379/1: Error 110 connecting to airflow-redis-master.default.svc.cluster.local:6379. Connection timed out..
Trying again in 10.00 seconds... (5/100)

[2022-10-05 15:10:16,688: ERROR/MainProcess] consumer: Cannot connect to redis://:**@airflow-redis-master.default.svc.cluster.local:6379/1: Error 110 connecting to airflow-redis-master.default.svc.cluster.local:6379. Connection timed out..
Trying again in 12.00 seconds... (6/100)

[2022-10-05 15:12:38,000: ERROR/MainProcess] consumer: Cannot connect to redis://:**@airflow-redis-master.default.svc.cluster.local:6379/1: Error 110 connecting to airflow-redis-master.default.svc.cluster.local:6379. Connection timed out..
Trying again in 14.00 seconds... (7/100)

[2022-10-05 15:15:01,360: ERROR/MainProcess] consumer: Cannot connect to redis://:**@airflow-redis-master.default.svc.cluster.local:6379/1: Error 110 connecting to airflow-redis-master.default.svc.cluster.local:6379. Connection timed out..
Trying again in 16.00 seconds... (8/100)

[2022-10-05 15:17:26,768: ERROR/MainProcess] consumer: Cannot connect to redis://:**@airflow-redis-master.default.svc.cluster.local:6379/1: Error 110 connecting to airflow-redis-master.default.svc.cluster.local:6379. Connection timed out..
Trying again in 18.00 seconds... (9/100)

[2022-10-05 15:19:54,224: ERROR/MainProcess] consumer: Cannot connect to redis://:**@airflow-redis-master.default.svc.cluster.local:6379/1: Error 110 connecting to airflow-redis-master.default.svc.cluster.local:6379. Connection timed out..
Trying again in 20.00 seconds... (10/100)

[2022-10-05 15:22:23,728: ERROR/MainProcess] consumer: Cannot connect to redis://:**@airflow-redis-master.default.svc.cluster.local:6379/1: Error 110 connecting to airflow-redis-master.default.svc.cluster.local:6379. Connection timed out..
Trying again in 22.00 seconds... (11/100)

[2022-10-05 15:24:55,280: ERROR/MainProcess] consumer: Cannot connect to redis://:**@airflow-redis-master.default.svc.cluster.local:6379/1: Error 110 connecting to airflow-redis-master.default.svc.cluster.local:6379. Connection timed out..
Trying again in 24.00 seconds... (12/100)

Logs from exec'ing into the Redis master pod and connecting with redis-cli directly. It works:

$ kubectl exec -it airflow-redis-master-0 -- sh
$ redis-cli -u redis://:airflow@airflow-redis-master.default.svc.cluster.local:6379/1 PING
PONG
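
It may also be worth running the same kind of check from inside a worker pod, to separate a network-level block from a client-library problem. A minimal sketch, reusing the airflow-worker-0 pod name from the Celery banner and the airflow password from the redis-cli command above, and assuming bash, timeout, and the bundled redis-py client are present in the image:

# open a shell inside the worker pod
kubectl exec -it airflow-worker-0 -- bash

# then, inside the pod: raw TCP check against the Redis service (bash's /dev/tcp, no extra tools needed)
timeout 5 bash -c '</dev/tcp/airflow-redis-master.default.svc.cluster.local/6379' \
  && echo "port 6379 open" || echo "port 6379 blocked or timing out"

# and an application-level check with the redis-py client already installed in the worker image
python -c "import redis; print(redis.Redis.from_url('redis://:airflow@airflow-redis-master.default.svc.cluster.local:6379/1').ping())"

If the raw TCP check also times out from the worker, the problem sits in the network path (NetworkPolicy, node security groups) rather than in Celery or kombu.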

Logs from flower. It appears to have connected to Redis fine:

Defaulted container "airflow-flower" out of: airflow-flower, check-db (init), wait-for-db-migrations (init)
[2022-10-05 16:14:49,190] {command.py:152} INFO - Visit me at http://0.0.0.0:5555
[2022-10-05 16:14:49,213] {command.py:159} INFO - Broker: redis://:**@airflow-redis-master.default.svc.cluster.local:6379/1
[2022-10-05 16:14:49,215] {command.py:160} INFO - Registered tasks: 
['airflow.executors.celery_executor.execute_command',
 'celery.accumulate',
 'celery.backend_cleanup',
 'celery.chain',
 'celery.chord',
 'celery.chord_unlock',
 'celery.chunks',
 'celery.group',
 'celery.map',
 'celery.starmap']

Logs from the scheduler; the Celery task is timing out:

[2022-10-05 16:09:49,173] {scheduler_job.py:589} INFO - Executor reports execution of calcuate_stream_analytics.get_recent_active_stream_ids run_id=scheduled__2022-10-05T15:54:42.621643+00:00 exited with status failed for try_number 1
[2022-10-05 16:09:49,179] {scheduler_job.py:632} INFO - TaskInstance Finished: dag_id=calcuate_stream_analytics, task_id=get_recent_active_stream_ids, run_id=scheduled__2022-10-05T15:54:42.621643+00:00, map_index=-1, run_start_date=None, run_end_date=None, run_duration=None, state=queued, executor_state=failed, try_number=1, max_tries=1, job_id=None, pool=default_pool, queue=default, priority_weight=2, operator=_PythonDecoratedOperator, queued_dttm=2022-10-05 16:09:43.061734+00:00, queued_by_job_id=5, pid=None
[2022-10-05 16:09:49,179] {scheduler_job.py:674} ERROR - Executor reports task instance <TaskInstance: calcuate_stream_analytics.get_recent_active_stream_ids scheduled__2022-10-05T15:54:42.621643+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py:873 SAWarning: TypeDecorator ExecutorConfigType() will not produce a cache key because the ``cache_ok`` attribute is not set to True.  This can have significant performance implications including some performance degradations in comparison to prior SQLAlchemy versions.  Set this attribute to True if this type object's state is safe to use in a cache key, or False to disable this warning. (Background on this error at: https://sqlalche.me/e/14/cprf)
[2022-10-05 16:11:14,593] {scheduler_job.py:1369} INFO - Resetting orphaned tasks for active dag runs
[2022-10-05 16:14:50,137] {scheduler_job.py:347} INFO - 1 tasks up for execution:
        <TaskInstance: calcuate_stream_analytics.get_recent_active_stream_ids scheduled__2022-10-05T15:54:42.621643+00:00 [scheduled]>
[2022-10-05 16:14:50,137] {scheduler_job.py:412} INFO - DAG calcuate_stream_analytics has 0/16 running and queued tasks
[2022-10-05 16:14:50,137] {scheduler_job.py:498} INFO - Setting the following tasks to queued state:
        <TaskInstance: calcuate_stream_analytics.get_recent_active_stream_ids scheduled__2022-10-05T15:54:42.621643+00:00 [scheduled]>
[2022-10-05 16:14:50,139] {scheduler_job.py:537} INFO - Sending TaskInstanceKey(dag_id='calcuate_stream_analytics', task_id='get_recent_active_stream_ids', run_id='scheduled__2022-10-05T15:54:42.621643+00:00', try_number=2, map_index=-1) to executor with priority 2 and queue default
[2022-10-05 16:14:50,139] {base_executor.py:95} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'calcuate_stream_analytics', 'get_recent_active_stream_ids', 'scheduled__2022-10-05T15:54:42.621643+00:00', '--local', '--subdir', 'DAGS_FOLDER/reactive/stream_analytics_dags/CalculateStreamAnalytics.py']
[2022-10-05 16:14:51,141] {timeout.py:68} ERROR - Process timed out, PID: 7
[2022-10-05 16:14:51,142] {celery_executor.py:285} INFO - [Try 1 of 3] Task Timeout Error for Task: (TaskInstanceKey(dag_id='calcuate_stream_analytics', task_id='get_recent_active_stream_ids', run_id='scheduled__2022-10-05T15:54:42.621643+00:00', try_number=2, map_index=-1)).
[2022-10-05 16:14:52,201] {timeout.py:68} ERROR - Process timed out, PID: 7
[2022-10-05 16:14:52,202] {celery_executor.py:285} INFO - [Try 2 of 3] Task Timeout Error for Task: (TaskInstanceKey(dag_id='calcuate_stream_analytics', task_id='get_recent_active_stream_ids', run_id='scheduled__2022-10-05T15:54:42.621643+00:00', try_number=2, map_index=-1)).
[2022-10-05 16:14:53,993] {timeout.py:68} ERROR - Process timed out, PID: 7
[2022-10-05 16:14:53,994] {celery_executor.py:285} INFO - [Try 3 of 3] Task Timeout Error for Task: (TaskInstanceKey(dag_id='calcuate_stream_analytics', task_id='get_recent_active_stream_ids', run_id='scheduled__2022-10-05T15:54:42.621643+00:00', try_number=2, map_index=-1)).
[2022-10-05 16:14:56,394] {timeout.py:68} ERROR - Process timed out, PID: 7
[2022-10-05 16:14:56,396] {celery_executor.py:296} ERROR - Error sending Celery task: Timeout, PID: 7
Celery Task ID: TaskInstanceKey(dag_id='calcuate_stream_analytics', task_id='get_recent_active_stream_ids', run_id='scheduled__2022-10-05T15:54:42.621643+00:00', try_number=2, map_index=-1)
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/utils/functional.py", line 30, in __call__
    return self.__value__
AttributeError: 'ChannelPromise' object has no attribute '__value__'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/transport/virtual/base.py", line 925, in create_channel
    return self._avail_channels.pop()
IndexError: pop from empty list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 179, in send_task_to_executor
    result = task_to_run.apply_async(args=[command], queue=queue)
  File "/home/airflow/.local/lib/python3.8/site-packages/celery/app/task.py", line 575, in apply_async
    return app.send_task(
  File "/home/airflow/.local/lib/python3.8/site-packages/celery/app/base.py", line 788, in send_task
    amqp.send_task_message(P, name, message, **options)
  File "/home/airflow/.local/lib/python3.8/site-packages/celery/app/amqp.py", line 510, in send_task_message
    ret = producer.publish(
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/messaging.py", line 177, in publish
    return _publish(
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/connection.py", line 523, in _ensured
    return fun(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/messaging.py", line 186, in _publish
    channel = self.channel
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/messaging.py", line 209, in _get_channel
    channel = self._channel = channel()
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/utils/functional.py", line 32, in __call__
    value = self.__value__ = self.__contract__()
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/messaging.py", line 225, in <lambda>
    channel = ChannelPromise(lambda: connection.default_channel)
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/connection.py", line 895, in default_channel
    self._ensure_connection(**conn_opts)
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/connection.py", line 433, in _ensure_connection
    return retry_over_time(
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/utils/functional.py", line 312, in retry_over_time
    return fun(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/connection.py", line 877, in _connection_factory
    self._connection = self._establish_connection()
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/connection.py", line 812, in _establish_connection
    conn = self.transport.establish_connection()
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/transport/virtual/base.py", line 949, in establish_connection
    self._avail_channels.append(self.create_channel(self))
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/transport/virtual/base.py", line 927, in create_channel
    channel = self.Channel(connection)
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/transport/redis.py", line 737, in __init__
    self.client.ping()
  File "/home/airflow/.local/lib/python3.8/site-packages/redis/client.py", line 1378, in ping
    return self.execute_command('PING')
  File "/home/airflow/.local/lib/python3.8/site-packages/redis/client.py", line 898, in execute_command
    conn = self.connection or pool.get_connection(command_name, **options)
  File "/home/airflow/.local/lib/python3.8/site-packages/redis/connection.py", line 1192, in get_connection
    connection.connect()
  File "/home/airflow/.local/lib/python3.8/site-packages/redis/connection.py", line 559, in connect
    sock = self._connect()
  File "/home/airflow/.local/lib/python3.8/site-packages/redis/connection.py", line 603, in _connect
    sock.connect(socket_address)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/timeout.py", line 69, in handle_timeout
    raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: Timeout, PID: 7

So it looks like Redis is reachable, but for some reason the workers cannot connect to it. Any idea what might be causing this?
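
A few cluster-side checks that might help narrow it down (a sketch: the Service name and default namespace come from the broker URL above, while the worker pod name and the availability of getent in the image are assumptions):

# does the airflow-redis-master Service have the redis pod behind it?
kubectl get endpoints airflow-redis-master -n default

# is any NetworkPolicy selecting the worker or redis pods?
kubectl get networkpolicy -n default

# does the service name resolve from inside a worker pod?
kubectl exec -it airflow-worker-0 -n default -- getent hosts airflow-redis-master.default.svc.cluster.local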

Custom Helm Values

# Disable ingress so that the public can't reach our Airflow instance
ingress:
  enabled: false

airflow:
  legacyCommands: false
  image:
    repository: my-account-id.dkr.ecr.us-east-1.amazonaws.com/airflow
    tag: 83ab4ed7
    pullPolicy: IfNotPresent
  executor: CeleryExecutor
  config:
    AIRFLOW__CORE__LOAD_EXAMPLES: "False"
    # Logging
    AIRFLOW__LOGGING__REMOTE_LOGGING: "True"
    AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER: "s3://${LOGGING_BUCKET_NAME}/logs"
    AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID: "aws_default"
    # Email
    AIRFLOW__EMAIL__EMAIL_BACKEND: "airflow.providers.amazon.aws.utils.emailer.send_email"
    AIRFLOW__SMTP__SMTP_HOST: "<my-host-redacted>"
    AIRFLOW__SMTP__SMTP_MAIL_FROM: "<my-email-redacted>"
    AIRFLOW__SMTP__SMTP_PORT: "25"
    AIRFLOW__SMTP__SMTP_SSL: "False"
    AIRFLOW__SMTP__SMTP_STARTTLS: "False"
    # Domain used in airflow emails
    AIRFLOW__WEBSERVER__BASE_URL: "https://airflow-cluster1.example.com/"
  variablesUpdate: true
  variables:
    - key: "environment"
      value: "prod"
  extraEnv:
    - name: PUBLIC_FOT_API_URL
      valueFrom:
        configMapKeyRef:
          name: public-resource-urls
          key: PUBLIC_FOT_API_URL
    - name: STREAM_ANALYTICS_BUCKET
      valueFrom:
        configMapKeyRef:
          name: environment-config
          key: STREAM_ANALYTICS_BUCKET
    - name: SERVICE_ACCOUNT_TOKEN
      valueFrom:
        secretKeyRef:
          name: service-accounts-secrets
          key: AIRFLOW_SERVICE_ACCOUNT_TOKEN
  connections:
    - id: aws_default
      type: aws
      description: my AWS connection

      ## NOTE: don't provide `login` or `password`,
      ##       otherwise they will take precedence over the ones injected by IRSA
      extra: |
        {
          "region_name": "us-east-1"
        }
  fernetKey: "<my-fernetkey-redacted>="
  webserverSecretKey: <my-webserverkey-redacted>
  users:
  - username: <my-username-redacted>
    password: <my-password-redacted>
    role: Admin
    email: <my-email-redacted>
    firstName: <my-firstname-redacted>
    lastName: <my-lastname-redacted>

workers:
  enabled: true
  replicas: 1
  resources: 
    limits:
      cpu: 2000m
      memory: 4G
    requests:
      cpu: 250m
      memory: 2G
  podDisruptionBudget:
    enabled: true
    maxUnavailable: "100%"
  autoscaling:
    enabled: true
    maxReplicas: 2
    metrics:
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80

  ## configs for the celery worker Pods
  celery:
    ## if celery worker Pods are gracefully terminated
    ## - consider defining a `workers.podDisruptionBudget` so that enough workers remain
    ##   available during graceful termination waiting periods
    ##
    ## graceful termination process:
    ##  1. prevent worker accepting new tasks
    ##  2. wait AT MOST `workers.celery.gracefullTerminationPeriod` for tasks to finish
    ##  3. send SIGTERM to worker
    ##  4. wait AT MOST `workers.terminationPeriod` for kill to finish
    ##  5. send SIGKILL to worker
    ##
    gracefullTermination: false
    gracefullTerminationPeriod: 600
  terminationPeriod: 60
  logCleanup:
    enabled: true
    resources: 
      limits:
        cpu: 2000m
        memory: 2G
      requests:
        cpu: 10m
        memory: "32Mi"

triggerer:
  enabled: true
  replicas: 1
  resources:
    limits:
      cpu: 2000m
      memory: 4G
    requests:
      cpu: 256m
      memory: 2G

flower:
  enabled: true
  replicas: 1
  resources:
    limits:
      cpu: 2000m
      memory: 2G
    requests:
      cpu: 10m
      memory: "64Mi"

web:
  resources:
    limits:
      cpu: 2000m
      memory: 2G
    requests:
      cpu: 200m
      memory: 1G

scheduler:
  resources:
    limits:
      cpu: 2000m
      memory: 2G
    requests:
      cpu: 1000m
      memory: .5G
  logCleanup:
    enabled: true
    retentionMinutes: 21600
    resources:
      requests:
        cpu: "10m"
        memory: "32Mi"
  livenessProbe:
    enabled: true
    taskCreationCheck:
      enabled: false
      thresholdSeconds: 1800
      schedulerAgeBeforeCheck: 1800

pgbouncer:
  enabled: false
  resources: {}

## DATABASE | Embedded Postgres
postgresql:
  enabled: false

externalDatabase:
  type: postgres
  host: ${DATABASE_HOST}
  port: ${DATABASE_PORT}
  database: airflow
  userSecret: ${SECRET_NAME}
  userSecretKey: "db_username"
  passwordSecret: ${SECRET_NAME}
  passwordSecretKey: "db_password"

## DATABASE | Embedded Redis
redis:
  enabled: true
  ## configs for redis cluster mode
  cluster:
    enabled: false
    slaveCount: 1

  ## configs for the redis master StatefulSet
  master:
    ## resource requests/limits for the redis master Pods
    ## [SPEC] https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#resourcerequirements-v1-core
    resources:
      requests:
        cpu: "10m"
        memory: "32Mi"

    ## configs for the PVC of the redis master Pods
    persistence:
      enabled: false
      storageClass: ""
      size: 8Gi

  ## configs for the redis slave StatefulSet
  slave:
    ## resource requests/limits for the slave Pods
    ## [SPEC] https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#resourcerequirements-v1-core
    resources:
      requests:
        cpu: "10m"
        memory: "32Mi"

    ## configs for the PVC of the redis slave Pods
    persistence:
      enabled: false
      storageClass: ""
      size: 8Gi

externalRedis:
  ## the host of the external redis
  ## [FAQ] https://github.com/airflow-helm/charts/blob/main/charts/airflow/docs/faq/database/external-redis.md
  host: localhost
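
As a sanity check on these values (the embedded redis is enabled, so the externalRedis block should be ignored), the broker and result-backend settings that the chart actually injects can be read back from a running pod. A sketch, assuming the airflow CLI's config get-value subcommand is available in this Airflow version; note the output will include credentials:

kubectl exec -it airflow-worker-0 -- airflow config get-value celery broker_url
kubectl exec -it airflow-worker-0 -- airflow config get-value celery result_backend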

agconti commented 1 year ago

Looks like this might be related to #573, but the fix there did not solve things for me.

I tried upgrading to apache/airflow:2.2.5-python3.9 as that issue suggests, and to the latest stable Airflow image, apache/airflow:2.3.3-python3.9, without success.

thesuperzapper commented 1 year ago

@agconti as I mentioned in https://github.com/airflow-helm/charts/issues/573#issuecomment-1269271170, it's almost certainly an issue with an incorrect version of the kombu or celery pip packages.

Your custom container image might not have the right versions, because when you run pip install these days, pip tries to be smart and may upgrade or downgrade already-installed packages to avoid version conflicts.

So take a look at your custom image, and see if your pip installs are changing the version of kombu or celery. Also, does your environment work with the default apache/airflow:2.2.5-python3.9 image?
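
One quick way to check is to compare the pins in the base image against what actually ended up in the custom image. A sketch, using the ECR image reference from the Helm values above and the 2.3.3-python3.9 tag that turns out to be the base image in the Dockerfile shared below, and assuming the official image's entrypoint passes bash -c commands through:

# versions pinned in the official base image
docker run --rm apache/airflow:2.3.3-python3.9 bash -c "pip freeze | grep -iE '^(apache-airflow==|celery|kombu|redis)'"

# versions that actually ended up in the custom image
docker run --rm my-account-id.dkr.ecr.us-east-1.amazonaws.com/airflow:83ab4ed7 bash -c "pip freeze | grep -iE '^(apache-airflow==|celery|kombu|redis)'"

Any drift in celery, kombu, or redis between the two outputs would be a strong sign that the pip install in the custom Dockerfile changed them.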

agconti commented 1 year ago

@thesuperzapper thanks for your help! 💖

I agree. I think it's definitely something wrong with kombu or celery, though I can't imagine what I did in my setup to cause it.

Here's a bit more context. My Dockerfile:

FROM apache/airflow:2.3.3-python3.9

# Allows docker to cache installed dependencies between builds
COPY ./requirements.txt requirements.txt
RUN pip install -r requirements.txt

COPY . /opt/airflow

And here's my requirements.txt:

requests==2.28.1
pandas==1.5.0
pyarrow==9.0.0
apache-airflow[amazon]==2.4.0

At any rate, I got around this problem this morning by switching to the KubernetesExecutor to avoid Celery altogether. It's not a solution or closure for this problem, but it works for our use case. I want to be respectful of your precious volunteer time, so I'll close this issue since it's no longer a problem for us. Hopefully, my debugging trail here can be helpful to anyone else running into a similar issue. Thanks for your help in trying to solve this!
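
For reference, the switch boiled down to a values change along these lines (a rough sketch: the airflow release name and the airflow-stable repo alias are placeholders, and disabling workers, flower, and the embedded redis reflects the chart's usual KubernetesExecutor setup rather than anything verified here):

helm upgrade airflow airflow-stable/airflow \
  --reuse-values \
  --set airflow.executor=KubernetesExecutor \
  --set workers.enabled=false \
  --set flower.enabled=false \
  --set redis.enabled=false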

thesuperzapper commented 1 year ago

@agconti actually, I think your above Dockerfile has a very clear issue: you should not be changing the version of Airflow by installing apache-airflow[amazon]==2.4.0 into the 2.3.3-python3.9 container image. This will inevitably cause problems.

What's great is that you don't need to install the amazon or any other "extra" pip packages, because they are all installed in the official apache/airflow images by default. So you can just remove that requirement, and everything should work!
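
If extra pip packages do need to go on top of the base image, one way to stop pip from moving airflow, celery, or kombu is to install against the constraints file Airflow publishes for each release and Python version. A sketch for the 2.3.3 / Python 3.9 base used above (the --constraint URL follows Airflow's published naming pattern):

# e.g. in the Dockerfile's RUN step, instead of a bare pip install
pip install --no-cache-dir -r requirements.txt \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.3.3/constraints-3.9.txt"

With the constraint in place, pip resolves within those pins and errors out on conflicts instead of silently replacing the versions the image shipped with.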

EDIT: I highly recommend using CeleryExecutor in most situations, because starting a whole Pod for every task is very wasteful (alternatively, use CeleryKubernetesExecutor and get the best of both worlds). Plus, that will let you use my upcoming tasks-aware autoscaler feature!

agconti commented 1 year ago

@thesuperzapper thanks for your help! I didn't realize that the amazon package was already installed. I'll make the update. 💖

Thanks for your recommendation on CeleryExecutor as well!