
Airflow + Advanced Logging Configuration to stdout + CeleryExecutor does not write logs to stdout as expected #38479

Closed shivshav closed 6 months ago

shivshav commented 6 months ago

Apache Airflow version

2.8.3

If "Other Airflow 2 version" selected, which one?

No response

What happened?

We would like to set up Airflow to write task logs to stdout in addition to its usual mechanism of writing task logs to a file that is then pushed to a remote logs destination. Setting up a custom logging configuration for this, with a custom handler that sits alongside the task handler on the airflow.task logger, does not work when using the CeleryExecutor: no task logs appear on stdout at all, and we only get the logs written to the usual task log files by the task log handler.

The same configuration does work with the LocalExecutor. I haven't tried other executors, so I can't say whether this affects more than just the CeleryExecutor; we only use the LocalExecutor in local development and the CeleryExecutor in our deployed environments.

What you think should happen instead?

Based on the information noted here and our setup, I would've expected logs to appear both in the task log files and on stdout, so our usual log collectors can collect and ship them as normal.

How to reproduce

Docker Compose setup (note: we normally use our own custom Airflow images based on the official ones, but I was able to reproduce this with the official images alone, so the Airflow services below point at those to remove any extra moving parts):

# docker-compose.yaml
version: '2.4'

## Shared YAML anchors for configuration
## Note: top-level keys prefixed with 'x-' are ignored by docker-compose for parsing, hence the naming
# Common config for postgres connection
x-pg-envs: &pg-envs
  POSTGRES_USER: airflow
  POSTGRES_PASSWORD: airflow
  POSTGRES_DB: airflow
  PGUSER: airflow

# Common configuration for airflow containers shared as a YAML anchor
x-airflow-app: &airflow-app
  image: apache/airflow:2.8.3-python3.11
  build:
    context: .
  restart: always
  env_file:
    - .env
  environment:
    <<: *pg-envs
    _AIRFLOW_WWW_USER_CREATE: 'true'
    _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
    _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
  depends_on: 
    airflow_postgres:
      condition: service_healthy
    redis:
      condition: service_healthy
  volumes:
    - airflow_logs:/opt/airflow/logs
    - ./config/airflow.cfg.dev:/opt/airflow/airflow.cfg
    - ./config/local:/opt/airflow/config
    - ./test-dags:/opt/airflow/dags/repo

services:
  airflow_postgres:
    image: postgres:16
    environment:
      <<: *pg-envs
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U airflow -d airflow"]
      interval: 1s
      timeout: 5s
      retries: 10
    ports:
      - "5435:5432"
    volumes:
      - airflow_local_postgres:/var/lib/postgresql/data 

  redis:
    image: redis:6
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 2s
      retries: 5
      start_period: 3s
    volumes:
      - redis_data:/data

  webserver:
    <<: *airflow-app
    command: ["webserver"]
    ports:
      - "8080:8080"
    healthcheck:
      test: ["CMD-SHELL", "[ -f /opt/airflow/airflow-webserver.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 3

  scheduler:
    <<: *airflow-app
    command: ["scheduler"]

  # The worker and flower services aren't relevant for the LocalExecutor setup, just the CeleryExecutor setup
  worker:
    <<: *airflow-app
    command: ["celery", "worker"]
    healthcheck:
      test: ["CMD-SHELL", "[-f /opt/airflow/airflow-worker.pid"]
      interval: 30s
      timeout: 30s
      retries: 3
  flower:
    <<: *airflow-app
    command: ["celery", "flower"]
    healthcheck:
      test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-flower.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 3
    ports:
    - "5555:5555"

  migrate_db:
    <<: *airflow-app
    command: ["db", "init"]
    restart: on-failure

volumes:
  airflow_local_postgres:
  airflow_logs:
  redis_data:

Custom log configuration, located in config/local and mounted under /opt/airflow/config in the docker-compose.yaml above:

# log_config.py
from copy import deepcopy
import sys

from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

# code taken from "https://github.com/apache/airflow/discussions/29920#discussioncomment-5208504"

LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)
LOGGING_CONFIG["handlers"]["stdout"] = {
    "class": "logging.StreamHandler",
    "formatter": "airflow",
    "stream": sys.stdout, # I have also tried `"ext://sys.stdout"` (no difference) and `"sys.stdout"` (crashes the task when using CeleryExecutor)
    "level": "INFO"
}
LOGGING_CONFIG["loggers"]["airflow.task"]["handlers"] = ["stdout", "task"]

Minimal reproducible DAG example (located at ./test-dags/test-dag.py and mounted via ./test-dags:/opt/airflow/dags/repo in the docker-compose configuration above):

# test-dag.py
import datetime
import logging

from airflow import DAG
from airflow.decorators import task

logger = logging.getLogger(__name__)

# I also tried the classic (non-TaskFlow) API in case that was the problem; it made no difference
# (see the sketch after this file)
@task
def log_me(msg):
    task_logger = logging.getLogger(__name__)  # freshly fetched logger, in case scope is the problem with the module-level one
    logger.warning(msg)       # does not log to stdout with CeleryExecutor
    logger.info(msg)          # does not log to stdout with CeleryExecutor
    task_logger.warning(msg)  # does not log to stdout with CeleryExecutor
    task_logger.info(msg)     # does not log to stdout with CeleryExecutor

with DAG(
    dag_id="test_dag",
    start_date=datetime.datetime(2024, 3, 19),
    schedule="@daily"
):
    log_me("hiiiiii")

And finally, our custom Airflow config, with settings trimmed down to what I believe are the relevant ones (located at ./config/airflow.cfg.dev and mounted as /opt/airflow/airflow.cfg in the docker-compose configuration above):

# airflow.cfg
[core]
...

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor
# If I change this to LocalExecutor, logs appear on stdout as expected
executor = CeleryExecutor

...

[logging]
# Logging class
# Specify the class that will specify the logging configuration
# This class has to be on the python classpath
# logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
logging_config_class = log_config.LOGGING_CONFIG

# Logging level
logging_level = INFO
celery_logging_level = INFO
fab_logging_level = WARN
...

With the above configuration in place, run docker compose up, then trigger test_dag in the UI. Observe the log output from the docker-compose containers: no task logs are printed to stdout.

Operating System

OSX

Versions of Apache Airflow Providers

All from using the default airflow image

apache-airflow-providers-amazon==8.16.0
apache-airflow-providers-apache-hive==6.4.1
apache-airflow-providers-celery==3.5.1
apache-airflow-providers-cncf-kubernetes==7.13.0
apache-airflow-providers-common-io==1.2.0
apache-airflow-providers-common-sql==1.10.0
apache-airflow-providers-docker==3.9.1
apache-airflow-providers-elasticsearch==5.3.1
apache-airflow-providers-ftp==3.7.0
apache-airflow-providers-google==10.13.1
apache-airflow-providers-grpc==3.4.1
apache-airflow-providers-hashicorp==3.6.1
apache-airflow-providers-http==4.8.0
apache-airflow-providers-imap==3.5.0
apache-airflow-providers-jdbc==4.2.1
apache-airflow-providers-microsoft-azure==8.5.1
apache-airflow-providers-mysql==5.5.1
apache-airflow-providers-odbc==4.4.0
apache-airflow-providers-openlineage==1.4.0
apache-airflow-providers-postgres==5.10.0
apache-airflow-providers-redis==3.6.0
apache-airflow-providers-salesforce==5.6.1
apache-airflow-providers-sendgrid==3.4.0
apache-airflow-providers-sftp==4.8.1
apache-airflow-providers-slack==8.5.1
apache-airflow-providers-snowflake==5.2.1
apache-airflow-providers-sqlite==3.7.0
apache-airflow-providers-ssh==3.10.0

Deployment

Docker-Compose

Deployment details

Docker Compose version v2.24.6-desktop.1

Anything else?

A curious thing that may or may not be relevant: most examples I've seen use the string "sys.stdout" as the stream value when defining a handler. Using that string, however, causes the celery worker to exit with stack traces about a string not being "writable". That surprised me, and it might point in the right direction for what's going wrong here, or it could be a complete red herring, in which case I apologize 😅
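
For reference, here's a standalone sketch (plain stdlib, no Airflow or Celery involved) of how logging.config.dictConfig treats the different stream values I tried, which I suspect explains the "writable" crash: the ext:// form gets resolved to the real stream object, while a bare "sys.stdout" string is handed to StreamHandler as-is:

# dictconfig_stream_demo.py - stdlib-only illustration of the "stream" variants
import logging.config
import sys


def configure(stream_value):
    logging.config.dictConfig({
        "version": 1,
        "handlers": {
            "stdout": {
                "class": "logging.StreamHandler",
                "stream": stream_value,
                "level": "INFO",
            },
        },
        "loggers": {"demo": {"handlers": ["stdout"], "level": "INFO"}},
    })


configure(sys.stdout)          # works: the real file object is passed straight to StreamHandler
configure("ext://sys.stdout")  # works: dictConfig resolves ext:// references to the actual object
logging.getLogger("demo").info("this line reaches stdout")

# configure("sys.stdout")      # the bare string is passed through untouched, so the handler
#                              # later tries to write to a str and fails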

Here's a sample of the output I'd expect to see (except coming from the worker container) when attached to the containers with docker compose. This was captured using the LocalExecutor, to confirm I wasn't going crazy.

[Screenshot (2024-03-25, 7:44 PM): expected task log lines appearing on stdout in the attached container output, using the LocalExecutor]

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 6 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.