apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Current XCom __getitem__ behavior is confusing in task mapping context #25061

Open michal-cech opened 2 years ago

michal-cech commented 2 years ago

Apache Airflow version

2.3.3 (latest released)

What happened

When a task returns a dictionary, a single value can be accessed at the DAG level with return_value['key']. This does not seem to be possible with the newly added expand functionality. The DAG fails to load with the following error (see the example below): ValueError: cannot map over XCom with custom key 'list' from <Task(_PythonDecoratedOperator): params>

What you think should happen instead

I think Airflow should behave consistently when using

task(return_value['something'])

and

task.expand(arg=return_value['something'])

How to reproduce

from airflow.decorators import dag, task
import pendulum

@dag(
    default_args={
        'owner': 'airflow',
        'start_date': pendulum.datetime(2022, 7, 14, tz="Europe/Prague"),
        'depends_on_past': False,
        'retries': 0,
    },
    schedule_interval="* * * * *",
    max_active_runs=1,
)
def test():
    @task(multiple_outputs=True)
    def params():
        return {"a": 1, "b": 1, "list": [1, 2, 3]}

    parameters = params()

    @task
    def print_list(list):
        return list

    @task
    def print_elements(element):
        return element

    # this works
    print_list(parameters['list'])
    # this also works
    print_elements.expand(element=[1, 2, 3])
    # this does not
    print_elements.expand(element=parameters['list'])

test_dag = test()

Operating System

Ubuntu 20.04

Versions of Apache Airflow Providers

No response

Deployment

Docker-Compose

Deployment details

No response

Anything else

No response


boring-cyborg[bot] commented 2 years ago

Thanks for opening your first issue here! Be sure to follow the issue template!

michal-cech commented 2 years ago

To add, I understand that I can use another task like this:

@task
def extract_input(p):
    return p['list']

p_list = extract_input(parameters)

print_elements.expand(element=p_list)

and it will work, but I wonder if the described behaviour is intended or if it is a bug / true inconsistency in behaviour.

josh-fell commented 2 years ago

FWIW, this was a conscious decision when implementing AIP-42 at the time. See https://github.com/apache/airflow/pull/21930 description.

cc @uranusjr if there is any additional color to add.

uranusjr commented 2 years ago

There are actually two levels in this issue. First, the square bracket syntax does not mean dictionary access here. It means accessing a different XCom pushed by the parent task.

The second level is that we do not support expanding a task against an arbitrary XCom push, only the return value, as mentioned above. This is the direct cause of the exception, but it is not related to what the user is actually trying to do here.
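To make the two levels concrete, here is a minimal toy model in plain Python. It is only a sketch and does not use Airflow: ToyXComRef and toy_expand are hypothetical names invented for illustration, and the real classes are more involved. The one detail taken from Airflow is that a task's return value is stored under the XCom key "return_value". The error text mirrors the one in the report.

```python
from dataclasses import dataclass

# Airflow stores a task's return value under this XCom key.
RETURN_KEY = "return_value"


@dataclass(frozen=True)
class ToyXComRef:
    """Hypothetical stand-in for an XCom reference; not Airflow's real class."""

    task_id: str
    key: str = RETURN_KEY

    def __getitem__(self, item: str) -> "ToyXComRef":
        # Level one: square brackets do not index into the returned dict.
        # They build a reference to a different XCom key of the same task.
        return ToyXComRef(self.task_id, key=item)


def toy_expand(**mapped: object) -> dict:
    """Hypothetical expand(): rejects references to anything but the return value."""
    for value in mapped.values():
        # Level two: only the return-value key may be mapped over.
        if isinstance(value, ToyXComRef) and value.key != RETURN_KEY:
            raise ValueError(
                f"cannot map over XCom with custom key {value.key!r} "
                f"from {value.task_id}"
            )
    return mapped


parameters = ToyXComRef("params")
list_ref = parameters["list"]  # a reference to key 'list', not [1, 2, 3]

toy_expand(element=[1, 2, 3])   # passes: a literal list
toy_expand(element=parameters)  # passes the key check: refers to the return value
try:
    toy_expand(element=list_ref)  # fails: refers to a custom key
except ValueError as err:
    print(err)
```

In this model, parameters['list'] never touches the dictionary returned by params. It only changes which XCom key the reference points to, which is why the check in toy_expand rejects it.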

The current square bracket syntax is terrible (sorry if you were the one who invented it), but unfortunately we can’t do anything about it at this time. I’ll re-label this as a documentation issue so we can add some warnings to the task-mapping doc page. Feel free to work on it.