apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Airflow log cannot be displayed on logs page #40431

Closed captainj1993 closed 2 months ago

captainj1993 commented 3 months ago

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.9.1

What happened?

When trying to check logs on the Airflow logs page, the log keeps loading and is never displayed (screenshot).

Logs are configured to push to Elasticsearch. Clicking "View Logs in Elasticsearch", the logs can be seen in ES (screenshot).

Click "See More" button, the page will keep loading and never stop. Same log content will be append to the end of page again and again image

We are running Airflow on AWS EKS nodes (EKS version 1.28). The Docker image is built by ourselves.

Airflow config map:

[logging]
logging_level = DEBUG
base_log_folder = /opt/airflow/logs
log_format = [%%(asctime)s] [%%(levelname)s] %%(filename)s:%%(lineno)d - %%(message)s
simple_log_format = [%%(asctime)s] [%%(levelname)s] - %%(message)s
colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] [%%(log_color)s%%(levelname)s%%(reset)s] %%(blue)s%%(filename)s:%%(reset)s%%(lineno)d - %%(log_color)s%%(message)s%%(reset)s
log_formatter_class = log_config.CustomSecretsMasker
file_task_handler_new_folder_permissions = 0o777
remote_logging = True

[elasticsearch]
host = https://xxx:xxx@xxx.xxx.xxx:xxxx
write_stdout = True
json_format = True
offset_field = log.offset
frontend = https://xxx.xxx.xxx/kibana/app/kibana#/discover?_a=(columns:!(message),query:(language:kuery,query:'log_id:"{log_id}"'),sort:!(offset,asc))&_g=(refreshInterval:(pause:!t,value:0),time:(from:now-1y,mode:quick,to:now))
end_of_log_mark = end_of_log
log_id_template = {dag_id}-{task_id}-{run_id}-{map_index}-{try_number}

[elasticsearch_configs]
use_ssl = True
verify_certs = True
ca_certs = /ssl/certs/generated-cacert.pem
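For reference, the log_id_template above joins dag_id, task_id, run_id, map_index and try_number with dashes, so a single task attempt resolves to a log_id like the following (illustrative values only, not taken from the screenshots):

sentinel-time_output-scheduled__2024-06-25T00:00:00+00:00--1-1

This is the value the frontend link above queries for via log_id:"{log_id}".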

For some DAGs the log does not work, but for other DAGs the log is fine. Comparing the two, I see the good logs all have an additional line in ES (screenshot).

When I check the Airflow 2.9.1 source code, this is the line Airflow uses to tell where the end of a task's log is (screenshot). All the DAGs use the same Airflow config and run in the same Airflow pod. The DAGs all run successfully; only the logs have issues.
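For context, the "loading forever" behaviour matches how that end-of-log mark is used. Below is a minimal sketch of the idea only (not the actual ElasticsearchTaskHandler code), using the end_of_log_mark value from the config above:

import time

# Simplified sketch of the end-of-log contract; not the real provider code.
END_OF_LOG_MARK = "end_of_log"  # [elasticsearch] end_of_log_mark

def stream_log(fetch_next_chunk):
    """Yield log chunks until the end-of-log mark shows up."""
    while True:
        chunk = fetch_next_chunk()  # e.g. the next batch of log documents from ES
        if not chunk:
            time.sleep(1)  # nothing new yet; a real reader polls while the UI keeps its spinner
            continue
        yield chunk
        if END_OF_LOG_MARK in chunk:
            return  # mark found: the reader reports end of log and the UI stops loading

If a task's log stream never contains the mark (the extra line seen only for the "good" DAGs in ES), this loop never reaches its stop condition, which is consistent with the page loading forever and repeatedly appending content.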

I also tried a monkey patch to print more logging. The monkey patch code is below:

# Imports needed to run this patched copy of DagBag._load_modules_from_file
# outside of airflow/models/dagbag.py.
import importlib
import importlib.machinery
import importlib.util
import logging
import sys
import traceback

import airflow.models.dagbag
from airflow import settings
from airflow.utils.docs import get_docs_url
from airflow.utils.file import get_unique_dag_module_name, might_contain_dag
from airflow.utils.timeout import timeout

log = logging.getLogger(__name__)

def new_load_modules_from_file(self, filepath, safe_mode):
    from airflow.models.dag import DagContext
    log.info(f'new_load_modules_from_file filepath: {str(filepath)}')
    log.info(f'new_load_modules_from_file safe_mode: {safe_mode}')
    log.info('CCCCCCCCCCCCCCCCCCCCCCCCCCC 1')
    if not might_contain_dag(filepath, safe_mode):
        # Don't want to spam user with skip messages
        if not self.has_logged:
            self.has_logged = True
            self.log.info("File %s assumed to contain no DAGs. Skipping.", filepath)
        return []

    self.log.debug("Importing %s", filepath)
    mod_name = get_unique_dag_module_name(filepath)
    log.info(f'new_load_modules_from_file mod_name: {str(mod_name)}')
    if mod_name in sys.modules:
        log.info('CCCCCCCCCCCCCCCCCCCCCCCCCCC 2')
        del sys.modules[mod_name]
    log.info('CCCCCCCCCCCCCCCCCCCCCCCCCCC 3')
    DagContext.current_autoregister_module_name = mod_name
    log.info('CCCCCCCCCCCCCCCCCCCCCCCCCCC 4')
    def parse(mod_name, filepath):
        log.info('Enter new_load_modules_from_file parse')
        log.info(f'new_load_modules_from_file parse mod_name: {str(mod_name)}')
        log.info(f'new_load_modules_from_file parse filepath: {str(filepath)}')
        try:
            loader = importlib.machinery.SourceFileLoader(mod_name, filepath)
            spec = importlib.util.spec_from_loader(mod_name, loader)
            new_module = importlib.util.module_from_spec(spec)
            log.info(f'new_load_modules_from_file parse loader: {str(loader)}')
            log.info(f'new_load_modules_from_file parse spec: {str(spec)}')
            log.info(f'new_load_modules_from_file parse new_module: {str(new_module)}')
            sys.modules[spec.name] = new_module
            log.info('CCCCCCCCCCCCCCCCCCCCCCCCCCC 5')
            loader.exec_module(new_module)
            log.info('CCCCCCCCCCCCCCCCCCCCCCCCCCC 6')
            return [new_module]
        except Exception as e:
            log.info('CCCCCCCCCCCCCCCCCCCCCCCCCCC 7')
            DagContext.autoregistered_dags.clear()
            self.log.exception("Failed to import: %s", filepath)
            if self.dagbag_import_error_tracebacks:
                self.import_errors[filepath] = traceback.format_exc(
                    limit=-self.dagbag_import_error_traceback_depth
                )
            else:
                self.import_errors[filepath] = str(e)
            return []

    log.info('CCCCCCCCCCCCCCCCCCCCCCCCCCC 8')
    dagbag_import_timeout = settings.get_dagbag_import_timeout(filepath)
    log.info(f'new_load_modules_from_file dagbag_import_timeout: {str(dagbag_import_timeout)}')
    if not isinstance(dagbag_import_timeout, (int, float)):
        log.info('CCCCCCCCCCCCCCCCCCCCCCCCCCC 9')
        raise TypeError(
            f"Value ({dagbag_import_timeout}) from get_dagbag_import_timeout must be int or float"
        )
    log.info('CCCCCCCCCCCCCCCCCCCCCCCCCCC 10')
    if dagbag_import_timeout <= 0:  # no parsing timeout
        log.info('CCCCCCCCCCCCCCCCCCCCCCCCCCC 11')
        return parse(mod_name, filepath)

    timeout_msg = (
        f"DagBag import timeout for {filepath} after {dagbag_import_timeout}s.\n"
        "Please take a look at these docs to improve your DAG import time:\n"
        f"* {get_docs_url('best-practices.html#top-level-python-code')}\n"
        f"* {get_docs_url('best-practices.html#reducing-dag-complexity')}"
    )
    log.info('CCCCCCCCCCCCCCCCCCCCCCCCCCC 12')
    with timeout(dagbag_import_timeout, error_message=timeout_msg):
        log.info('CCCCCCCCCCCCCCCCCCCCCCCCCCC 13')
        return parse(mod_name, filepath)

airflow.models.dagbag.DagBag._load_modules_from_file = new_load_modules_from_file
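The snippet above does not show where the patch gets installed; for completeness, one plausible way to wire it up (my assumption, not something stated in this issue) is from a module Airflow imports at startup, e.g. $AIRFLOW_HOME/config/airflow_local_settings.py:

# airflow_local_settings.py (assumed location; any module imported before DAG
# parsing would do). "debug_patch" is a hypothetical module holding the
# new_load_modules_from_file function shown above.
import airflow.models.dagbag
from debug_patch import new_load_modules_from_file

airflow.models.dagbag.DagBag._load_modules_from_file = new_load_modules_from_file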

Doing further code debugging, I see the normal DAGs have this line in the airflow-worker log (screenshot), while the DAGs with log issues end here (screenshot), which means the DAGs with log issues somehow stop on this line (screenshot).

Could someone assist here? I really need to know what is causing this log issue.

What you think should happen instead?

Logs for all DAGs should display normally.

How to reproduce

Use Airflow 2.9.1 and try to run the two DAGs below.

Normal DAG:

import logging
from datetime import datetime, timedelta

from airflow.configuration import conf
from airflow.models import Variable
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

LOGGER = logging.getLogger("sentinel")
DAG_ID = 'sentinel'
airflow_dag_owner = 'xxx'
base_log_folder = conf.get("logging", "BASE_LOG_FOLDER").rstrip("/")

def time_output_method():
    """Print log to tell dag run time."""
    now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    LOGGER.info('Current time is: %s', now)

DEFAULT_ARGS = {
    'owner': airflow_dag_owner,
    'start_date': datetime(2023, 10, 13),
    "retries": 0,
    "priority_weight": 10000
}

with DAG(
        dag_id=DAG_ID,
        default_args=DEFAULT_ARGS,
        schedule_interval=timedelta(seconds=15),
        tags=['sentinel'],
        catchup=False
) as dag:
    time_output = PythonOperator(
        task_id='time_output',
        python_callable=time_output_method,
        dag=dag,
        do_xcom_push=True
    )
    log_cleanup = BashOperator(
        task_id='log_cleanup',
        bash_command="cleanup_sentinel.sh",
        params={
            "log_file_directory": str(base_log_folder)
        },
        dag=dag)

    time_output >> log_cleanup

Logging issue DAG:

import base64
import json
import logging
import random
import re
import ssl
import traceback
from datetime import datetime, timezone

import redis
import requests
from airflow.models import Variable
from airflow.models.dag import DAG
from airflow.operators.python import (
    BranchPythonOperator,
    PythonOperator,
    ShortCircuitOperator,
    get_current_context,
)
from botocore.exceptions import BotoCoreError, ClientError  # type: ignore
from common.constants import (  # type: ignore  # type: ignore
    XXX,  # type: ignore
    XXX,  # type: ignore
    SSL_CACERT,
    SSL_CERT_FILE,
    SSL_KEY_FILE,
    XXX,
    XXX,
)
from marshmallow import EXCLUDE
from requests import HTTPError
from requests.exceptions import ReadTimeout
from transition.model import (  # type: ignore  # type: ignore  # type: ignore  # type: ignore  # type: ignore
    AcquireLocksFailure,
    BulkUpdateException,
    IllegalArgumentException,
    LifecycleStatus,
    XXX,
    RetryException,
    TransitionException,
    TransitionsCompleteSchema,
    TransitionsRequestSchema,
    TransitionStatus,
    UnknownException,
    ValidationFailure,
    ValidationRequestSchema,
)

DAG_ID = "transition_execute"
LOCK_KEY = "ipp::ci"
LOGGER = logging.getLogger("transitions")

FAILURE_MESSAGE_TITLE = "xxx"
FAILURE_UPDATE_QUEUE_ADF_MESSAGE_TITLE = "xxx"
COMMON_ERROR_MESSAGE = "xxx"
COMMON_ERROR_REASON = "xxx"

def handle_global_failure(context):
    return

def acquire_locks(**kwargs):
    return

def validate(**kwargs):
    return

def has_failure(**kwargs):
    invalidate = []
    unlocked = [1,2,3]
    has_invalidate = invalidate is not None and len(invalidate) > 0
    has_unlocked = unlocked is not None and len(unlocked) > 0
    return has_invalidate or has_unlocked

def has_abort_or_error(**kwargs):
    failed_requests = []
    return failed_requests is not None and len(failed_requests) > 0

def abort_or_error(**kwargs):
    return ["bulk_update_aborted"]

def abort(task_instance):
    return

def error(task_instance):
    return

def complete_status_change(**kwargs):
    return

def release_locks(**kwargs):
    return

def to_failure_queue(**kwargs):
    return

def to_retry_queue(**kwargs):
    return 

DEFAULT_ARGS = {
    "owner": Variable.get("airflow_dag_owner"),
    "start_date": datetime(2021, 1, 2),
    "retries": 0,
    "on_failure_callback": handle_global_failure,
}

with DAG(
        dag_id=DAG_ID,
        default_args=DEFAULT_ARGS,
        schedule_interval=None,
        tags=["transitions"],
) as dag:
    task_acquire_locks = PythonOperator(
        task_id="acquire_locks",
        python_callable=acquire_locks,
        dag=dag,
        do_xcom_push=True,
        retries=0,
    )

    task_validate = PythonOperator(
        task_id="validate",
        python_callable=validate,
        dag=dag,
        do_xcom_push=True,
        retries=0,
        trigger_rule="one_success",
    )

    task_has_failure = ShortCircuitOperator(
        task_id="has_failure",
        python_callable=has_failure,
        trigger_rule="all_success",
    )

    task_has_abort_or_error = ShortCircuitOperator(
        task_id="has_abort_or_error",
        python_callable=has_abort_or_error,
    )

    task_abort_or_error = BranchPythonOperator(
        task_id="abort_or_error",
        python_callable=abort_or_error,
    )

    task_bulk_abort = PythonOperator(
        task_id="bulk_update_aborted",
        python_callable=abort,
        dag=dag,
        retries=0,
    )

    task_bulk_error = PythonOperator(
        task_id="bulk_update_error",
        python_callable=error,
        dag=dag,
        retries=0,
    )

    task_complete_status_change = PythonOperator(
        task_id="task_complete_status_change",
        python_callable=complete_status_change,
        dag=dag,
        do_xcom_push=True,
        trigger_rule="one_success",
    )

    task_release_locks = PythonOperator(
        task_id="release_locks",
        python_callable=release_locks,
        dag=dag,
        do_xcom_push=False,
        trigger_rule="all_done",
    )

    task_to_failure_queue = PythonOperator(
        task_id="to_failure_queue",
        python_callable=to_failure_queue,
        dag=dag,
        do_xcom_push=False,
    )

    task_to_retry_queue = PythonOperator(
        task_id="to_retry_queue",
        python_callable=to_retry_queue,
        dag=dag,
        do_xcom_push=False,
    )

    # pylint: disable=pointless-statement
    task_acquire_locks >> [task_validate, task_has_failure]
    task_validate >> [task_complete_status_change, task_has_failure]
    task_has_failure >> task_to_failure_queue
    task_complete_status_change >> [
        task_has_failure, task_release_locks, task_has_abort_or_error]
    task_has_abort_or_error >> task_abort_or_error >> [
        task_bulk_abort, task_bulk_error, task_to_retry_queue]

Try to run these two DAGs on Airflow 2.9.1.

Operating System

Debian GNU/Linux 12 (bookworm)

Versions of Apache Airflow Providers

No response

Deployment

Amazon (AWS) MWAA

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 3 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

potiuk commented 3 months ago

Can you upgrade to Airflow 2.9.2 and see if you have the same problem? Look at the changelog - there was a related issue solved there. Not sure if it is the same, but the easiest way to check is to actually upgrade (which you should do anyway).

captainj1993 commented 3 months ago

Can you upgrade to Airflow 2.9.2 and see if you have the same problem? Look at the changelog - there was a related issue solved there. Not sure if it is the same, but the easiest way to check is to actually upgrade (which you should do anyway).

Thank you for your reply. I tried Airflow 2.9.2, but it is still not working (screenshot).

captainj1993 commented 3 months ago

Update: I have figured out how to resolve this, but the root cause is still unknown. You may have noticed I have this code in the DAG:

from common.constants import (  # type: ignore  # type: ignore
    XXX,  # type: ignore
    XXX,  # type: ignore
    SSL_CACERT,
    SSL_CERT_FILE,
    SSL_KEY_FILE,
    XXX,
    XXX,
)

This is what is causing the log issue. If I remove the "from common.constants import" part, or just use this instead:

XXX = "123"
XXX = "456"
SSL_CACERT = "somecacert"
SSL_CERT_FILE = "somecertfile"
SSL_KEY_FILE = "somekeyfile"
XXX = "789"
XXX = "000"

the log is good. It looks like using a constants file causes Airflow trouble when initializing a DAG. I don't understand: a constants file is a common pattern in Python, so how can it cause so much trouble?

captainj1993 commented 3 months ago

Update: Moving "from common.constants import XXX" out of top-level code and into the task also solves this issue. Per Airflow best practices, importing expensive libraries in top-level code causes DAG initialization issues, but the constants file we use has only 43 lines and is 3 KB. Is that expensive?
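To illustrate that workaround (a sketch using the redacted names from this issue), the import moves from the top of the DAG file into the task callable, so it runs only when the task executes rather than every time the file is parsed:

def acquire_locks(**kwargs):
    # Deferred import: executed at task run time, not at DAG parse time, so the
    # "common" package can no longer interfere with parsing or log setup.
    from common.constants import SSL_CACERT, SSL_CERT_FILE, SSL_KEY_FILE
    return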

ephraimbuddy commented 2 months ago

Do you have imports in the common.constants?

captainj1993 commented 2 months ago

Do you have imports in the common.constants?

No, it's just a constants file; it looks like this:

XXX = "123"
XXX = "456"
SSL_CACERT = "somecacert"
SSL_CERT_FILE = "somecertfile"
SSL_KEY_FILE = "somekeyfile"
XXX = "789"
XXX = "000"

potiuk commented 2 months ago

Can you look at 'modules management' (search for it in our docs) and follow the best practices there?

The most likely problem (explained in the best practices) is that you are using 'common' as the top-level package name in your import. It is a property of Python that, when importing a module, it imports the first matching module it finds on the PYTHONPATH, and since that can come from various places, the likely reason is that there is a 'common' somewhere on the path that is different from the one you want to import.

Simply put: when importing something, don't start the import with a name that is likely to be used somewhere else.

Our recommendation is to put all of your packages under a uniquely named package (say your organisation name, e.g. 'myorg') and always import your code as 'from myorg.something' - this effectively namespaces your imports and avoids any conflicts with similarly named modules.
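A minimal sketch of that recommendation, assuming the shared modules currently sit next to the DAG files as common/ and transition/ ('myorg' is just an example name):

dags/
    myorg/
        __init__.py
        constants.py        # formerly common/constants.py
        transition/
            __init__.py
            model.py

# in the DAG file
from myorg.constants import SSL_CACERT, SSL_CERT_FILE, SSL_KEY_FILE

A quick way to confirm a clash is to check which 'common' Python actually resolved:

import common
print(common.__file__)  # if this is not the common/ next to your DAGs, there is a name clash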

captainj1993 commented 2 months ago

Can you look at 'modules management' (search for it in our docs) and follow the best practices there?

The most likely problem (explained in the best practices) is that you are using 'common' as the top-level package name in your import. It is a property of Python that, when importing a module, it imports the first matching module it finds on the PYTHONPATH, and since that can come from various places, the likely reason is that there is a 'common' somewhere on the path that is different from the one you want to import.

Simply put: when importing something, don't start the import with a name that is likely to be used somewhere else.

Our recommendation is to put all of your packages under a uniquely named package (say your organisation name, e.g. 'myorg') and always import your code as 'from myorg.something' - this effectively namespaces your imports and avoids any conflicts with similarly named modules.

I will definitely do that, thank you!