apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Google Stackdriver logging not working #30740

Open matteosdocsity opened 1 year ago

matteosdocsity commented 1 year ago

Apache Airflow version

2.5.3

What happened

Following the steps in the documentation on Airflow Helm Chart 1.9 with the KubernetesExecutor shows neither print("This is a print logging message") nor

import logging
logger = logging.getLogger("airflow.task")
logger.info("This is a log message")

Authentication is based on GOOGLE_APPLICATION_CREDENTIALS, whose service account also has the roles/logging.admin role. The config is enabled via values.yaml in the Helm chart, with apache-airflow-providers-google==9.0.0:

config:
  logging:
    remote_logging: 'True'
    remote_base_log_folder: 'stackdriver://airflow-tasks'

The only logs visible in the UI and in the Google Stackdriver console are internal ones, like:

[2023-04-19, 06:39:04 UTC] {taskinstance.py:1309} INFO - Executing <Task(TemplatedPythonOperator): hello-task> on 2023-02-15 00:00:00+00:00
[2023-04-19, 06:39:04 UTC] {standard_task_runner.py:55} INFO - Started process 36 to run task
[2023-04-19, 06:40:15 UTC] {local_task_job.py:276} WARNING - State of this instance has been externally set to success. Terminating instance.
[2023-04-19, 06:40:15 UTC] {process_utils.py:129} INFO - Sending Signals.SIGTERM to group 36. PIDs of all processes in the group: [36]
[2023-04-19, 06:40:15 UTC] {process_utils.py:84} INFO - Sending the signal Signals.SIGTERM to group 36
[2023-04-19, 06:40:16 UTC] {process_utils.py:79} INFO - Process psutil.Process(pid=36, status='terminated', exitcode=0, started='06:39:04') (36) terminated with exit code 0

What you think should happen instead

The Stackdriver remote logging should output the logs in the Web UI and Stackdriver console.

How to reproduce

Use the following config:

config:
  logging:
    remote_logging: 'True'
    remote_base_log_folder: 'stackdriver://airflow-tasks'

Operating System

Helm 1.9

Versions of Apache Airflow Providers

apache-airflow-providers-google==9.0.0

Deployment

Official Apache Airflow Helm Chart

Deployment details

Using official Helm Chart 1.9 (also reproducible with 1.8, 1.7)

Anything else

No response

potiuk commented 1 year ago

Cc: @dstandish - this looks suspiciously related to the logging changes, as it seems some logs (the internal ones) are properly sent

potiuk commented 1 year ago

BTW @matteosdocsity - did you log it from inside your task? Can you please share more of your DAG code showing how you are doing it? (And BTW, I clarified with Daniel that you should be able to use ANY logger - see the new PR I just created.)

potiuk commented 1 year ago

https://github.com/apache/airflow/pull/30746

matteosdocsity commented 1 year ago

@potiuk sure, this is the task code:

import logging
import time

# the logger is created globally, outside the function
logger = logging.getLogger("airflow.task")

def print_hello_world(ds, suffix=None, **kwargs):
    # accepts the 'suffix' op_kwarg passed by the DAG below
    # with default airflow logging settings, DEBUG logs are ignored
    logger.info("This log is at the level of INFO")

    # each of these lines produces a log statement
    print("This log is created with a print statement")

    time.sleep(120)
    print('Again, Hello World!')
    return 'OK'

and below the DAG code:

import sys
from os import path
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator

sys.path.append(path.dirname(path.abspath(__file__)))
from custom_operators.hello_world import print_hello_world
sys.path.append(path.dirname(path.dirname(path.dirname(path.abspath(__file__)))))

ENV = (Variable.get('ENV', default_var='dev') or 'test').lower()
from datetime import datetime, timedelta
yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
start_date = datetime.strptime(Variable.get('DEFAULT_START_DATE', default_var=yesterday),'%Y-%m-%d')
test = 1

args = {'owner': 'airflow', 'start_date': start_date, 'depends_on_past': False}

resource_config = {"KubernetesExecutor": {"request_memory": "500Mi",
                                          "limit_memory": "500Mi",
                                          "request_cpu": "250m",
                                          "limit_cpu": "250m"}}

with DAG(
    dag_id='dag_for_test',
    default_args=args,
    schedule_interval='@once',
    max_active_runs=1,
    catchup=True, description="Simple task",
    tags=['test']
) as dag:
    suffix = '{{ macros.ds_add(ds, 1) | replace("-","/") }}'

    hello = PythonOperator(
        task_id='hello-task',
        provide_context=True,
        python_callable=print_hello_world,
        op_kwargs={'suffix': suffix},
        executor_config=resource_config,
        dag=dag
    )

dstandish commented 1 year ago

hi, lemme catch up here

dstandish commented 1 year ago

question for you... what's the last good version that this worked with? provider+airflow.

e.g. if you use 2.5.3 and 8.9.0 does it work? if it's related to trigger logging change, then it should work with 8.9.0 and not work with 8.10.0

separately, there's an issue with k8s executor pod logging that is hopefully resolved in 2.6 by this PR https://github.com/apache/airflow/pull/28440. probably not related, but just in case... the issue there was about ensuring that in the k8s executor scenario, task logging is echoed to stdout and not just to a file, since the webserver tries to read from pod stdout while the task is running.
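
For reference, a minimal sketch of that idea (not the actual change in PR #28440) - attaching an extra stdout handler to the task logger so the pod's stdout carries the task logs too:

import logging
import sys

# Hedged illustration: echo task logs to stdout in addition to any file
# handler, so the webserver can read them from the pod while it runs.
task_logger = logging.getLogger("airflow.task")
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setFormatter(
    logging.Formatter("[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s")
)
task_logger.addHandler(stdout_handler)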

dstandish commented 1 year ago

yeah, so going back through the history... the trigger logging changes in core aren't out yet, and i don't think the provider changes in 8.10 are a likely cause... but there were some other logging-related changes that came out in 2.5.x, some related to the k8s executor --- so @matteosdocsity, if you could help narrow it down that would be appreciated -- what's the last version combination that works? e.g. does it work with provider 9.0.0 and airflow 2.5.0, or 2.5.1, or 2.5.2?

dstandish commented 1 year ago

@potiuk

Cc: @dstandish - this looks suspiciously related to the logging changes, as it seems some logs (the internal ones) are properly sent

yeah, that actually looks suspiciously related to the phenomenon addressed by https://github.com/apache/airflow/pull/28440, but i'm not sure - we need some more info about the last good version to narrow it down, i think

matteosdocsity commented 1 year ago

@dstandish unfortunately I've just started working with Stackdriver in order to get streaming logs with GKE, because I understand it's one of the best methods in this cloud environment (GCP). I've tested with the latest Helm chart 1.9, but also with 1.8, and Airflow versions 2.5.1, 2.5.2 and 2.5.3 (and also with apache-airflow-providers-google==8.10.0) without any luck. Are there previous version combinations you believe deserve to be tested? I'd be happy to help if I can

dstandish commented 1 year ago

yes can you try with provider 8.9.0? and then, if that doesn't work, maybe 8.9.0 and airflow < 2.5.

so sounds like you're not sure it ever worked.

and am i correct that it's only a problem when using the k8s executor? and is it only a problem viewing logs while the task is running? or are the logs not there even after the task completes?

matteosdocsity commented 1 year ago

@dstandish tested now with provider 8.9.0, with no success. I don't know if the problem is related to the k8s executor only, even if the stdout problem you mentioned before could also play a role... As reported, the logs I can see (both while the task is running and after completion) in the UI and in the Google Stackdriver console are only internal ones, like:

[2023-04-19, 06:39:04 UTC] {taskinstance.py:1309} INFO - Executing <Task(TemplatedPythonOperator): hello-task> on 2023-02-15 00:00:00+00:00
[2023-04-19, 06:39:04 UTC] {standard_task_runner.py:55} INFO - Started process 36 to run task
[2023-04-19, 06:40:15 UTC] {local_task_job.py:276} WARNING - State of this instance has been externally set to success. Terminating instance.
[2023-04-19, 06:40:15 UTC] {process_utils.py:129} INFO - Sending Signals.SIGTERM to group 36. PIDs of all processes in the group: [36]
[2023-04-19, 06:40:15 UTC] {process_utils.py:84} INFO - Sending the signal Signals.SIGTERM to group 36
[2023-04-19, 06:40:16 UTC] {process_utils.py:79} INFO - Process psutil.Process(pid=36, status='terminated', exitcode=0, started='06:39:04') (36) terminated with exit code 0

I've tested also on 2.5.3 with docker-compose (no helm) with both Sequential and Local executors with no success.

Going to test now with airflow 2.4.3

matteosdocsity commented 1 year ago

@dstandish no success also with airflow 2.4.3

potiuk commented 1 year ago

@matteosdocsity

def print_hello_world(ds):
    # with default airflow logging settings, DEBUG logs are ignored
    logger.info("This log is at the level of INFO")

How do you get the logger here? Is it just logger = logging.getLogger(__name__)? Or logging.getLogger("airflow.task")?

Note that in my first PR/doc I used "airflow.task", but __name__ is generally a better approach. Can you check with it?

potiuk commented 1 year ago

And - can you create the logger inside the function? I wonder if it makes a difference.

import logging

def print_hello_world(ds):
    logger = logging.getLogger(__name__)
    # with default airflow logging settings, DEBUG logs are ignored
    logger.info("This log is at the level of INFO")

matteosdocsity commented 1 year ago

@potiuk the logger was created globally, before the function, with logging.getLogger("airflow.task"); even with your code having logger = logging.getLogger(__name__) inside the function, it still produces no results

potiuk commented 1 year ago

@potiuk the logger was created globally, before the function, with logging.getLogger("airflow.task"); even with your code having logger = logging.getLogger(__name__) inside the function, it still produces no results

Ok. thanks. That's weird.

dstandish commented 1 year ago

Here's a note i have from trigger logging dev time:

[screenshot of the note]

It rang a bit of a bell, and I'm not sure it was actually working. Though I definitely had it working in my local env by the time the trigger logging change was merged into main. I don't know what exactly was wrong with it. You could look at that commit; it might reveal something. Or you might try running locally from main and see what you get. I am gone through the end of this weekend. By that time we might have a 2.6 RC you could try.

matteosdocsity commented 1 year ago

@dstandish @josh-fell @potiuk tested with apache-airflow==2.6.0rc1 and apache-airflow-providers-google==10.0.0, but nothing changed. Like before, I can see only internal logs, but no prints or log messages.

[2023-04-26, 12:21:20 UTC] {taskinstance.py:1125} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: hello-temp.hello-task scheduled__2023-04-24T22:00:00+00:00 [queued]>
[2023-04-26, 12:21:20 UTC] {taskinstance.py:1125} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: hello-temp.hello-task scheduled__2023-04-24T22:00:00+00:00 [queued]>
[2023-04-26, 12:21:20 UTC] {taskinstance.py:1331} INFO - Starting attempt 2 of 2
[2023-04-26, 12:21:20 UTC] {taskinstance.py:1350} INFO - Executing <Task(TemplatedPythonOperator): hello-task> on 2023-04-24 22:00:00+00:00
[2023-04-26, 12:21:20 UTC] {standard_task_runner.py:57} INFO - Started process 30270 to run task
[2023-04-26, 12:21:30 UTC] {local_task_job_runner.py:298} WARNING - State of this instance has been externally set to success. Terminating instance.
[2023-04-26, 12:21:30 UTC] {process_utils.py:131} INFO - Sending Signals.SIGTERM to group 30270. PIDs of all processes in the group: [30270]
[2023-04-26, 12:21:30 UTC] {process_utils.py:86} INFO - Sending the signal Signals.SIGTERM to group 30270
[2023-04-26, 12:21:30 UTC] {process_utils.py:79} INFO - Process psutil.Process(pid=30270, status='terminated', exitcode=0, started='12:21:19') (30270) terminated with exit code 0

any other ideas, or other ways I could try to help?

brokenjacobs commented 1 year ago

I'll bring attention to this here; it sounds like the same issue I'm encountering: https://github.com/apache/airflow/discussions/21837

I'm also running in GKE, but using the CeleryExecutor, and seeing the same sort of behavior. I don't think the execution engine is the issue here; it has to be something else.

brokenjacobs commented 1 year ago

pretty sure the google-cloud-logging software has an issue with fork()

potiuk commented 1 year ago

pretty sure the google-cloud-logging software has an issue with fork()

Do you have some sources to back it up?

dstandish commented 1 year ago

Yeah, i think there's something with the way stackdriver uses threads that may be incompatible with the way we run tasks

it uses class BackgroundThreadTransport

our tasks are (usually) run as forked subprocesses, and probably the subprocess messages get lost.
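
A toy sketch (not google-cloud-logging's actual code) of why a background flush thread loses records across fork(): the queue is copied into the child process, but the flushing thread is not.

import os
import queue
import threading
import time

# Toy stand-in for BackgroundThreadTransport: records go into a queue and a
# daemon thread flushes them. fork() copies the queue but NOT the thread, so
# anything the child enqueues is silently dropped.
buf = queue.Queue()

def flusher():
    while True:
        print("flushed:", buf.get())

threading.Thread(target=flusher, daemon=True).start()

pid = os.fork()  # POSIX only; mirrors how Airflow usually forks task processes
buf.put(f"message from pid {os.getpid()}")
time.sleep(1)  # the parent's message is flushed; the child's never is

if pid == 0:
    os._exit(0)
else:
    os.waitpid(pid, 0)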

brokenjacobs commented 1 year ago

pretty sure the google-cloud-logging software has an issue with fork()

Do you have some sources to back it up?

https://github.com/googleapis/python-logging/issues/550

So I guess the good news is this isn't an Airflow issue! The bad news: this has been broken for a long while and no one noticed.

brokenjacobs commented 1 year ago

One workaround that should work (haven't tested it yet) is to use the sync handler instead of the async one. But Stackdriver is already so slow... I don't think that would be tolerable. Also, if the issue is that fork() does something to their socket and the code doesn't recover, the sync handler won't matter either.
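
An untested sketch of that workaround, assuming the documented google-cloud-logging handler API - passing SyncTransport so each record is sent inline instead of through the background thread:

import logging

import google.cloud.logging
from google.cloud.logging.handlers import CloudLoggingHandler
from google.cloud.logging.handlers.transports import SyncTransport

client = google.cloud.logging.Client()
# SyncTransport performs the API call on the logging thread itself, so no
# background flush thread can be stranded by fork(). Slower per record.
handler = CloudLoggingHandler(client, name="airflow-tasks", transport=SyncTransport)

logger = logging.getLogger("airflow.task")
logger.addHandler(handler)
logger.info("sent synchronously to Cloud Logging")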

matteosdocsity commented 1 year ago

Hi, are there any updates? Even if the official googleapis library is not working, I believe there is something strange... I mean, how does Google Cloud Composer manage to log to Stackdriver? Any clue?

dstandish commented 1 year ago

If GCC has it working, perhaps they could contribute the fix to OSS. I'm not sure how they have it configured. @potiuk, do you have contacts there?

And yeah, sync would be worth trying. If that works, maybe there could be a way to solve it using the QueueListener pattern.
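
A rough stdlib sketch of that pattern (illustrative only): tasks log through a QueueHandler while a single QueueListener owns the real transport handler; a real cross-process setup would need a multiprocessing queue rather than the plain one shown here.

import logging
import queue
from logging.handlers import QueueHandler, QueueListener

log_queue = queue.Queue(-1)

# Stand-in for the real Stackdriver handler; only the listener thread touches
# it, so the API client lives in exactly one place.
target_handler = logging.StreamHandler()

listener = QueueListener(log_queue, target_handler)
listener.start()

logger = logging.getLogger("airflow.task")
logger.addHandler(QueueHandler(log_queue))
logger.setLevel(logging.INFO)
logger.info("routed through the queue listener")

listener.stop()  # drains and flushes any remaining records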

potiuk commented 1 year ago

If GCC has it working, perhaps they could contribute the fix to OSS. I'm not sure how they have it configured. @potiuk, do you have contacts there?

And yeah, sync would be worth trying. If that works, maybe there could be a way to solve it using the QueueListener pattern.

This I do not know but @kosteev @mhenc @bkossakowska might be able to answer that?

I don't even know whether this is just some kind of misconfiguration.

kosteev commented 1 year ago

Last time I checked stackdriver_task_handler (~1 year ago), it was broken in community Airflow. In Cloud Composer there is a separate, internally written task handler which writes/reads logs to/from Cloud Logging.

dstandish commented 1 year ago

@kosteev any chance the google team would contribute the handler? (might you be able to nudge the appropriate persons?)

potiuk commented 1 year ago

@kosteev any chance the google team would contribute the handler? (might you be able to nudge the appropriate persons?)

FWIW, I think it is based on the fact that Airflow actually runs on GKE and the log integration is not done through our handler mechanism, so it's not something anyone can use generically.

dstandish commented 1 year ago

Ah, that makes sense.

dstandish commented 1 year ago

So that would be the answer for @matteosdocsity

dstandish commented 1 year ago

@matteosdocsity @brokenjacobs if you want to try to come up with a solution I'm happy to advise

matteosdocsity commented 1 year ago

so are we aware of the fact that no custom task handler is built into Cloud Composer? I agree with @potiuk that GKE exposes logs by default... but the integration within the Airflow UI relies on something custom-made

kosteev commented 1 year ago

There is a feature in Cloud Composer, "Airflow Task Logs in Cloud Logging", which is done via a custom task handler that reads/writes from/to Cloud Logging. This feature is public and enabled per customer need. It could be a good idea to contribute it to the community.

kosteev commented 1 year ago

Btw, here is the source code: https://github.com/GoogleCloudPlatform/composer-airflow/blob/2.5.1/airflow/composer/composer_task_handler.py

kosteev commented 1 year ago

Actually, this task handler only reads logs from Cloud Logging. Writing logs to Cloud Logging happens via GKE logs + Fluentd: https://cloud.google.com/composer/docs/composer-2/environment-architecture.
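
For illustration, a hypothetical read-only path loosely modeled on that description (the filter and its label names are assumptions, not Composer's actual implementation): nothing is written by the handler itself, and entries are read back from Cloud Logging.

import google.cloud.logging

client = google.cloud.logging.Client()
# Hypothetical filter: fetch the pod's container logs for one task instance.
log_filter = (
    'resource.type="k8s_container"'
    ' AND labels.dag_id="dag_for_test"'
    ' AND labels.task_id="hello-task"'
)
for entry in client.list_entries(filter_=log_filter, order_by=google.cloud.logging.ASCENDING):
    print(entry.payload)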

github-actions[bot] commented 1 month ago

This issue has been automatically marked as stale because it has been open for 365 days without any activity. There have been several Airflow releases since the last activity on this issue. We kindly ask you to recheck the report against the latest Airflow version and let us know if the issue is still reproducible. The issue will be closed in the next 30 days if no further activity occurs from the issue author.

matteosdocsity commented 1 month ago

even with the latest versions of apache-airflow and apache-airflow-providers-google, if there's no way to both write to and read from Google Stackdriver, maybe it's better to remove Stackdriver as a logging option