apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0
37.12k stars 14.31k forks source link

clear a deferred task do not increment the tries #38735

Closed raphaelauv closed 1 month ago

raphaelauv commented 7 months ago

Apache Airflow version

2.8.4

If "Other Airflow 2 version" selected, which one?

No response

What happened?

if you clear a deferred task , it do not increment the tries

only mark it failed or succes and then clear it , increment the tries

What you think should happen instead?

No response

How to reproduce

from datetime import timedelta
from pendulum import today
from airflow import DAG
from airflow.sensors.time_delta import TimeDeltaSensorAsync

with DAG(
        dag_id="test_clear_async",
        schedule_interval=None,
        start_date=today("UTC").add(days=-1)
):
    TimeDeltaSensorAsync(task_id="wait_some_seconds_async", delta=timedelta(seconds=500))

activate the pipeline trigger the pipeline clear the task while it's deferred

Operating System

ubuntu 22.04

Versions of Apache Airflow Providers

No response

Deployment

Docker-Compose

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

Code of Conduct

tirkarthi commented 7 months ago

When a task is deferred the _try_number is decremented so that when the trigger yields an event the task resumes execution where _try_number is incremented to use the same try_number before and after trigger. Perhaps when the task being cleared is in deferred state the try_number can be incremented.

commit f08bc8d1326f2c7c271928d74804050c36b4e953
Author: Karthikeyan Singaravelan <tir.karthi@gmail.com>
Date:   Tue Apr 9 11:10:06 2024 +0530

    Increment try_number for cleared deferred tasks.

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index d52a71c5b2..62b4d3e821 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -274,6 +274,12 @@ def clear_task_instances(
                 ti.state = TaskInstanceState.RESTARTING
                 job_ids.append(ti.job_id)
         else:
+            # When the task is deferred the try_number is decremented so that the same try
+            # number is used when the task handles the event. But in case of clearing the try
+            # number should be incremented so that the next run doesn't reuse the same try
+            if ti.state == TaskInstanceState.DEFERRED:
+                ti._try_number += 1
+
             ti_dag = dag if dag and dag.dag_id == ti.dag_id else dag_bag.get_dag(ti.dag_id, session=session)
             task_id = ti.task_id
             if ti_dag and ti_dag.has_task(task_id):
tirkarthi commented 7 months ago

I have created PR https://github.com/apache/airflow/pull/38984 . This is already handled while marking tasks as success/failure.

raphaelauv commented 1 month ago

no more present on 2.10.2