apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Google Stackdriver Logging not working as expected #39613

Open goktugkose opened 1 month ago

goktugkose commented 1 month ago

Apache Airflow version

2.9.1

If "Other Airflow 2 version" selected, which one?

No response

What happened?

We are experiencing an issue with Stackdriver logging in Airflow. As the changelog suggests, Airflow 2.9.1 bumps the Google provider to 10.17.0, which is supposed to fix the known bugs with Stackdriver logging, so we tested with that latest version. However, we still cannot get remote logging with Stackdriver to work. As an initial step, we created a log bucket in Cloud Logging and a log sink so we could investigate the logs. We configure remote logging through the Helm chart: GOOGLE_APPLICATION_CREDENTIALS is set as an environment variable, and a Google Cloud connection using the same service account is added to Airflow with the scopes https://www.googleapis.com/auth/cloud-platform and https://www.googleapis.com/auth/logging.admin.
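
In airflow.cfg terms, the chart values come down to settings along these lines (the log name here is illustrative, not necessarily our exact value):

[logging]
remote_logging = True
remote_base_log_folder = stackdriver://airflow-tasks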

Findings:

  1. The same service account works fine with GCS logging.
  2. No logs are written to the base_log_folder.

Problems faced:

  1. The documentation states that logs are supposed to be shown in real time. However, we have to wait for the logs to be loaded into the Airflow UI after the task completes.
  2. The logs that are shown do not contain the application logs printed via the logging library. (I will share the DAG file in the thread.)

Example DAG File:

from datetime import datetime
import logging
import time

from airflow import DAG
from airflow.operators.python import PythonOperator  # non-deprecated import path in Airflow 2.x

# Module-level logger, as a task would typically use for application logs.
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

default_args = {
    "owner": "admin",
    "depends_on_past": False,
    "start_date": datetime(2024, 4, 24),
    "retries": 0,
}

dag = DAG(
    "test",
    default_args=default_args,
    schedule_interval="*/20 * * * *",  # every 20 minutes
    catchup=False,
    max_active_runs=1,
    tags=["test"],
)

def func():
    # Emit one log line per second so the real-time behavior is observable.
    steps = 10
    logger.info(f"Executing {steps} steps...")
    for i in range(steps):
        logger.info(f"Step {i + 1} executed.")
        time.sleep(1)
    logger.info("Successfully finished!")

task_1 = PythonOperator(
    dag=dag,
    task_id="task_1",
    python_callable=func,
)

Produced Logs (not real time, they appear after task completion):

[2024-05-09, 16:40:04 +03] {local_task_job_runner.py:120} ▼ Pre task execution logs
[2024-05-09, 16:40:05 +03] {taskinstance.py:2076} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: test.task_1 scheduled__2024-05-09T13:20:00+00:00 [queued]>
[2024-05-09, 16:40:05 +03] {taskinstance.py:2076} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: test.task_1 scheduled__2024-05-09T13:20:00+00:00 [queued]>
[2024-05-09, 16:40:05 +03] {taskinstance.py:2306} INFO - Starting attempt 1 of 1
[2024-05-09, 16:40:06 +03] {taskinstance.py:2330} INFO - Executing <Task(PythonOperator): task_1> on 2024-05-09 13:20:00+00:00
[2024-05-09, 16:40:06 +03] {warnings.py:110} WARNING - /home/airflow/.local/lib/python3.12/site-packages/airflow/task/task_runner/standard_task_runner.py:61: DeprecationWarning: This process (pid=865) is multi-threaded, use of fork() may lead to deadlocks in the child.
  pid = os.fork()

[2024-05-09, 16:40:06 +03] {standard_task_runner.py:63} INFO - Started process 882 to run task
[2024-05-09, 16:40:31 +03] {local_task_job_runner.py:310} WARNING - State of this instance has been externally set to success. Terminating instance.
[2024-05-09, 16:40:31 +03] {local_task_job_runner.py:222} ▲▲▲ Log group end
[2024-05-09, 16:40:31 +03] {process_utils.py:132} INFO - Sending 15 to group 882. PIDs of all processes in the group: [882]
[2024-05-09, 16:40:31 +03] {process_utils.py:87} INFO - Sending the signal 15 to group 882
[2024-05-09, 16:40:31 +03] {process_utils.py:80} INFO - Process psutil.Process(pid=882, status='terminated', exitcode=0, started='13:40:05') (882) terminated with exit code 0

What you think should happen instead?

As suggested by the Airflow documentation, logs should be loaded in real time; other logging configurations, such as Elasticsearch, provide this behavior. Task logs should also be served in the UI. I tested with GCS logging, and we expect Stackdriver logging to produce logs similar to the GCS example below:

*** Found remote logs:
***   * gs://gke-airflow-logs/dag_id=test/run_id=manual__2024-05-14T09:17:07.661383+00:00/task_id=task_1/attempt=1.log
[2024-05-14, 12:17:14 +03] {local_task_job_runner.py:120} ▶ Pre task execution logs
[2024-05-14, 12:17:19 +03] {test.py:23} INFO - Executing 10 steps...
[2024-05-14, 12:17:19 +03] {test.py:25} INFO - Step 1 executed.
[2024-05-14, 12:17:20 +03] {test.py:25} INFO - Step 2 executed.
[2024-05-14, 12:17:21 +03] {test.py:25} INFO - Step 3 executed.
[2024-05-14, 12:17:22 +03] {test.py:25} INFO - Step 4 executed.
[2024-05-14, 12:17:23 +03] {test.py:25} INFO - Step 5 executed.
[2024-05-14, 12:17:24 +03] {test.py:25} INFO - Step 6 executed.
[2024-05-14, 12:17:25 +03] {test.py:25} INFO - Step 7 executed.
[2024-05-14, 12:17:26 +03] {test.py:25} INFO - Step 8 executed.
[2024-05-14, 12:17:27 +03] {test.py:25} INFO - Step 9 executed.
[2024-05-14, 12:17:28 +03] {test.py:25} INFO - Step 10 executed.
[2024-05-14, 12:17:29 +03] {test.py:28} INFO - Successfully finished!
[2024-05-14, 12:17:29 +03] {python.py:237} INFO - Done. Returned value was: None
[2024-05-14, 12:17:29 +03] {taskinstance.py:441} ▶ Post task execution logs

How to reproduce

Deploy Airflow 2.9.1 via the official Helm chart, enable Stackdriver remote logging as described above, and trigger the example DAG.

Operating System

Debian GNU/Linux 12 (bookworm)

Versions of Apache Airflow Providers

apache-airflow-providers-amazon          | 8.20.0 
apache-airflow-providers-celery          | 3.6.2  
apache-airflow-providers-cncf-kubernetes | 8.1.1  
apache-airflow-providers-common-io       | 1.3.1  
apache-airflow-providers-common-sql      | 1.12.0 
apache-airflow-providers-docker          | 3.10.0 
apache-airflow-providers-elasticsearch   | 5.3.4  
apache-airflow-providers-fab             | 1.0.4  
apache-airflow-providers-ftp             | 3.8.0  
apache-airflow-providers-google          | 10.17.0
apache-airflow-providers-grpc            | 3.4.1  
apache-airflow-providers-hashicorp       | 3.6.4  
apache-airflow-providers-http            | 4.10.1 
apache-airflow-providers-imap            | 3.5.0  
apache-airflow-providers-microsoft-azure | 10.0.0 
apache-airflow-providers-mysql           | 5.5.4  
apache-airflow-providers-odbc            | 4.5.0  
apache-airflow-providers-openlineage     | 1.7.0  
apache-airflow-providers-postgres        | 5.10.2 
apache-airflow-providers-redis           | 3.6.1  
apache-airflow-providers-sendgrid        | 3.4.0  
apache-airflow-providers-sftp            | 4.9.1  
apache-airflow-providers-slack           | 8.6.2  
apache-airflow-providers-smtp            | 1.6.1  
apache-airflow-providers-snowflake       | 5.4.0  
apache-airflow-providers-sqlite          | 3.7.1  
apache-airflow-providers-ssh             | 3.10.1

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

Code of Conduct

I agree to follow this project's Code of Conduct

boring-cyborg[bot] commented 1 month ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

potiuk commented 1 month ago

cc: @VladaZakharova and Google team

VladaZakharova commented 1 month ago

> cc: @VladaZakharova and Google team

Sure, we will take a look :)

random-user-hdbbjw commented 1 month ago

See #30740.

Actually, the BackgroundThreadTransport mode of StackdriverTaskHandler is not working properly.

I also run Airflow with the KubernetesExecutor, and I eventually had to implement a custom logging config that overrides BackgroundThreadTransport with SyncTransport in order to see the full logs; a sketch of that override follows.
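
For reference, the override looks roughly like this. The module name, the Cloud Logging log name, and the handler path are from my setup with apache-airflow-providers-google 10.x, so adjust them to yours:

# log_config.py -- put this module on PYTHONPATH
from copy import deepcopy

from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
from google.cloud.logging_v2.handlers.transports import SyncTransport

LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)

# Swap the default task handler for a StackdriverTaskHandler that ships each
# record synchronously instead of queueing it on a background thread.
LOGGING_CONFIG["handlers"]["task"] = {
    "class": "airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler",
    "formatter": "airflow",
    "name": "airflow-tasks",     # Cloud Logging log name (mine; adjust to yours)
    "transport": SyncTransport,  # default is BackgroundThreadTransport
}

# Then activate it, e.g. via an environment variable:
# AIRFLOW__LOGGING__LOGGING_CONFIG_CLASS=log_config.LOGGING_CONFIG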

Fetching logs from Stackdriver into the UI is also slow.