apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Airflow CLI "dags list" command returning unexpected output #37228

Closed Mrhacktivist closed 7 months ago

Mrhacktivist commented 7 months ago

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.5.1

What happened?

The Airflow CLI "dags list" command returns unexpected output containing "Sample operation result: <Task(DatabricksSqlOperator): select_data> Sample operation type: <class 'airflow.providers.databricks.operators.databricks_sql.DatabricksSqlOperator'>" in its response.

What you think should happen instead?

No response

How to reproduce

It is returned in one specific environment only. I tried other environments running the same version and there is no such output.

Operating System

amazon linux 2

Versions of Apache Airflow Providers

No response

Deployment

Amazon (AWS) MWAA

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 7 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

jscheffl commented 7 months ago

Can you please post some more details about the unexpected output? Can you please post the full CLI command you used? Also, if possible, the full output would be good, redacted as needed. Can you please tell us whether the reported operators are used in your DAGs (assuming yes)? Can you check whether the same is happening on the most recent version of Airflow?

Mrhacktivist commented 7 months ago

CLI USED -

import base64

import boto3
import requests

mwaa_env_name = 'MyAirflowEnvironment5'
mwaa_cli_command = 'dags list -o json'

# Create a short-lived CLI token for the MWAA environment.
client = boto3.client('mwaa')

mwaa_cli_token = client.create_cli_token(
    Name=mwaa_env_name
)

# POST the CLI command to the MWAA web server's CLI endpoint.
mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken']
mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname'])
raw_data = mwaa_cli_command

mwaa_response = requests.post(
    mwaa_webserver_hostname,
    headers={
        'Authorization': mwaa_auth_token,
        'Content-Type': 'text/plain'
    },
    data=raw_data
)

# The response carries the base64-encoded stdout and stderr of the CLI command.
mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8')
mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8')

print(mwaa_response.status_code)
print(mwaa_std_err_message)
print(mwaa_std_out_message)

Output -

Sample operation result: <Task(DatabricksSqlOperator): select_data>
Sample operation type: <class 'airflow.providers.databricks.operators.databricks_sql.DatabricksSqlOperator'>
[{"dag_id": "pl_CustomerPortfolios_Load", "filepath": "pl_CustomerPortfolios_Load.py", "owner": "airflow", "paused": "False"}, {"dag_id": "pl_api", "filepath": "pl_api.py", "owner": "airflow", "paused": "False"}, {"dag_id": "pl_combined_ingestion", "filepath": "pl_combined_ingestion.py", "owner": "airflow", "paused": "False"}, {"dag_id": "pl_copy_source_file_from_ingress", "filepath": "pl_copy_source_file_from_ingress.py", "owner": "airflow", "paused": "False"}, {"dag_id": "pl_database_cdc_full_ingestion", "filepath": "pl_database_cdc_full_ingestion.py", "owner": "airflow", "paused": "False"}, {"dag_id": "pl_database_cdc_ingestion", "filepath": "pl_database_cdc_ingestion.py", "owner": "airflow", "paused": "False"}, {"dag_id": "pl_database_full_ingestion", "filepath": "pl_database_full_ingestion.py", "owner": "airflow", "paused": "False"}, {"dag_id": "pl_database_ingestion", "filepath": "pl_database_ingestion.py", "owner": "airflow", "paused": "False"}, {"dag_id": "pl_database_ingestion_hotelkey", "filepath": "pl_database_ingestion_hotelkey.py", "owner": "airflow", "paused": "False"}, {"dag_id": "pl_database_ingestion_onprem", "filepath": "pl_database_ingestion_onprem.py", "owner": "airflow", "paused": "False"}, {"dag_id": "pl_email_notification", "filepath": "pl_email_notification.py", "owner": "airflow", "paused": "False"}, {"dag_id": "pl_file_ingestion", "filepath": "pl_file_ingestion.py", "owner": "airflow", "paused": "False"}, {"dag_id": "pl_initiate_ingestion", "filepath": "pl_initiate_ingestion.py", "owner": "airflow", "paused": "False"}, {"dag_id": "pl_int021_Load", "filepath": "pl_int021_Load.py", "owner": "airflow", "paused": "False"}, {"dag_id": "pl_invoke_databricks_workflow", "filepath": "pl_invoke_databricks_workflow.py", "owner": "airflow", "paused": "False"}, {"dag_id": "pl_oci_ingestion", "filepath": "pl_oci_ingestion.py", "owner": "airflow", "paused": "False"}, {"dag_id": "pl_onyx_to_egress", "filepath": "pl_onyx_to_egress.py", "owner": "airflow", "paused": "False"}, {"dag_id": "pl_prep_layer_only", "filepath": "pl_prep_layer_only.py", "owner": "airflow", "paused": "False"}, {"dag_id": "pl_prep_only", "filepath": "pl_prep_only.py", "owner": "airflow", "paused": "False"}, {"dag_id": "pl_pyspark_test", "filepath": "pl_pyspark_test.py", "owner": "airflow", "paused": "False"}, {"dag_id": "pl_raw_ingestion_log", "filepath": "pl_raw_ingestion_log.py", "owner": "airflow", "paused": "False"}, {"dag_id": "pl_snowflake", "filepath": "pl_snowflake.py", "owner": "airflow", "paused": "False"}, {"dag_id": "pl_source_table_count_debug", "filepath": "pl_source_table_count_debug.py", "owner": "airflow", "paused": "False"}, {"dag_id": "pl_tableau_trigger", "filepath": "pl_tableau_trigger.py", "owner": "airflow", "paused": "False"}, {"dag_id": "pl_test_airflow_s3_connections", "filepath": "pl_test_airflow_s3_connections.py", "owner": "airflow", "paused": "False"}, {"dag_id": "pl_test_db_connectivity", "filepath": "pl_test_db_connectivity.py", "owner": "airflow", "paused": "False"}, {"dag_id": "pl_test_exception", "filepath": "pl_test_exception.py", "owner": "airflow", "paused": "False"}, {"dag_id": "pl_test_ingress_connection", "filepath": "pl_test_ingress_connection.py", "owner": "airflow", "paused": "False"}, {"dag_id": "pl_test_new_ingress_connection", "filepath": "pl_test_new_ingress_connection.py", "owner": "airflow", "paused": "False"}, {"dag_id": "pl_threshold_trigger", "filepath": "pl_threshold_trigger.py", "owner": "airflow", "paused": "False"}, {"dag_id": "pl_zip_files", 
"filepath": "pl_zip_files.py", "owner": "airflow", "paused": "False"}]

Yes, the respective operator is used in a DAG.
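
For illustration, the stray lines mean a plain json.loads() on the decoded stdout fails; a minimal workaround sketch for debugging, assuming the JSON array begins at the first "[" in the output:

import json

try:
    dags = json.loads(mwaa_std_out_message)
except json.JSONDecodeError:
    # Workaround sketch: skip the stray print output and keep only the JSON
    # array, assuming it starts at the first "[" in the decoded stdout.
    dags = json.loads(mwaa_std_out_message[mwaa_std_out_message.index('['):])

print(len(dags), 'DAGs returned')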

potiuk commented 7 months ago

Well, those messages come from your DAG or configuration (local settings) while the DAGs are being parsed. Just don't print output to stdout in your DAGs when they are parsed.

Look for the places where your DAGs write those messages. The problem is that parsing a DAG executes your code, so anything your code writes to stdout or stderr during parsing ends up in the CLI output.

We currently suppress logs and warnings for all commands that return JSON output with the decorator below, but it does not suppress output that you print manually in your DAGs using print().

Generally, it is good practice to use the logging facilities for diagnostic output rather than printing to stdout directly.

# Excerpt from Airflow's CLI utilities; the imports and the _check_cli_args
# helper are defined in the same module.
def suppress_logs_and_warning(f: T) -> T:
    """Suppress logging and warning messages in cli functions."""

    @functools.wraps(f)
    def _wrapper(*args, **kwargs):
        _check_cli_args(args)
        if args[0].verbose:
            f(*args, **kwargs)
        else:
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                logging.disable(logging.CRITICAL)
                try:
                    f(*args, **kwargs)
                finally:
                    # logging output again depends on the effective
                    # levels of individual loggers
                    logging.disable(logging.NOTSET)

    return cast(T, _wrapper)
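
For illustration, a minimal sketch of a hypothetical DAG file (names invented for the example) showing the kind of top-level print() that leaks into the CLI output at parse time, and the logging-based alternative:

# example_databricks_dag.py - hypothetical DAG file, for illustration only
import logging
from datetime import datetime

from airflow import DAG
from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator

log = logging.getLogger(__name__)

with DAG(
    dag_id="pl_invoke_databricks_workflow_example",  # invented dag_id
    start_date=datetime(2024, 1, 1),
    schedule=None,
) as dag:
    select_data = DatabricksSqlOperator(
        task_id="select_data",
        databricks_conn_id="databricks_default",  # assumed connection id
        sql="SELECT 1",
    )

    # These run at *parse* time, so they pollute the stdout of every CLI
    # command that parses the DAG folder (e.g. "dags list"):
    print(f"Sample operation result: {select_data}")
    print(f"Sample operation type: {type(select_data)}")

    # Preferred: use the logging facilities; these messages are suppressed
    # by suppress_logs_and_warning for the JSON-producing CLI commands.
    log.debug("Sample operation result: %s", select_data)
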
potiuk commented 7 months ago

Closing as invalid.