apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Automatically detecting top-level DAG code calls to `Variable.get()` #42205

Open matthewblock opened 2 months ago

matthewblock commented 2 months ago

Description

I'm not sure where in the Airflow project this would fit best (maybe docs only, or as an internal lint test), but it has been useful for my team. Hopefully this sparks a discussion.

When discussing Airflow best practices, the first one mentioned is often to avoid code that accesses external services or the Airflow metadata database at the top level of your DAG files:

https://www.astronomer.io/docs/learn/dag-best-practices#avoid-top-level-code-in-your-dag-file
https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#top-level-python-code

The most frequently cited example is calls to Variable.get(). Because of cascading imports, these calls can be hard to spot by reading the code. You also need buy-in and education from every developer across an org to avoid something that Python itself allows.

You may also be inheriting a Python project where this best practice was not heeded, leaving your code littered with expensive top-level calls.
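To illustrate the cascading-import problem, here is a minimal sketch that uses only the standard library. Every name in it (`fake_store`, `lookup`, `settings_helper`, `my_dag`) is a hypothetical stand-in and not part of Airflow. The "DAG file" contains no visible lookup call, yet importing it still triggers one through a helper module.

```python
import importlib
import sys
import tempfile
import textwrap
from pathlib import Path

# Hypothetical stand-in for an expensive lookup such as Variable.get();
# it only records which keys were requested.
FAKE_STORE = textwrap.dedent(
    """
    CALLS = []

    def lookup(key):
        CALLS.append(key)
        return f"value-of-{key}"
    """
)

# A shared helper module that performs the lookup at module level.
HELPER = textwrap.dedent(
    """
    from fake_store import lookup

    LIMIT = lookup("limit")  # runs as soon as this module is imported
    """
)

# The "DAG file" itself contains no visible lookup call.
DAG_FILE = textwrap.dedent(
    """
    from settings_helper import LIMIT

    def build_message():
        return f"limit is {LIMIT}"
    """
)

MODULES = {
    "fake_store": FAKE_STORE,
    "settings_helper": HELPER,
    "my_dag": DAG_FILE,
}


def import_dag_and_list_lookups():
    """Import the fake DAG file and return the keys looked up during import."""
    with tempfile.TemporaryDirectory() as tmp:
        for name, source in MODULES.items():
            Path(tmp, f"{name}.py").write_text(source)

        sys.path.insert(0, tmp)
        importlib.invalidate_caches()
        try:
            importlib.import_module("my_dag")
            return list(sys.modules["fake_store"].CALLS)
        finally:
            # Clean up so repeated runs start from a fresh import state.
            sys.path.remove(tmp)
            for name in MODULES:
                sys.modules.pop(name, None)


if __name__ == "__main__":
    print(import_dag_and_list_lookups())
```

Reading `my_dag.py` on its own gives no hint that a lookup happens; it only shows up once the import chain actually runs.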

What if a simple unit test detected top-level calls to Variable.get()?

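from airflow.dag_processing.processor import DagFileProcessor
from airflow.utils.file import list_py_file_paths

# BASE_STATIC_PATH is assumed to be defined elsewhere as the directory that
# contains the DAG files. `mocker` is the pytest-mock fixture.


def test_top_level_variable_call(mocker):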
    # Patch to prevent adding DAG to database
    mocker.patch("airflow.dag_processing.processor.DagBag._sync_to_db")

    # Patch to prevent creating logs
    mock_logger = mocker.patch("airflow.dag_processing.processor.logging.Logger")

    # Patch to prevent Airflow from creating DagWarnings, which would cause FK
    # violations because we aren't adding the DAGs to the database
    mocker.patch(
        "airflow.dag_processing.processor.DagFileProcessor.update_dag_warnings"
    )

    files = list_py_file_paths(BASE_STATIC_PATH)

    all_mock_calls = []

    for file in files:
        mock_get = mocker.patch("airflow.models.variable.Variable.get")

        DagFileProcessor(
            dag_ids=None, log=mock_logger, dag_directory=BASE_STATIC_PATH
        ).process_file(file_path=file, callback_requests=[])

        if mock_get.mock_calls:
            all_mock_calls.extend(mock_get.mock_calls)
            print(file, mock_get.mock_calls)

    assert len(all_mock_calls) == 0

This has helped my team automatically detect (most) top-level calls to Variable.get(), and removing them has improved our scheduler's performance.
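A stripped-down sketch of why this works, assuming nothing beyond the standard library: a mock records only the calls that run while the file is executed, so a top-level call is caught while a call inside a task function is not. The `Variable` class here is a hypothetical stand-in, not Airflow's, and `exec` stands in for the DAG file processor.

```python
from unittest import mock


class Variable:
    """Hypothetical stand-in for airflow.models.Variable (not the real class)."""

    @classmethod
    def get(cls, key):
        raise RuntimeError("would hit the metadata database")


# A file that calls Variable.get() at module level.
TOP_LEVEL_SOURCE = '''
api_key = Variable.get("api_key")  # executed while the file is parsed
'''

# A file that defers the call until a task function runs.
DEFERRED_SOURCE = '''
def run_task():
    return Variable.get("api_key")  # executed only when the task runs
'''


def calls_during_parse(source):
    """Execute `source` like a module import and return the recorded calls."""
    with mock.patch.object(Variable, "get") as mock_get:
        exec(compile(source, "<dag file>", "exec"), {"Variable": Variable})
        return list(mock_get.mock_calls)


if __name__ == "__main__":
    print(calls_during_parse(TOP_LEVEL_SOURCE))
    print(calls_during_parse(DEFERRED_SOURCE))
```

A check like this only sees code paths that run during parsing. A call behind a branch that is not taken at parse time would go unrecorded, which may be one reason the test above catches most calls rather than all of them.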

Use case/motivation

Automate detection of most top-level DAG code calls to Variable.get() (or to other functions that access the Airflow metadata database, which Airflow recommends keeping out of top-level code).

Related issues

No response

Are you willing to submit a PR?

I'm not a contributor (yet), and I'm not sure where this would be most useful.

Code of Conduct

gopidesupavan commented 3 weeks ago

@matthewblock assigned.