apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

`TaskInstance.xcom_pull()` arg `include_prior_dates` only returns a single xcom #37977

Closed matterrr closed 6 months ago

matterrr commented 6 months ago

Apache Airflow version

2.8.2

If "Other Airflow 2 version" selected, which one?

No response

What happened?

The arg include_prior_dates in TaskInstance.xcom_pull() has the following definition:

    :param include_prior_dates: If False, only XComs from the current
        execution_date are returned. If *True*, XComs from previous dates
        are returned as well.

In practice, xcom_pull(key="<some_key>", include_prior_dates=True) returns only a single xcom (from the most recent prior DAG run) when only the key is specified. I think it would return multiple xcoms only if multiple task_ids are specified or the xcoms were pushed from a mapped task.

What you think should happen instead?

I think the plural in include_prior_dates implies that xcom_pull(key="<some_key>", include_prior_dates=True) should return an array of xcoms from all dag_runs. That would also be consistent with the wording of the get_many() function called in xcom_pull():

    :param include_prior_dates: If *False* (default), only XComs from the
        specified DAG run are returned. If *True*, all matching XComs are
        returned regardless of the run it belongs to.

But strangely, the get_one() function in BaseXCom has this wording:

    :param include_prior_dates: If *False* (default), only XCom from the
        specified DAG run is returned. If *True*, **the latest matching XCom is
        returned regardless of the run it belongs to**.

At the very least, include_prior_dates should be renamed to include_prior_date (singular) if only the xcom from the previous DAG run will ever be returned.

How to reproduce

Example Code

    from datetime import datetime

    from airflow.decorators import dag, task

    default_args = {
        "owner": "airflow",
        "depends_on_past": False,
        "start_date": datetime(2021, 1, 1),
        "retries": 0,
    }


    @dag(
        dag_id="test.xcom",
        default_args=default_args,
        schedule=None,
        # Set the docs here; `dag.doc_md = ...` would only set an attribute
        # on the imported decorator, not on this DAG.
        doc_md="This test DAG logs the xcom.",
    )
    def test_xcom():
        @task()
        def get_recent_xcom(**context):
            ti = context["ti"]
            # Pull by key only, asking for XComs from earlier runs as well.
            xcom_results = ti.xcom_pull(key="test_xcom", include_prior_dates=True)

            print(f"{xcom_results=}")

            # Push a fresh value so the next run has something to pull.
            ti.xcom_push(key="test_xcom", value=f"{datetime.now()}")

        get_recent_xcom()


    test_xcom()

Steps

  1. Run the DAG, noting that the xcom value is None on the first run
  2. Run the DAG several more times, noting that xcom_pull() only ever returns a single value, the one from the previous DAG run
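
The outcome of these steps can be sketched without Airflow. The snippet below is a toy simulation, not Airflow code: the in-memory `store`, `pull_latest()` and `simulate_runs()` are hypothetical names, and the "latest value at or before the current run" rule is an assumption based on the behaviour described in this report.

```python
from datetime import datetime, timedelta

# Hypothetical in-memory stand-in for the XCom table.
# Each row is (logical_date, key, value).
store = []


def pull_latest(key, current_date):
    """Return the most recent value for `key` at or before `current_date`."""
    candidates = [row for row in store if row[1] == key and row[0] <= current_date]
    if not candidates:
        return None
    # Only the single newest candidate is returned, never the whole list.
    return max(candidates, key=lambda row: row[0])[2]


def simulate_runs(n):
    """Mimic n DAG runs that each pull first and then push a new value."""
    store.clear()
    start = datetime(2021, 1, 1)
    pulled = []
    for i in range(n):
        run_date = start + timedelta(days=i)
        pulled.append(pull_latest("test_xcom", run_date))
        store.append((run_date, "test_xcom", f"value-from-run-{i}"))
    return pulled


pulled = simulate_runs(4)
print(pulled)
# [None, 'value-from-run-0', 'value-from-run-1', 'value-from-run-2']
```

Each simulated run sees only the value pushed by the run immediately before it, which matches what the steps above report.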

Operating System

Debian GNU/Linux 11 (bullseye)

Versions of Apache Airflow Providers

No response

Deployment

Docker-Compose

Deployment details

No response

Anything else?

No response


potiuk commented 6 months ago

No, it's all good and consistent.

The name include_prior_dates is good because the flag is used for filtering, which happens before the selection of one XCom (the latest, in this case) is applied. So it's not about the returned value but about which XCom values are considered as candidates. It's perfectly fine and consistent.
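
The distinction between filtering and selecting can be illustrated with a small model. This is only a sketch under stated assumptions, not Airflow's implementation: `XComRow`, `candidates()` and `pull_one()` are hypothetical names, and the "at or before the current run" filter is assumed from the docstrings quoted in the report.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Any, List


@dataclass(frozen=True)
class XComRow:
    """Hypothetical simplified XCom record."""
    run_date: datetime
    key: str
    value: Any


def candidates(rows: List[XComRow], key: str, current_date: datetime,
               include_prior_dates: bool) -> List[XComRow]:
    """Step 1 (filter): decide which rows are considered at all."""
    if include_prior_dates:
        matches = [r for r in rows if r.key == key and r.run_date <= current_date]
    else:
        matches = [r for r in rows if r.key == key and r.run_date == current_date]
    # Newest first, so that the selection step can take the head of the list.
    return sorted(matches, key=lambda r: r.run_date, reverse=True)


def pull_one(rows: List[XComRow], key: str, current_date: datetime,
             include_prior_dates: bool = False) -> Any:
    """Step 2 (select): return only the latest candidate, or None."""
    matches = candidates(rows, key, current_date, include_prior_dates)
    return matches[0].value if matches else None


rows = [
    XComRow(datetime(2021, 1, 1), "test_xcom", "a"),
    XComRow(datetime(2021, 1, 2), "test_xcom", "b"),
]
today = datetime(2021, 1, 3)  # the current run has not pushed anything yet

without_prior = pull_one(rows, "test_xcom", today, include_prior_dates=False)
with_prior = pull_one(rows, "test_xcom", today, include_prior_dates=True)
considered = [r.value for r in candidates(rows, "test_xcom", today, True)]
print(without_prior, with_prior, considered)
```

In this model the flag widens the set of candidates from "current run only" to "current and earlier runs", while the number of returned values stays at one.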