apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

task is incorrectly triggered after dynamic task mapping with BranchPythonOperator #40620

Open raycarter opened 4 days ago

raycarter commented 4 days ago

Apache Airflow version

2.9.2

If "Other Airflow 2 version" selected, which one?

No response

What happened?

We need to use BranchPythonOperator together with dynamic task mapping in one DAG. The BranchPythonOperator controls whether the dynamic mapping is needed. A task (branch_2_task) follows the dynamic task mapping and must run whether or not the mapped task runs (the mapped task is skipped when it receives an empty list). For that reason, the trigger_rule for branch_2_task has to be set to none_failed, as shown in diagram 1.

diagram 1: image

If it is set to all_success, the task branch_2_task will also be skipped if the dynamic mapping is skipped due to an empty list (diagram 2).

diagram 2: image

However, when BranchPythonOperator triggers the branch_1 task, the task branch_2_task on the other branch also gets executed (diagram 3).

diagram 3: image

The bug may be that Airflow checks only the trigger_rule and does not check whether the task is on the branch that was actually selected.

Could you fix this?

What you think should happen instead?

The task branch_2_task has trigger_rule none_failed so that it runs even when the dynamic task mapping before it is skipped. When the other branch (branch_1) is selected, however, branch_2_task should be skipped along with the rest of its branch.

How to reproduce

Operating System

Ubuntu

Versions of Apache Airflow Providers

No response

Deployment

Other

Deployment details

No response

Anything else?

No response

romsharon98 commented 4 days ago

In the third picture, the trigger rule of branch_2_task is set to none_failed. Since none of the upstream tasks failed (they were either skipped or successful), it should run.
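To make the difference between the two trigger rules concrete, here is a rough sketch in plain Python. It is a simplified model and not Airflow's actual implementation: the `State` enum and the `should_run` helper are made up for illustration, and the model looks only at the states of the direct upstream tasks, which is the behavior being discussed here.

```python
from enum import Enum


class State(Enum):
    """Hypothetical stand-in for the task states relevant to this thread."""
    SUCCESS = "success"
    SKIPPED = "skipped"
    FAILED = "failed"
    UPSTREAM_FAILED = "upstream_failed"


def should_run(trigger_rule, upstream_states):
    """Simplified model: decide from the direct upstream states only."""
    if trigger_rule == "all_success":
        # Every upstream task must have succeeded; a skip is not a success.
        return all(s is State.SUCCESS for s in upstream_states)
    if trigger_rule == "none_failed":
        # Skipped upstream tasks are fine, as long as nothing failed.
        failed = (State.FAILED, State.UPSTREAM_FAILED)
        return not any(s in failed for s in upstream_states)
    raise ValueError(f"trigger rule not covered by this sketch: {trigger_rule}")


# The mapped task upstream of branch_2_task was skipped. In this model it
# makes no difference whether that was due to an empty list or because the
# other branch was chosen.
print(should_run("none_failed", [State.SKIPPED]))  # True
print(should_run("all_success", [State.SKIPPED]))  # False
```

In this model the rule has no way to tell why the upstream task was skipped, which matches what the third picture shows.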

raycarter commented 4 days ago

> In the third picture, the trigger rule of branch_2_task is set to none_failed. Since none of the upstream tasks failed (they were either skipped or successful), it should run.

The actual problem is that, in the third picture, branch_2_task should not run, because the branch operator chose branch_1 (the other branch).

romsharon98 commented 4 days ago

I see. I'm still not sure it's a bug, for the reason I mentioned, so let's wait for a third opinion. As a quick workaround, branch_2_task can retrieve the task_id chosen by the branch operator and use it to decide whether to run its logic.
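A minimal sketch of that workaround, under a few assumptions: the branch task's return value is available through XCom (the default for a task's return value), and the task ids `branch_operator` and `branch_2` are placeholders, since the real ids are not given in this issue. The callable takes only the task instance (`ti`) and calls `ti.xcom_pull`, so it can be tried outside Airflow with a stand-in object.

```python
# Hypothetical task ids; replace them with the ones used in the real DAG.
BRANCH_TASK_ID = "branch_operator"
BRANCH_2_ENTRY_TASK_ID = "branch_2"


def branch_was_chosen(chosen, expected_task_id):
    """Return True if the branch callable selected expected_task_id.

    A branch callable may return a single task_id or a list of them,
    so both shapes are handled. None means nothing was selected.
    """
    if chosen is None:
        return False
    if isinstance(chosen, str):
        return chosen == expected_task_id
    return expected_task_id in chosen


def branch_2_task_callable(ti, **_):
    """Callable for branch_2_task: do nothing unless branch 2 was selected."""
    chosen = ti.xcom_pull(task_ids=BRANCH_TASK_ID)
    if not branch_was_chosen(chosen, BRANCH_2_ENTRY_TASK_ID):
        return "not on the selected branch, logic skipped"
    # The real work of branch_2_task would go here.
    return "branch_2_task logic executed"
```

With an early return like this, the task instance still finishes as success and just does no work. If it should show up as skipped instead, raising `AirflowSkipException` (from `airflow.exceptions`) at that point is one option.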

raycarter commented 1 day ago

Sure! Thank you for the quick first response!