apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Wrong task order in Airflow UI Grid #42617

Open BT2 opened 1 month ago

BT2 commented 1 month ago

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.7.2

What happened?

I have a problem with the order of tasks after adding a branch operator. Once it is added, the tasks in the ‘Grid’ view are shown in the wrong order, while in the ‘Graph’ view they look fine and run in the correct order. Tasks downstream of the branch operator are placed at the bottom of the Grid.

What you think should happen instead?

image

How to reproduce

from airflow.decorators import dag
from pendulum import today
from airflow.operators.empty import EmptyOperator
from airflow.decorators import task
from airflow.exceptions import AirflowSkipException
from airflow.utils.state import State
import random


@dag(
    dag_id='grid_bug',
    # For now it's hardcoded, maybe we will use this in a future ?
    start_date=today("UTC").add(days=-31),
    schedule=None,
    is_paused_upon_creation=True,
    # For now it's hardcoded, maybe we will use this in a future ?
    catchup=False,
    render_template_as_native_obj=False
)
def dynamic_generated_dag():
    first_task = EmptyOperator(task_id='first_task')
    second_task = EmptyOperator(task_id='second_task')
    end_task = EmptyOperator(task_id='end_task')

    first_task >> second_task
    for i in range(1,10):
        # Data ingestion task
        data_ingestion_task = EmptyOperator(task_id=f'data_ingestion_{i}')

        @task(task_id=f'skipping_task_{i}')
        def skipping_task_func():
            if random.randint(0, 1) == 1:
                raise AirflowSkipException('')

        # Skipping task (randomly)
        skipping_task = skipping_task_func()
        second_task >> data_ingestion_task >> skipping_task

        # Baseline task
        #if execute_baseline:
        baseline_task = EmptyOperator(task_id=f'baseline_{i}')
        current_flow = skipping_task >> baseline_task

        # Soft delete task
        soft_delete_task = EmptyOperator(task_id=f'soft_delete_{i}', trigger_rule='none_failed')
        current_flow = current_flow >> soft_delete_task

        # Update delta
        update_delta_task = EmptyOperator(task_id=f'update_delta_{i}')

        # Save delta
        save_delta_task = EmptyOperator(task_id=f'save_delta_{i}')

        # Dummy task to skip for branch task
        dummy_task = EmptyOperator(task_id=f'dummy_{i}')

        @task.branch(task_id=f'is_delta_to_execute_{i}')
        def choose_flow(upstream_task_id, follow_if_true, follow_if_false, dag_run=None):
            upstream_task_state = dag_run.get_task_instance(upstream_task_id).state
            if upstream_task_state == State.SUCCESS:
                return follow_if_true
            else:
                return follow_if_false

        choose_flow_branch_task = choose_flow(
            upstream_task_id=f'skipping_task_{i}'
            , follow_if_true=[update_delta_task.task_id, save_delta_task.task_id]
            , follow_if_false=dummy_task.task_id
        )

        # First branch logic to follow
        current_flow >> choose_flow_branch_task >> update_delta_task >> save_delta_task >> end_task
        # Second branch logic to follow
        current_flow >> choose_flow_branch_task >> dummy_task >> end_task


dynamic_generated_dag()
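
The dependency structure of the reproduction can be inspected without Airflow. The sketch below is only an illustration, not a diagnosis: it rebuilds the same task dependencies as a plain dictionary (for two chains instead of nine) and shows that both a per-chain listing and a listing with all post-branch tasks grouped at the bottom are valid topological orders. The helper names `build_deps` and `is_valid_order` are hypothetical, and nothing here claims to reproduce how the Grid view actually computes its row order.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+


def build_deps(n_chains=2):
    """Return {task_id: set of upstream task_ids} mirroring the reproduction DAG."""
    deps = {"first_task": set(), "second_task": {"first_task"}, "end_task": set()}
    for i in range(1, n_chains + 1):
        branch = f"is_delta_to_execute_{i}"
        chain = ["second_task", f"data_ingestion_{i}", f"skipping_task_{i}",
                 f"baseline_{i}", f"soft_delete_{i}", branch]
        # Linear part of the chain, up to and including the branch task.
        for upstream, downstream in zip(chain, chain[1:]):
            deps.setdefault(downstream, set()).add(upstream)
        # Two branches that both lead into end_task.
        deps[f"update_delta_{i}"] = {branch}
        deps[f"save_delta_{i}"] = {f"update_delta_{i}"}
        deps[f"dummy_{i}"] = {branch}
        deps["end_task"] |= {f"save_delta_{i}", f"dummy_{i}"}
    return deps


def is_valid_order(order, deps):
    """True if every task appears after all of its upstream tasks."""
    position = {task_id: idx for idx, task_id in enumerate(order)}
    return all(position[up] < position[t] for t, ups in deps.items() for up in ups)


deps = build_deps()
pre = lambda i: [f"data_ingestion_{i}", f"skipping_task_{i}", f"baseline_{i}",
                 f"soft_delete_{i}", f"is_delta_to_execute_{i}"]
post = lambda i: [f"update_delta_{i}", f"save_delta_{i}", f"dummy_{i}"]

# Each chain listed in full before the next one starts.
per_chain = ["first_task", "second_task"] + pre(1) + post(1) + pre(2) + post(2) + ["end_task"]
# All post-branch tasks grouped at the bottom.
grouped = ["first_task", "second_task"] + pre(1) + pre(2) + post(1) + post(2) + ["end_task"]

print(is_valid_order(per_chain, deps), is_valid_order(grouped, deps))
print(list(TopologicalSorter(deps).static_order()))
```

Because both listings satisfy every dependency, the Graph view and the execution order can be correct while a linear view still shows the post-branch tasks far from their chain.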

Operating System

AKS

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 1 month ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise a PR to address this issue, please do so; there is no need to wait for approval.

jscheffl commented 2 weeks ago

You reported this on Airflow 2.7.2, can you re-test on the most recent version, currently 2.10.2?
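
Before re-testing, it can help to confirm which version is actually installed in the environment that runs the webserver. A minimal sketch, assuming the package was installed under its PyPI distribution name `apache-airflow`; the helper name `installed_version` is hypothetical:

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(package="apache-airflow"):
    """Return the installed version string of `package`, or None if it is absent."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


# Prints the version string, or None when Airflow is not installed here.
print(installed_version())
```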

github-actions[bot] commented 5 days ago

This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in the next 7 days if no further activity occurs from the issue author.