apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Incorrect notation for datasets changed during runtime. #39863

Closed: phi-friday closed this issue 1 month ago

phi-friday commented 4 months ago

Apache Airflow version

2.9.1

If "Other Airflow 2 version" selected, which one?

No response

What happened?

When I modified the dataset's extra at runtime, as shown below,

outlets: list[Dataset] = context["outlets"]  # pyright: ignore[reportTypedDictNotRequiredAccess]
outlets[0].extra = (outlets[0].extra or {}) | {"test_value": 123}

In XCom, I can see the changed values.

[Screenshot 2024-05-27 10:05:28 AM]

However, the detail view shows something different.

[Screenshot 2024-05-27 10:05:38 AM]

And when I check the triggered DAG, the extra field is passed through unchanged.

[Screenshot 2024-05-27 10:05:50 AM] [Screenshot 2024-05-27 10:06:01 AM]

What you think should happen instead?

It should be one or the other: either the changed value everywhere or the unchanged value everywhere.

How to reproduce

from __future__ import annotations

from datetime import UTC, datetime  # datetime.UTC requires Python 3.11+

from airflow.datasets import Dataset
from airflow.decorators import dag, task

dataset = Dataset(uri="dataset_example")

@dag(
    dag_id="append_test",
    start_date=datetime(2023, 11, 1, tzinfo=UTC),
    schedule=None,
    default_args={"do_xcom_push": False},
    catchup=False,
    max_active_runs=1,
    tags=["dataset", "example"],
)
def dataset_trigger_example() -> None:  # noqa: D103
    @task.python(outlets=[dataset])
    def setup_dataset() -> None:
        from airflow.operators.python import get_current_context

        context = get_current_context()

        outlets: list[Dataset] = context["outlets"]  # pyright: ignore[reportTypedDictNotRequiredAccess]
        outlets[0].extra = (outlets[0].extra or {}) | {"test_value": 123}

    _ = setup_dataset()

@dag(
    dag_id="append_test_consumer",
    start_date=datetime(2023, 11, 1, tzinfo=UTC),
    schedule=[dataset],
    default_args={"do_xcom_push": False},
    catchup=False,
    max_active_runs=1,
    tags=["dataset", "example"],
)
def dataset_consumer_example() -> None:  # noqa: D103
    @task.python()
    def echo_in_python() -> None:
        from pprint import pprint

        from airflow.operators.python import get_current_context

        context = get_current_context()
        events = context.get("triggering_dataset_events", {})
        events = dict(events)

        pprint(events)  # noqa: T203

    _ = echo_in_python()

dataset_trigger_example()
dataset_consumer_example()

Operating System

PRETTY_NAME="Debian GNU/Linux 12 (bookworm)"
NAME="Debian GNU/Linux"
VERSION_ID="12"
VERSION="12 (bookworm)"
VERSION_CODENAME=bookworm
ID=debian
HOME_URL="https://www.debian.org/"
SUPPORT_URL="https://www.debian.org/support"
BUG_REPORT_URL="https://bugs.debian.org/"

Versions of Apache Airflow Providers

apache-airflow-providers-celery==3.7.0
apache-airflow-providers-common-io==1.3.1
apache-airflow-providers-common-sql==1.13.0
apache-airflow-providers-docker==3.11.0
apache-airflow-providers-fab==1.1.0
apache-airflow-providers-ftp==3.9.0
apache-airflow-providers-http==4.11.0
apache-airflow-providers-imap==3.6.0
apache-airflow-providers-odbc==4.6.0
apache-airflow-providers-postgres==5.11.0
apache-airflow-providers-redis==3.7.0
apache-airflow-providers-smtp==1.7.0
apache-airflow-providers-sqlite==3.8.0

Deployment

Docker-Compose

Deployment details

No response

Anything else?

No response


uranusjr commented 1 month ago

Dataset declarations are supposed to be stable, and the side effect of changing it at runtime is undefined. If you want to attach information dynamically, do it on the dataset event instead.
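To make the distinction concrete, here is a toy model in plain Python (not Airflow code) with a frozen, shared declaration and a per-run event that carries the runtime information. DatasetDecl, DatasetEvent and emit are hypothetical names chosen for illustration. In Airflow itself, newer releases (2.10 and later, as far as I know) expose an outlet_events accessor for setting extra on the emitted event; check the Datasets documentation for your version before relying on it.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass(frozen=True)
class DatasetDecl:
    """Hypothetical stand-in for a dataset declaration: stable and shared by every run."""

    uri: str
    extra: dict[str, Any] = field(default_factory=dict)


@dataclass
class DatasetEvent:
    """Hypothetical stand-in for one emitted event: created per run, carries dynamic info."""

    dataset: DatasetDecl
    extra: dict[str, Any] = field(default_factory=dict)


def emit(dataset: DatasetDecl, **runtime_info: Any) -> DatasetEvent:
    # Runtime information goes on a fresh event; the declaration itself is never touched.
    return DatasetEvent(dataset=dataset, extra=dict(runtime_info))


dataset = DatasetDecl(uri="dataset_example")
event = emit(dataset, test_value=123)
print(dataset.extra)  # {}
print(event.extra)    # {'test_value': 123}
```

In this model nothing can drift between views of the declaration, because the only thing that changes from run to run is the event object.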

phi-friday commented 1 month ago


@uranusjr I know I should use the dataset event. I opened this issue only to point out that it seems odd for the two views to be inconsistent with each other. Wouldn't it be better to keep them in sync, or at least raise an error?
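The "at least raise an error" option can be sketched with a frozen dataclass. FrozenDataset, try_set_extra and try_update_extra_in_place are hypothetical helpers, not part of Airflow; the reproduction above shows that Airflow's Dataset currently accepts the assignment silently. Wrapping extra in a MappingProxyType also blocks in-place edits such as ds.extra["k"] = v.

```python
from __future__ import annotations

import dataclasses
from collections.abc import Mapping
from types import MappingProxyType
from typing import Any


@dataclasses.dataclass(frozen=True)
class FrozenDataset:
    """Hypothetical read-only dataset declaration (not Airflow's Dataset)."""

    uri: str
    extra: Mapping[str, Any] = dataclasses.field(default_factory=dict)

    def __post_init__(self) -> None:
        # frozen=True blocks normal assignment, so bypass it once here to
        # replace extra with a read-only view over a private copy.
        object.__setattr__(self, "extra", MappingProxyType(dict(self.extra)))


def try_set_extra(ds: FrozenDataset, new_extra: Mapping[str, Any]) -> bool:
    """Return True if reassigning extra succeeded, False if it was rejected."""
    try:
        ds.extra = new_extra  # type: ignore[misc]
    except dataclasses.FrozenInstanceError:
        return False
    return True


def try_update_extra_in_place(ds: FrozenDataset, key: str, value: Any) -> bool:
    """Return True if mutating extra in place succeeded, False if it was rejected."""
    try:
        ds.extra[key] = value  # type: ignore[index]
    except TypeError:  # mappingproxy does not support item assignment
        return False
    return True


ds = FrozenDataset(uri="dataset_example")
# Same pattern as the task code in the issue; here it is rejected instead of silently diverging.
print(try_set_extra(ds, (ds.extra or {}) | {"test_value": 123}))  # False
print(try_update_extra_in_place(ds, "test_value", 123))  # False
print(dict(ds.extra))  # {}
```

The trade-off is that any existing DAG code that mutates a declaration would start failing loudly, which is arguably the point.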