apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Ensure teardown doesn't run until all other tasks complete #29332

Open norm opened 1 year ago

lorransr commented 5 months ago

Hi all,

I'm encountering an issue with the setup and teardown feature in Airflow, which I believe is related to this issue. When executing a DAG built from the script below, the teardown task does not wait for dependent tasks to finish before executing. Specifically, if I clear task t1 while task t3 is in any of the following states (finished, failed, or upstream_failed), the teardown task executes immediately without waiting for t1 to complete.

Steps to Reproduce:

1. Execute the provided DAG script.
2. Clear task t1 while task t3 is in one of the mentioned states (finished, failed, upstream_failed).
3. Observe the behavior of the teardown task.

Expected Behavior: The teardown task should wait until all dependent tasks, including t1, have completed before executing.

Actual Behavior: The teardown task executes immediately after clearing t1, regardless of the status of task t3.

Setup Script:

from datetime import timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    "owner": "me",
    "start_date": "2024-04-03",
    "retries": 3,
    "retry_delay": timedelta(minutes=1),
}

with DAG(
    "GDP_T_est",
    default_args=default_args,
    max_active_runs=1,
    catchup=False,
) as dag:
    setup = BashOperator(
        task_id="setup",
        bash_command="""echo "starting" && sleep 5 && echo "done" """,
    )

    t1 = BashOperator(
        task_id="t1",
        bash_command="""echo "t1 starting" && sleep 10 && echo "done" """, 
    )

    t2 = BashOperator(
        task_id="t2",
        bash_command="""echo "t2 starting" && sleep 5 && echo "done" """,
    )

    t3 = BashOperator(
        task_id="t3",
        bash_command="""echo "t3 starting" && sleep 5 && echo "done" """,
    )

    teardown = BashOperator(
        task_id="teardown",
        bash_command="""echo "finishing" && sleep 5 && echo "done" """, 
    )

    (setup >> t1 >> t2 >> t3 >> teardown.as_teardown(setups=setup))

Additional Information:

Airflow Version: 2.7.1 (using the image apache/airflow:2.7.1)

Video:

https://github.com/apache/airflow/assets/41809492/9ad81eee-387b-4b00-b402-5b3065ab5e1f

Is this the expected behavior for this feature at the moment?

It seems problematic, especially when using dbt with Cosmos, where we end up tearing down our cluster while some tasks are still running.

jscheffl commented 5 months ago

Can you please double-check the documentation in https://airflow.apache.org/docs/apache-airflow/stable/howto/setup-and-teardown.html#setup-and-teardown ? The setup task in your example is not marked as setup.
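For reference, a minimal sketch of the marking that how-to describes (the DAG and task names here are illustrative, not from the original report):

import pendulum

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="setup_teardown_marking",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    catchup=False,
) as dag:
    s = BashOperator(task_id="s", bash_command="echo setup")
    work = BashOperator(task_id="work", bash_command="echo work")
    t = BashOperator(task_id="t", bash_command="echo teardown")

    # Passing setups= marks both ends at once: s becomes a setup task, and
    # the s >> t edge that scopes the teardown to its setup is added.
    s >> work >> t.as_teardown(setups=s)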

lorransr commented 5 months ago

Thank you @jscheffl, I modified the example with this other syntax:

    (setup.as_setup() >> t1 >> t2 >> t3 >> teardown.as_teardown())
    setup.as_setup() >> teardown.as_teardown()

But I see the same behavior, which is expected since the two syntaxes are equivalent 🤔

Could this actually be a bug?

jscheffl commented 5 months ago

Okay, I ran a regression test and can confirm this bug.

Note that the additional Setup >> Teardown dependency is not needed; it is modelled automatically. I used a modified version of the example example_setup_teardown.py and added two more normal tasks in the middle.

import pendulum

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="example_setup_teardown",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:
    root_setup = BashOperator(task_id="root_setup", bash_command="echo 'Hello from root_setup'").as_setup()
    root_normal = BashOperator(task_id="normal", bash_command="echo 'I am just a normal task'")
    root_normal2 = BashOperator(task_id="normal2", bash_command="echo 'I am just a normal task'")
    root_normal3 = BashOperator(task_id="normal3", bash_command="echo 'I am just a normal task'")
    root_teardown = BashOperator(
        task_id="root_teardown", bash_command="echo 'Goodbye from root_teardown'"
    ).as_teardown(setups=root_setup)
    root_setup >> root_normal >> root_normal2 >> root_normal3 >> root_teardown

The error appears exactly if you clear a normal task between setup and teardown which is NOT the last task before the teardown, and you select no downstream tasks. This means the setup is executed, then the actual cleared task. The bug is that the only task directly preceding the teardown is already finished, so to the scheduler the DAG looks "ready for teardown". The scheduler does not check whether all tasks between setup and teardown are completed.

In the example above the error is generated if either task "normal" or "normal2" is cleared without downstream tasks. It works as expected if "normal3" is cleared. I ran my regression on latest main after branching off 2.9.0rc2, so the problem has most probably been there since the beginning.

dstandish commented 4 months ago

looks like what we need to do is find all the tasks “in the scope” of the teardown

(that is, the tasks in between the teardown and its setup(s))

and then just make sure they are not running.
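As a minimal, self-contained illustration of that in-scope computation (plain Python; the upstream_map structure and task names are hypothetical, not actual Airflow internals):

def in_scope(teardown, setups, upstream_map):
    """Tasks strictly between the teardown's setups and the teardown itself."""
    # walk upstream from the teardown, stopping at (and excluding) the setups
    scope, stack = set(), list(upstream_map[teardown])
    while stack:
        task = stack.pop()
        if task in setups or task in scope:
            continue
        scope.add(task)
        stack.extend(upstream_map[task])
    return scope

# Mirroring the example DAG above (task -> direct upstreams):
upstream_map = {
    "root_teardown": ["normal3", "root_setup"],
    "normal3": ["normal2"],
    "normal2": ["normal"],
    "normal": ["root_setup"],
    "root_setup": [],
}
print(in_scope("root_teardown", {"root_setup"}, upstream_map))
# -> {'normal', 'normal2', 'normal3'}: all of these, not just the direct
#    upstream "normal3", must be finished before the teardown may run.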

one idea for fixing:

[screenshot of the proposed check; image not preserved]

but it would probably be more efficient if we did not hit that block unless the direct upstreams had already passed

so another option would be to instead add something like this to the end

[screenshot of the alternative placement; image not preserved]

then it would only evaluate whether the indirect in-scope tasks are done if everything else passed.

(i'm missing an else in that screenshot to guarantee it's a teardown)

ideally, perhaps, we would also be able to know that this run was the result of a clear, and then not bother to run this check in the general case!
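To make the proposed ordering concrete, here is a self-contained toy of the two-phase check (all names and the state map are hypothetical; this is a sketch, not actual Airflow scheduler code):

def teardown_ready(is_teardown, direct_upstreams, in_scope, state):
    """state maps task_id -> 'success', 'failed', 'running', ..."""
    finished = {"success", "failed", "skipped", "upstream_failed"}
    # cheap check first: the existing direct-upstream trigger-rule logic
    if any(state[t] not in finished for t in direct_upstreams):
        return False
    if not is_teardown:
        return True
    # only teardowns pay for the wider in-scope scan
    return all(state[t] in finished for t in in_scope)

# jscheffl's repro: "normal" was cleared (running again) while "normal3",
# the teardown's only normal direct upstream, had already succeeded.
state = {"root_setup": "success", "normal": "running",
         "normal2": "success", "normal3": "success"}
print(teardown_ready(True, ["normal3", "root_setup"],
                     ["normal", "normal2", "normal3"], state))
# -> False: direct upstreams passed, but the in-scope scan catches the
#    still-running "normal", so the teardown keeps waiting.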