apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Dynamic task mapping over task_group expands over length of return_value dictionary instead of targeted XCom value #35355

Closed: asherkhb-ktx closed this issue 1 year ago

asherkhb-ktx commented 1 year ago

Apache Airflow version

2.7.2

What happened

When dynamically mapping a task_group() over a non-default XCom key (i.e. not return_value), the group expands to n = len(return_value) instances instead of n = len(specified_key).

Dynamic instances which correspond to a valid index in the specified_key succeed, but...
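The reported counts can be illustrated with a small pure-Python model. This is not Airflow code: pushed_xcoms and mapped_lengths are hypothetical helpers. The model assumes that @task(multiple_outputs=True) stores one XCom per dictionary key plus the whole dictionary under return_value, and that the group length is taken from return_value, as this report describes.

```python
from typing import Any, Dict, Tuple


def pushed_xcoms(return_value: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical model of the XComs left by @task(multiple_outputs=True):
    one entry per dictionary key, plus the whole dict under 'return_value'."""
    xcoms = {str(key): value for key, value in return_value.items()}
    xcoms["return_value"] = return_value
    return xcoms


def mapped_lengths(return_value: Dict[str, Any], key: str) -> Tuple[int, int]:
    """Return (expected, reported) number of mapped task-group instances."""
    xcoms = pushed_xcoms(return_value)
    expected = len(xcoms[key])             # length of the targeted XCom value
    reported = len(xcoms["return_value"])  # number of keys in the returned dict
    return expected, reported


greetings = {"salutation": "Hello", "names": ["John", "Jane", "Jack"]}
print(mapped_lengths(greetings, "names"))  # (3, 2)
```

Under this model the two numbers only coincide when the targeted list happens to have as many elements as the returned dictionary has keys.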

What you think should happen instead

The task_group should expand over the length of the specified XCom value, not over the number of keys in return_value.

How to reproduce

Consider the following DAG: you would expect greet_by_name to run three times (once per name), but it only runs twice, once per key in the dictionary returned by get_greetings.

```python
from airflow import DAG
from airflow.decorators import task, task_group
import pendulum

with DAG(
    'Test_Dynamic',
    start_date=pendulum.datetime(2022, 1, 1, tz='UTC'),
    schedule_interval=None,
    catchup=False,
) as dag:

    # multiple_outputs=True pushes each dictionary key as its own XCom
    @task(multiple_outputs=True)
    def get_greetings():
        return {
            'salutation': 'Hello',
            'names': ['John', 'Jane', 'Jack'],
        }

    greetings = get_greetings()

    @task_group()
    def greet_by_name(name, salutation):

        @task()
        def format_greeting(name, salutation):
            return f'{salutation}, {name}'

        @task()
        def print_message(msg):
            print(msg)

        print_message(format_greeting(name, salutation))

    # Expected: one group instance per element of greetings['names'] (3 in total)
    greet_everyone = greet_by_name.partial(
        salutation=greetings['salutation']
    ).expand(
        name=greetings['names']
    )
```

If more keys are added to the dictionary returned by get_greetings, the task group expands accordingly (e.g. to 4 instances), but some of the instances fail:

```python
# Variant of get_greetings from the DAG above, with two extra keys
@task(multiple_outputs=True)
def get_greetings():
    return {
        'salutation': 'Hello',
        'names': ['John', 'Jane', 'Jack'],
        'extra_key_1': 'extra_value_1',
        'extra_key_2': 'extra_value_2',
    }
```
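Both variants can be walked through index by index with a hypothetical simulation (simulate_group is an illustrative helper, not an Airflow API). It assumes that the number of instances follows len(return_value), as reported, while each instance looks up its name by map index in the names list.

```python
from typing import Any, Dict, List


def simulate_group(return_value: Dict[str, Any], key: str = "names") -> List[str]:
    """Hypothetical simulation: expand over len(return_value) as reported,
    but resolve each instance's name by map index in the targeted list."""
    names = return_value[key]
    salutation = return_value["salutation"]
    outcomes = []
    for map_index in range(len(return_value)):  # reported length: number of keys
        if map_index < len(names):
            outcomes.append(f"{salutation}, {names[map_index]}")
        else:
            outcomes.append(f"map_index {map_index}: failed, no element in {key!r}")
    return outcomes


two_keys = {"salutation": "Hello", "names": ["John", "Jane", "Jack"]}
four_keys = {**two_keys, "extra_key_1": "extra_value_1", "extra_key_2": "extra_value_2"}

for variant in (two_keys, four_keys):
    print(simulate_group(variant))
```

With two keys, Jack is never greeted. With four keys, map index 3 has no matching name, which corresponds to the failing instances described above.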

Operating System

Debian GNU/Linux 11 (bullseye)

Versions of Apache Airflow Providers

No response

Deployment

Other Docker-based deployment

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 1 year ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise a PR to address this issue, please do so; no need to wait for approval.

jscheffl commented 1 year ago

I fear this is a misunderstanding. I have never seen mapping over additional XCom results work the way you did with names, and I would be surprised if it did.

What you should consider is using a mapping function to transform the XCom result into the data you are looking for, as described in https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dynamic-task-mapping.html#transforming-expanding-data

potiuk commented 1 year ago

Agree with Jens. I'm converting this into a discussion in case more is needed; I do not see it as an issue. CC: @uranusjr for more insight, maybe you have some comments here.