apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Provide consumer DAGs with context about the dag run(s) of its producer(s) #33088

Open RNHTTR opened 1 year ago

RNHTTR commented 1 year ago

Description

As far as I can tell, there's currently no way for a consumer DAG to access information of the producer DAG(s) that triggered it. It'd be helpful if there was a producers context variable available to task instances in a consumer that had information such as the producer(s)' dag run and task instance (i.e. the task instance in the producer DAG that wrote to the outlet) objects.
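A minimal sketch of what such a `producers` context variable might look like from a consumer task's point of view. Everything here is hypothetical (the `ProducerInfo` shape, the `context["producers"]` key); it simulates the proposed behavior with plain Python rather than real Airflow API:

```python
# Hypothetical sketch of the proposed "producers" context variable.
# ProducerInfo and context["producers"] are illustrative names, not Airflow API.
from dataclasses import dataclass


@dataclass
class ProducerInfo:
    """Minimal stand-in for what one producer entry might carry."""

    dag_id: str       # the producer DAG
    run_id: str       # the producer's dag run that wrote to the outlet
    task_id: str      # the task instance that wrote to the outlet
    dataset_uri: str  # the dataset that triggered this consumer


def consumer_task(context: dict) -> list[str]:
    # A consumer task could then inspect which runs triggered it.
    return [f"{p.dag_id}/{p.run_id}/{p.task_id}" for p in context["producers"]]


# Simulated context, as the scheduler might populate it:
context = {
    "producers": [
        ProducerInfo("etl_dag", "scheduled__2023-08-03", "write_orders", "s3://orders"),
    ]
}
print(consumer_task(context))
```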

Use case/motivation

See the associated discussion.

Related issues

No response

Are you willing to submit a PR?

Code of Conduct

scr-oath commented 1 year ago

Yes, I agree. And if there are multiple triggers, such as schedule=[Dataset("one"), Dataset("two")], it would be generally useful to get information about each triggering event. I'm not sure exactly what "coordinates" exist to identify a DAG or task instance, but if a task has a Dataset as an outlet, then the "downstream" / "consumer" DAGs and their tasks should be able to learn about the task instance that triggered them, including its XCom information.

scr-oath commented 1 year ago

It seems like (I could be wrong) the task_id alone could be enough to get at the exact XCom information.

So, if a TaskInstance knew how it was triggered, and, when the schedule is a list of datasets, carried a corresponding list or map from each dataset to the task_id that had it as an "outlet", that would be a reasonable solution: the consumer could hook into the existing constructs and simply xcom_pull from there as needed.

For reference, the TaskInstance.xcom_pull docstring:

Pull XComs that optionally meet certain criteria.

        :param key: A key for the XCom. If provided, only XComs with matching
            keys will be returned. The default key is ``'return_value'``, also
            available as constant ``XCOM_RETURN_KEY``. This key is automatically
            given to XComs returned by tasks (as opposed to being pushed
            manually). To remove the filter, pass *None*.
        :param task_ids: Only XComs from tasks with matching ids will be
            pulled. Pass *None* to remove the filter.
        :param dag_id: If provided, only pulls XComs from this DAG. If *None*
            (default), the DAG of the calling task is used.
        :param map_indexes: If provided, only pull XComs with matching indexes.
            If *None* (default), this is inferred from the task(s) being pulled
            (see below for details).
        :param include_prior_dates: If *False* (default), only XComs from the
            current execution_date are returned. If *True*, XComs from previous
            dates are returned as well.
scr-oath commented 1 year ago

Just thinking out loud, but if TaskInstance could have a copy of the schedule from its DAG, and a field (say, outlet_task_id) were added to the Dataset class and populated when a task is triggered by a dataset, that might also be a good place to put it…

from typing import Any, ClassVar

import attr


@attr.define()
class Dataset:
    """A Dataset is used for marking data dependencies between workflows."""

    uri: str = attr.field(validator=[attr.validators.min_len(1), attr.validators.max_len(3000)])
    extra: dict[str, Any] | None = None
    # Proposed: the id of the producer task that had this dataset as an outlet
    outlet_task_id: str | None = None

    version: ClassVar[int] = 1