apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0
Dynamic task gets "upstream_failed" when used with trigger_rule='none_failed' and upstream is skipped. #37091

Closed. kmehkeri closed this issue 7 months ago

kmehkeri commented 9 months ago

Apache Airflow version

2.8.1

If "Other Airflow 2 version" selected, which one?

No response

What happened?

When a task that uses dynamic task mapping has trigger_rule='none_failed' and depends on a task that was skipped, it ends up in the upstream_failed state.

What you think should happen instead?

The dynamic task should be skipped as well (this is what happens with trigger_rule='all_success').
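To make the expectation concrete, here is a minimal sketch of how the two trigger rules treat upstream states. It is a simplified model written for this report, not Airflow's implementation: `evaluate_trigger_rule` and the plain string states are hypothetical names, and unfinished upstream tasks are ignored.

```python
# Simplified model of two Airflow trigger rules (NOT Airflow's actual code).
# State names are plain strings chosen to match Airflow's state values.
SUCCESS = "success"
SKIPPED = "skipped"
FAILED = "failed"
UPSTREAM_FAILED = "upstream_failed"


def evaluate_trigger_rule(rule, upstream_states):
    """Return 'run', 'skipped' or 'upstream_failed' for a downstream task.

    Hypothetical helper: assumes every upstream task has already finished.
    """
    states = list(upstream_states)
    # Both rules propagate a failure from any upstream task.
    if any(s in (FAILED, UPSTREAM_FAILED) for s in states):
        return UPSTREAM_FAILED
    if rule == "all_success":
        # A skipped upstream causes the downstream task to be skipped too.
        return SKIPPED if SKIPPED in states else "run"
    if rule == "none_failed":
        # Skipped upstream tasks are tolerated, so the task is cleared to run.
        return "run"
    raise ValueError(f"rule not covered by this sketch: {rule}")


print(evaluate_trigger_rule("all_success", [SKIPPED]))
print(evaluate_trigger_rule("none_failed", [SKIPPED]))
```

Under this model, 'none_failed' clears the task to run even though its only upstream was skipped. For a mapped task there is then nothing to expand over, which is presumably (an assumption, not confirmed here) why it does not end up skipped the way it does with 'all_success'.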

How to reproduce

import datetime
import random

from airflow.decorators import dag, task


@dag(
    start_date=datetime.datetime(2021, 1, 1),
    schedule=None,
)
def branch_with_mapping():
    @task
    def random_fun():
        # True roughly half the time, so either branch may be taken.
        return random.randrange(-10, 10) > 0

    @task.branch
    def branching(x):
        if x is True:
            return "true_branch"
        else:
            return "false_branch"

    @task
    def true_branch():
        print("True")
        return [1, 2, 3]

    @task
    def false_branch():
        print("False")

    @task(trigger_rule='none_failed')
    def echo(i):
        print(i)

    t = true_branch()
    branching(random_fun()) >> [t, false_branch()]
    echo.expand(i=t)

branch_with_mapping()

Operating System

Using official Airflow Docker image in standalone mode

Versions of Apache Airflow Providers

No response

Deployment

Other Docker-based deployment

Deployment details

No response

Anything else?

No response

stevenschaerer commented 9 months ago

I'm happy to take a look.

potiuk commented 9 months ago

Feel free!

stevenschaerer commented 9 months ago

What do you think about adding a new task instance dependency that sets the state to SKIPPED if any of the mapped dependencies is SKIPPED and none of them is FAILED?

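from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
from airflow.utils.state import State, TaskInstanceState


class MappedTaskUpstreamDep(BaseTIDep):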
    """
    Determines if a mapped task's upstream tasks that provide XComs used by this task for task mapping are in
    a state that allows a given task instance to run.
    """

    NAME = "Mapped dependencies have succeeded"
    IGNORABLE = True
    IS_TASK_DEP = True

    def _get_dep_statuses(self, ti, session, dep_context):
        from airflow.models.mappedoperator import MappedOperator
        if isinstance(ti.task, MappedOperator):
            mapped_dependencies = ti.task.iter_mapped_dependencies()
        elif (task_group := ti.task.get_closest_mapped_task_group()) is not None:
            mapped_dependencies = task_group.iter_mapped_dependencies()
        else:
            return

        mapped_dependency_tis = [
            ti.get_dagrun(session).get_task_instance(operator.task_id, session=session)
            for operator in mapped_dependencies
        ]
        if not mapped_dependency_tis:
            yield self._passing_status(reason="There are no mapped dependencies!")
            return
        # Use a distinct loop variable so the outer `ti` is not shadowed.
        finished_tis = [
            dep_ti
            for dep_ti in mapped_dependency_tis
            if dep_ti is not None and dep_ti.state in State.finished
        ]
        if len(mapped_dependency_tis) != len(finished_tis):
            return

        finished_states = {finished_ti.state for finished_ti in finished_tis}
        if finished_states == {TaskInstanceState.SUCCESS}:
            yield self._passing_status(reason="The task's mapped dependencies have all succeeded!")
            return

        if finished_states.issubset({TaskInstanceState.SKIPPED, TaskInstanceState.SUCCESS}):
            new_state = TaskInstanceState.SKIPPED
        else:
            new_state = TaskInstanceState.UPSTREAM_FAILED
        if ti.set_state(new_state, session):
            dep_context.have_changed_ti_states = True
        yield self._failing_status(reason="At least one of task's mapped dependencies has not succeeded!")

No need for a detailed review, I'd just like to know if this is a valid approach before writing more tests and creating a PR.

Alternatively, something like this could be added to expand_mapped_task in AbstractOperator, which is where the reported UPSTREAM_FAILED currently comes from.
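For discussion, the state decision in the proposed `_get_dep_statuses` can be isolated as a plain function and checked without a scheduler. This is only a sketch: `resolve_mapped_dependency_state` is a hypothetical name, states are plain strings, and `FINISHED` is assumed to mirror `State.finished`.

```python
# Standalone sketch of the decision logic in the proposed dependency.
# Plain strings stand in for TaskInstanceState values; no Airflow import.
SUCCESS = "success"
SKIPPED = "skipped"
FAILED = "failed"
UPSTREAM_FAILED = "upstream_failed"
REMOVED = "removed"

# Assumption: this mirrors the set of finished states used by the proposal.
FINISHED = frozenset({SUCCESS, FAILED, SKIPPED, UPSTREAM_FAILED, REMOVED})


def resolve_mapped_dependency_state(dependency_states):
    """Classify a mapped task from the states of its mapped dependencies.

    `dependency_states` holds one entry per mapped dependency; None stands
    for a task instance that does not exist yet. Returns one of:
    'pass' (dependency met), 'wait' (not all dependencies finished),
    'skipped' or 'upstream_failed' (new state for the mapped task).
    """
    states = list(dependency_states)
    if not states:
        return "pass"  # no mapped dependencies at all
    finished = [s for s in states if s is not None and s in FINISHED]
    if len(finished) != len(states):
        return "wait"  # at least one dependency is still pending or running
    unique = set(finished)
    if unique == {SUCCESS}:
        return "pass"
    if unique <= {SKIPPED, SUCCESS}:
        return SKIPPED  # something was skipped, nothing failed
    return UPSTREAM_FAILED  # anything else counts as a failure


print(resolve_mapped_dependency_state([SUCCESS, SKIPPED]))
print(resolve_mapped_dependency_state([SKIPPED, FAILED]))
```

In the reproduction DAG the only mapped dependency is true_branch, so a skipped true_branch would map to 'skipped' here rather than 'upstream_failed'.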