apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Removed mapped tasks block all_success trigger rule #33164

Closed tanelk closed 3 months ago

tanelk commented 1 year ago

Apache Airflow version

2.6.3

What happened

When rerunning a DAG run with dynamically mapped tasks and the number of mapped task instances decreases, downstream tasks with the all_success trigger rule (and likely some others as well) will not get scheduled, because the removed status is not considered successful. This does not happen when a regular task is removed, because it also gets removed from the DAG structure.

Task dependency message: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 2 non-success(es). upstream_states=_UpstreamTIStates(success=22, skipped=0, failed=0, upstream_failed=0, removed=2, done=24), upstream_task_ids={'upstream_task'}

Previously you could forcefully run it with the "Run" button in the UI, but that button has been removed in one of the recent releases.

Luckily I could delete the removed TIs and then the downstream tasks got rescheduled, but this is not "scalable".
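
One way to script that manual workaround (a rough, untested sketch, assuming direct access to the metadata database through Airflow's ORM session helpers; the dag_id and run_id values are placeholders):

from airflow.models import TaskInstance
from airflow.utils.session import create_session
from airflow.utils.state import TaskInstanceState

with create_session() as session:
    # Delete only the task instances left in the REMOVED state for the
    # affected DAG run, so the scheduler re-evaluates the downstream tasks.
    session.query(TaskInstance).filter(
        TaskInstance.dag_id == "removed_mapped_tasks",
        TaskInstance.run_id == "scheduled__2023-08-06T21:00:00+00:00",
        TaskInstance.state == TaskInstanceState.REMOVED,
    ).delete(synchronize_session=False)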

What you think should happen instead

Removed TIs should not block any trigger rules from getting executed.
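
For illustration only (this is not Airflow's trigger-rule implementation, just a simplified model of the behaviour requested above): an all_success-style check that treats removed upstream instances as neutral rather than as non-successes could look roughly like this.

from dataclasses import dataclass

# Simplified stand-in for the upstream-state counts shown in the dependency
# message above (success, skipped, failed, upstream_failed, removed, done).
@dataclass
class UpstreamStates:
    success: int = 0
    skipped: int = 0
    failed: int = 0
    upstream_failed: int = 0
    removed: int = 0
    done: int = 0

def all_success_ignoring_removed(states: UpstreamStates, upstream_count: int) -> bool:
    # Removed TIs no longer exist after re-expansion, so they are excluded
    # from the number of upstream instances that must have succeeded.
    required = upstream_count - states.removed
    return (
        states.failed == 0
        and states.upstream_failed == 0
        and states.success >= required
    )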

How to reproduce

-

Operating System

-

Versions of Apache Airflow Providers

No response

Deployment

Virtualenv installation

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

Code of Conduct

dzhigimont commented 1 year ago

@tanelk Could you please provide an example DAG?

RNHTTR commented 1 year ago

Can you please provide reproduction steps?

Also, what do you mean by the following:

the number of mapped task instances decreases

How is the number of mapped task instances decreasing during a DAG run?

tanelk commented 1 year ago

Example DAG

import pendulum

from airflow import DAG
from airflow.decorators import task
from airflow.operators.empty import EmptyOperator

with DAG(
    'removed_mapped_tasks',
    schedule='@daily',
    start_date=pendulum.DateTime(2023, 8, 7),
) as dag:

    @task
    def gen_elements():
        return [1, 2, 3]

    @task
    def mapped_task(element):
        return element * 2

    mapped_task.expand(element=gen_elements()) >> EmptyOperator(task_id='end')

Let it complete and then return one less element from the gen_elements task. Then clear the last DAG run.
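
For concreteness (the exact values are not in the report, only that one fewer element is returned), the second-run change to gen_elements would be something like:

@task
def gen_elements():
    # One fewer element than the first run, so one mapped TI ends up REMOVED.
    return [1, 2]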

The end task will not get scheduled: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=2, skipped=0, failed=0, upstream_failed=0, removed=1, done=3), upstream_task_ids={'mapped_task'}

In this very simple DAG, the run will be marked failed with: scheduler | [2023-08-09T13:49:56.448+0300] {dagrun.py:651} ERROR - Task deadlock (no runnable tasks); marking run <DagRun removed_mapped_tasks @ 2023-08-06 21:00:00+00:00: scheduled__2023-08-06T21:00:00+00:00, state:running, queued_at: 2023-08-09 10:49:44.110567+00:00. externally triggered: False> failed

On more complex structures the deadlock detection might not kick in.

shahar1 commented 3 months ago

I tried to run this example with v2.9.3 (first run with the original DAG, then remove one element and clear the last DAG run) and I didn't manage to reproduce it - all tasks succeeded. I'm closing this issue; please create a new one if you encounter this again in versions 2.9.3+ (while providing a reproducible example).

nick-brady commented 2 months ago

I don't have time to make a reproducible example, but this happened to me on Airflow 2.6.2. I'm manually deleting the removed tasks for the time being until I can try this on a newer version.