apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Downstreaming to a task_group creates a dependency to its returned task, not its first one #40196

Open le-chartreux opened 5 months ago

le-chartreux commented 5 months ago

Apache Airflow version

2.9.2

If "Other Airflow 2 version" selected, which one?

No response

What happened?

Hello Airflow team,

I found something I consider to be a bug, or at least an unexpected behavior.

When I try to set a task_group my_task_group as the downstream of a task start_task, the resulting dependency depends on whether the group function returns a value:

Examples:

Without return value

In this case, start is linked to task_1_of_group (expected behavior).

Screenshot

downstream to first task of group without return value

Code

from airflow.decorators import dag, task_group
from airflow.operators.empty import EmptyOperator
from pendulum import datetime

@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def downstream_to_first_task_of_group_without_return_value() -> None:
    """Downstream to the first task of a taskflow group that doesn't return a value."""
    start_task = EmptyOperator(task_id='start')
    end_task = EmptyOperator(task_id='end')
    start_task >> my_task_group() >> end_task

@task_group
def my_task_group() -> None:
    t1 = EmptyOperator(task_id='task_1_of_group')
    t2 = EmptyOperator(task_id='task_2_of_group')
    t1 >> t2
    # no return

downstream_to_first_task_of_group_without_return_value()

With return value

In this case, start is linked to task_2_of_group instead of task_1_of_group (because task_2_of_group is returned by the task group).

Screenshot

downstream to first task of group with return value not working

Code

from airflow.decorators import dag, task_group
from airflow.operators.empty import EmptyOperator
from pendulum import datetime

@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def downstream_to_first_task_of_group_with_return_value_not_working() -> None:
    """Try to downstream to the first task of a taskflow group that returns a value.

    Downstreaming to the first task of a group when the group returns a value does not
    work as expected when using the '>>' operator.
    Indeed, a group that returns a value will return the task that produces the value, 
    so trying to downstream a task to the group will link it to the returned task of 
    the group.
    This dag shows this issue.
    """
    start_task = EmptyOperator(task_id='start')
    end_task = EmptyOperator(task_id='end')
    my_task_group_result = my_task_group()
    start_task >> my_task_group_result >> end_task

@task_group
def my_task_group():
    t1 = EmptyOperator(task_id='task_1_of_group')
    t2 = EmptyOperator(task_id='task_2_of_group')
    t1 >> t2
    return t2

downstream_to_first_task_of_group_with_return_value_not_working()

Workaround

A workaround I found is to use an EmptyOperator as an entrypoint: set it as the downstream of the task that precedes the group, pass it to the group as a parameter, and set it as the upstream of the group's first task.

Screenshot

downstream to first task of group with return value

Code

from airflow.decorators import dag, task_group
from airflow.operators.empty import EmptyOperator
from pendulum import datetime

@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def downstream_to_first_task_of_group_with_return_value() -> None:
    """Downstream to the first task of a taskflow group that returns a value.

    Downstreaming to the first task of a group when the group returns a value does not
    work as expected when using the '>>' operator.
    Indeed, a group that returns a value will return the task that produces the value, 
    so trying to downstream a task to the group will link it to the returned task of 
    the group.
    To work around the problem, it is possible to use an EmptyOperator as entrypoint.
    """
    start_task = EmptyOperator(task_id='start')
    end_task = EmptyOperator(task_id='end')
    entrypoint_my_task_group = EmptyOperator(task_id="entrypoint_my_task_group")
    my_task_group_result = my_task_group(entrypoint_my_task_group)
    start_task >> entrypoint_my_task_group
    my_task_group_result >> end_task

@task_group
def my_task_group(entrypoint_my_task_group):
    t1 = EmptyOperator(task_id='task_1_of_group')
    t2 = EmptyOperator(task_id='task_2_of_group')
    entrypoint_my_task_group >> t1 >> t2
    return t2

downstream_to_first_task_of_group_with_return_value()

Thank you for your work, Airflow is awesome! Best regards,

Nathan Rousseau, A.K.A le-chartreux

What you think should happen instead?

The behavior of setting a downstream dependency on a task_group should not change regardless of whether the task_group returns something or not. As a user, I expect the dependency to always be set on the first task of the task_group.

How to reproduce

Copy/paste the code samples in the 'What happened?' part of this issue (especially the one with a return value, since that is the one with unexpected behavior).

Operating System

Red Hat Enterprise Linux 8.9 (Ootpa)

Versions of Apache Airflow Providers

apache-airflow-providers-common-sql==1.13.0
apache-airflow-providers-fab==1.1.0
apache-airflow-providers-ftp==3.9.0
apache-airflow-providers-http==4.11.0
apache-airflow-providers-imap==3.6.0
apache-airflow-providers-smtp==1.7.0
apache-airflow-providers-sqlite==3.8.0

Deployment

Virtualenv installation

Deployment details

I just used the standard pip install inside a venv.

Anything else?

No response

Are you willing to submit PR?

Code of Conduct

potiuk commented 4 months ago

I guess - this is a side-effect of some of the python metaprogramming that we use for dependencies -- as far as I understand, task group should not return anything (or at least it's not specified) - but I might be wrong about it. @uranusjr -> I guess you would be the best person to comment on it maybe?

le-chartreux commented 4 months ago

Hello @potiuk , thank you for your answer.

Indeed, the part of the official Airflow documentation regarding task groups doesn't specify whether it's possible to return a value from a task group.

However, the unofficial documentation from Astronomer (which, from what I understand, is close to Airflow development) says that it's possible:

If downstream tasks require the output of tasks that are in the task group decorator, then the task group function must return a result. -- Airflow task groups | Astronomer Documentation
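
As a minimal sketch of the pattern that quote describes (DAG and task names below are made up for illustration), a group can return the XComArg of its final task so that a downstream task can consume it, which is exactly what ties upstream dependencies to that returned task:

from airflow.decorators import dag, task, task_group
from pendulum import datetime

@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def astronomer_style_group_output() -> None:
    @task
    def extract() -> int:
        return 1

    @task
    def transform(value: int) -> int:
        return value * 2

    @task
    def load(value: int) -> None:
        print(value)

    @task_group
    def extract_and_transform() -> int:
        # Returning the last task's output (an XComArg) lets a downstream
        # task consume it directly...
        return transform(extract())

    # ...but, as this issue describes, anything wired upstream of the group
    # then points at 'transform', not at 'extract'.
    load(extract_and_transform())

astronomer_style_group_output()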

Best regards, Nathan

potiuk commented 4 months ago

Sounds like a good candidate to fix and document better - PRs are welcome, but it would be great to hear from those who implemented it what the intention is here :D.

fredthomsen commented 4 months ago

Glad I found this issue as I was experiencing the same with the following:

from typing import Any

from airflow.decorators import dag, task, task_group

@dag()
def test_dag() -> None:
    @task
    def init_data() -> dict[str, Any]:
        return {"some": "data"}

    @task
    def something_important(data: dict[str, Any]) -> None: ...

    @task_group
    def transform_data(data: dict[str, Any]) -> dict[str, Any]:

        @task
        def must_read_data_before_touch(data: dict[str, Any]) -> dict[str, Any]:
            _ = data["some"]
            return data

        @task
        def touch_data(data: dict[str, Any]) -> dict[str, Any]:
            data["new"] = "info"
            return data

        return touch_data(must_read_data_before_touch(data))

    data = init_data()
    no_data = something_important(data)
    new_data = transform_data(data)

    no_data >> new_data

test_dag()

Screenshot 2024-07-06 at 23 20 47

Now I had trouble finding this behavior explained in documentation as well, and I have definitely made heavy use of task_groups returning values, but looking now I can see that _TaskGroupFactory.__call__ does explain this behavior in its docstring, and some inline comments clarify things further in _TaskGroupFactory._create_task_group. The behavior makes sense given these comments, and yet it doesn't feel quite right given this issue was raised in the first place.
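
A quick way to look at that docstring and those comments, assuming the Airflow 2.9 module layout (the class is private, so the import path may change):

import inspect

# Assumes airflow.decorators.task_group still hosts this private class.
from airflow.decorators.task_group import _TaskGroupFactory

# The behaviour is described in the __call__ docstring...
print(_TaskGroupFactory.__call__.__doc__)
# ...and in inline comments inside _create_task_group, visible in the source:
print(inspect.getsource(_TaskGroupFactory._create_task_group))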

potiuk commented 4 months ago

Now I had trouble finding this behavior explained in documentation as well, and I have definitely made heavy use of task_groups returning values, but looking now I can see that _TaskGroupFactory.__call__ does explain this behavior in its docstring and some inline comments clarify things further in _TaskGroupFactory._create_task_group. The behavior makes sense given these comments and yet doesn't feel quite right given this issue was raised in the first place.

Ahhh. I see exactly what happened. I looked a bit at the history of it and did a little investigation, and here is what happened (@uranusjr -> would love to get you to confirm my understanding and see if you agree with my assessment of what should be done here):

As far as I can tell, returning the last task from the decorated function was originally meant to make this case work:

tg >> next_task

However that does not work well for this case:

prev_task >> tg 

Precisely, because of the case you describe - that the previous task would be upstream of the last task in a group.

(And @uranusjr correctly mentioned in https://github.com/apache/airflow/issues/19903#issuecomment-1005849107 that it could be done by returning [task_start, task_end] - but it was completely non-obvious.)

Then the task-group-decorated function's return value handling was improved by @uranusjr in https://github.com/apache/airflow/pull/20671, where the decorated function was allowed to return nothing, which is equivalent to returning the task_group itself - and that works for both sides of the dependencies (>> tg adds a downstream dependency to the first task in the group, while tg >> adds an upstream dependency from the last task in the group).
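
Concretely, that is the behaviour the first example in this issue already relies on; a minimal sketch (illustrative task ids) of how it wires both sides:

from airflow.decorators import dag, task_group
from airflow.operators.empty import EmptyOperator
from pendulum import datetime

@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def group_with_no_return() -> None:
    @task_group
    def tg() -> None:
        # No return value, so calling tg() hands back the TaskGroup itself.
        EmptyOperator(task_id="first") >> EmptyOperator(task_id="last")

    prev_task = EmptyOperator(task_id="prev")
    next_task = EmptyOperator(task_id="next")

    # prev -> first (group root) and last (group leaf) -> next
    prev_task >> tg() >> next_task

group_with_no_return()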

It was supposed to be documented in https://github.com/apache/airflow/pull/20671, which was created as a follow-up task, but it was closed as "completed" by https://github.com/apache/airflow/pull/26028 - and in fact that PR only adds some examples and does not really describe this behaviour in the docs (nor the differences depending on what is returned by the decorated function).

I honestly find it hard to justify the behaviour when a task is returned - it only makes sense for the tg >> next_task case, but if we just return None (or tg directly), it will behave exactly the same for that case (and it will not create confusion, because it will also work as intuitively expected in prev_task >> tg).

So my proposal is that we should issue a deprecation warning when a task is returned from a @task_group-decorated function, and possibly fail on it in Airflow 3 - because I think returning a single task from such a decorated function makes very little sense and creates confusion (as proven by this issue).
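
To make the proposal concrete, here is a rough, hypothetical sketch of the kind of check that could emit such a warning (not actual Airflow code; the helper name and placement are assumptions):

import warnings

from airflow.models.baseoperator import BaseOperator

def _warn_on_single_task_return(retval):
    # Hypothetical helper, not Airflow code: warn when a @task_group-decorated
    # function returns a single task. (Whether returning a plain XComArg should
    # also warn is part of the open discussion in this thread.)
    if isinstance(retval, BaseOperator):
        warnings.warn(
            "Returning a single task from a @task_group-decorated function is "
            "deprecated; return nothing, or a list of boundary tasks, instead.",
            DeprecationWarning,
            stacklevel=3,
        )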

Then it should of course be documented (but I would rather see it documented as deprecated behaviour, if others agree with me). @uranusjr (and also @eladkal, who already fixed an example dag in the past that was exhibiting that confusing behaviour - https://github.com/apache/airflow/pull/21240) - what do you think?

le-chartreux commented 4 months ago

Hi,

Thank you all for your comments and investigations!

So @potiuk , you're saying that task_group should not return a task? In this case, how are you supposed to get the result of a task_group? By using XComs and giving up the advantages of the TaskFlow API?

Best regards, Nathan

potiuk commented 4 months ago

So @potiuk , you're saying that task_group should not return a task?

No. What I am saying is that a task_group should not return anything (i.e. None), and IMHO returning a task should be deprecated and raise a warning (@uranusjr -> would love to hear what you think and whether I understood the whole problem properly).

We already have code implemented in #20671 by @uranusjr that will act as if the function returned the task_group itself - in case the @task_group-decorated function returns None. And this is "reasonable" behaviour IMHO, because it will properly model dependencies (both when something depends on tg and when tg depends on something else).

Returning a single task from a @task_group-decorated function IMHO is just confusing (as you yourself noticed), because if you return a single task, it is impossible to serve both sides of the dependencies. You can only handle a >> tg (if you return the first task of the group) or tg >> b (if you return the last task of the group), but not both. This is because these two dependencies require different tasks belonging to the same group to be used as the dependency target.

Of course, if you return [first_task, last_task] from the task_group this will also work, and that should still be allowed - but it is really unintuitive and unexpected, so it should be treated as an obscure, though valid, approach that allows some more sophisticated cases, including [task, task] if you really want to do exactly what returning a single task currently does and avoid the warning.
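
For illustration, a minimal sketch of the [first_task, last_task] shape (task ids made up; this assumes the decorated call passes the returned list through unchanged):

from airflow.decorators import dag, task_group
from airflow.operators.empty import EmptyOperator
from pendulum import datetime

@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def group_returning_boundaries() -> None:
    @task_group
    def tg():
        first = EmptyOperator(task_id="first")
        last = EmptyOperator(task_id="last")
        first >> last
        # Return both boundary tasks so callers can wire either side.
        return [first, last]

    boundaries = tg()
    prev_task = EmptyOperator(task_id="prev")
    next_task = EmptyOperator(task_id="next")

    prev_task >> boundaries   # prev becomes upstream of both first and last
    boundaries >> next_task   # both become upstream of next; the extra edges
                              # are redundant but harmless given first >> last

group_returning_boundaries()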

Does it make sense?

le-chartreux commented 4 months ago

Hi,

Yes, it makes sense, thank you @potiuk!

But what if I need to get the result of the final task of a task group? E.g., the following code branches to a task group (or a task), then gets the result of the task group (or the task) that ran. But choose_a_or_b connects to operation_on_value only, as seen in the picture below. Should I use return (value, operation_on_value(value)), even though you said it's “really unintuitive and unexpected”?

from airflow.decorators import dag, task, task_group
from airflow.models.baseoperator import BaseOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule
from pendulum import datetime

@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False,)
def branching_to_first_task_of_group_with_return_value() -> None:
    """Do a branching to a taskflow group and get its result."""
    choice_a_result = choice_a()
    choice_b_result = choice_b()
    choose_a_or_b() >> [choice_a_result, choice_b_result]

    get_result(choice_a_result, choice_b_result)

@task.branch
def choose_a_or_b() -> str:
    return "choice_a"
    # return "choice_b"

@task_group()
def choice_a() -> int:
    value = get_a_value()
    return operation_on_value(value)
    # Should I use `return (value, operation_on_value(value))` ?

@task
def get_a_value() -> int:
    return 1

@task
def operation_on_value(value: int) -> int:
    return value * 6

@task
def choice_b() -> int:
    return 2

@task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
def get_result(result_a: int | None, result_b: int | None) -> int:
    return result_a or result_b

branching_to_first_task_of_group_with_return_value()

Result

Best regards, Nathan

uranusjr commented 4 months ago

I don’t think this has anything to do with dynamic task mapping specifically? This is just the design decision Airflow made when task groups were introduced. Not sure if we can change it either; depending on the last task is a pretty intuitive choice.

le-chartreux commented 4 months ago

Hi @uranusjr, thank you for your comment!

Indeed, from what I understand it's not about dynamic task mapping (expand and the like) but about task groups.

In my opinion, it's intuitive when wanting to downstream from the task-group but not when wanting to downstream to the task-group (e.g., with branching like here). How am I supposed to downstream to a task-group on which I want to be able to get the result of the last task then?

Best regards, Nathan

uranusjr commented 3 months ago

This is because you return the task from the group function. When you do that, dependencies to the group are connected to the task you return. If you want to connect to the group itself instead, simply don’t return the task:

@task_group()
def choice_a() -> None:
    value = get_a_value()
    operation_on_value(value)

However, I do observe another bug. When you do this, we don’t connect the task to the group’s downstream:


cc @bbovenzi Does the server need to provide extra information to make this happen?

le-chartreux commented 3 months ago

Hi @uranusjr, thank you again for your comment!

In that case, how am I supposed to get the result of a task that is inside a task group from outside that task group?

E.g., in the branching example above, how can I allow get_result to obtain the result of operation_on_value and, as you said, have operation_on_value set as an upstream of get_result?

Best regards, Nathan

techolga commented 3 months ago

Hi, I was just going to open a similar issue when I found this one. In my opinion it is not very intuitive when the execution behavior changes depending on the existence of a return statement. As a user, I would expect the return statement to have no influence on the execution order of the upstream tasks.

Another case I came across is the following:

Here I cannot ensure that my file preparation is complete before the execution starts.

If I daisy-chained the file_list through the last task of the task_group and returned that, then I could not have upstream dependencies anymore.

Not returning the file_list from the task_group is not a good option, because in my case the task_group is a function from a common package that is used in multiple nested task_groups, and it's hard to find the right ID to access it through xcom_pull.
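
For reference, pulling such a value without returning it from the group means addressing the producing task by its full, group-prefixed task_id, which is exactly the part that gets awkward with nested groups (all ids below are hypothetical):

from airflow.decorators import task

@task
def consume_file_list(**context):
    # Hypothetical ids: every enclosing task group prefixes the task_id,
    # e.g. "outer_group.prepare_files.build_file_list" for a nested group.
    file_list = context["ti"].xcom_pull(
        task_ids="outer_group.prepare_files.build_file_list"
    )
    print(file_list)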

Thanks @potiuk for the idea of returning multiple tasks, that's more convenient than my previous workaround.

I am not sure about the implications, but for me something like a depend_on_upstream argument added to the task_group would make it possible to solve all these issues. It could implicitly return the first task as well as whatever task is explicitly returned, or be ignored if no task is explicitly returned.

TobyMellor commented 2 months ago

+1, depend_on_upstream would also solve my use case. Also facing the same issue: it's only possible to create a dependency on the returned task, rather than the entire task group (and all tasks inside it)

pedro-cf commented 1 month ago

Experienced this issue too... I had 3 branching tasks that merged into 1 "merging" task, and I needed to add additional logic to 1 of the branching tasks, so I decided to turn it into a @task_group assuming I could return a value like before, but struggled. Ended up using something like this:

from airflow.decorators import dag, task, task_group
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime
import random

@dag(
    dag_id='branched_dag_with_task_group',
    schedule_interval=None,
    start_date=datetime(2023, 1, 1),
    catchup=False,
    is_paused_upon_creation=False,
)
def branched_dag_with_task_group():

    @task
    def start():
        print("Starting the DAG")
        return ["path1", "path2", "path3"]

    @task.branch
    def branch(paths):
        return random.choice(paths)

    @task
    def path1():
        print("Executing Path 1")
        return "Result from Path 1"

    @task
    def path2():
        print("Executing Path 3")
        return "Result from Path 3"

    @task_group
    def path3():
        @task
        def path3_task1():
            print("Executing Path 2 - Task 1")
            return random.choice([1,2])

        @task.skip_if(condition=lambda context: context['ti'].xcom_pull(task_ids='path3.path3_task1') == 1)
        @task
        def path3_task2(t1):
            print(f"Executing Path 2 - Task 2, received: {t1}")
            return "Result from Path 2"

        @task(trigger_rule=TriggerRule.NONE_FAILED)
        def path3_task3(t1, t2):
            print(f"{t1=}")
            print(f"{t2=}")
            return 42

        t1 = path3_task1()
        t2 = path3_task2(t1)
        t3 = path3_task3(t1, t2)
        t1 >> t2 >> t3

        return t1, t2, t3

    @task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
    def merge(path1_result, path2_result, path3_result):
        print(f"{path1_result=}")
        print(f"{path2_result=}")
        print(f"{path3_result=}")

    start_result = start()
    branched_paths = branch(start_result)

    path1_result = path1()
    path2_result = path2()
    path3_result = path3()

    merged_result = merge(path1_result, path2_result, path3_result[-1])

    start_result >> branched_paths
    branched_paths >> path1_result >> merged_result
    branched_paths >> path2_result >> merged_result
    branched_paths >> path3_result >> merged_result

branched_dag_with_task_group()

image