apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Deferred task behaviour is different from normal task #40329

Open purnachandergit opened 3 months ago

purnachandergit commented 3 months ago

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

V2.7.2

What happened?

I have defined a custom operator that inherits from `BaseSensorOperator`.

In the `execute` method of the custom operator:

1. First, I increase `retries` by 1.
2. Second, I read the task instance row (using the identifiers from the context) and increase `max_tries` by 1.
3. Third, if the task is deferrable, I defer it; otherwise I sleep for 60 seconds.

For the deferred task, `max_tries` is not increased, whereas for the non-deferred task `max_tries` is updated.

What you think should happen instead?

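The DB update does happen, but somewhere in the deferred flow the value is overridden. The increased `max_tries` should persist for the deferred task, just as it does for the non-deferred one.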

How to reproduce

Test DAG:

```python
from __future__ import annotations

import datetime
from datetime import timedelta
from time import sleep
from typing import Any

import pendulum
from airflow import DAG
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import TimeDeltaTrigger
from airflow.utils.context import Context

args = {
    'owner': 'purna',
    'start_date': "2024-06-13",
    'depends_on_past': False,
}

dag = DAG(
    dag_id="sample_defer_test-purna",
    schedule="0 0 * * *",
    catchup=False,
    default_args=args,
)


class CustomUserSensor(BaseSensorOperator):
    def __init__(self, deferrable: bool = False, **kwargs) -> None:
        super().__init__(**kwargs)
        self.deferrable = deferrable

    def execute(self, context: Context) -> None:
        from airflow.models import TaskInstance
        from airflow.utils.session import create_session

        task_instance = context["task_instance"]
        dag_id = task_instance.dag_id
        task_id = task_instance.task_id
        run_id = task_instance.run_id
        map_index = task_instance.map_index

        # Step 1: increase retries on the operator object.
        self.retries += 1

        with create_session() as session:
            # Step 2: increase max_tries on the task instance row in the metadata DB.
            db_task_instance = (
                session.query(TaskInstance)
                .filter(
                    TaskInstance.dag_id == dag_id,
                    TaskInstance.task_id == task_id,
                    TaskInstance.run_id == run_id,
                    TaskInstance.map_index == map_index,
                )
                .first()
            )
            if db_task_instance:
                db_task_instance.max_tries += 1
                session.merge(db_task_instance)

            # Step 3: defer if deferrable, otherwise sleep for 60 seconds.
            if self.deferrable:
                self.defer(
                    trigger=TimeDeltaTrigger(timedelta(minutes=2)),
                    method_name="execute_complete",
                )
            else:
                sleep(60)

    def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
        return


run_task2 = CustomUserSensor(task_id="tun_task2", poke_interval=20, dag=dag, deferrable=True)

run_task3 = CustomUserSensor(task_id="run_task3", poke_interval=20, dag=dag)
```
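One possible explanation worth checking, offered as a hypothesis rather than a confirmed diagnosis: in the reproduction, `self.defer(...)` is called inside the `with create_session() as session:` block. `self.defer()` works by raising `TaskDeferred`, and `create_session` calls `session.commit()` only after the `with` body finishes normally. If that reading is right, the `max_tries` change in the deferred branch is never committed at all, rather than committed and later overridden. The sketch below models only that control flow in plain Python; `FakeSession`, `create_session_like`, `TaskDeferredLike` and `run_execute` are hypothetical stand-ins, and it assumes (without importing Airflow) that `TaskDeferred` derives from `BaseException`.

```python
from __future__ import annotations

import contextlib
from collections.abc import Iterator


class TaskDeferredLike(BaseException):
    """Hypothetical stand-in for Airflow's TaskDeferred (assumed BaseException subclass)."""


class FakeSession:
    """Hypothetical session: changes become visible only after commit()."""

    def __init__(self) -> None:
        self.pending: dict[str, int] = {}
        self.committed: dict[str, int] = {}

    def commit(self) -> None:
        self.committed.update(self.pending)
        self.pending.clear()

    def rollback(self) -> None:
        self.pending.clear()

    def close(self) -> None:
        # Closing without a commit discards any uncommitted changes.
        self.pending.clear()


@contextlib.contextmanager
def create_session_like(session: FakeSession) -> Iterator[FakeSession]:
    # Assumed to follow the same commit-on-normal-exit pattern as
    # airflow.utils.session.create_session.
    try:
        yield session
        session.commit()  # only reached if the with-body finishes normally
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


def run_execute(deferrable: bool) -> dict[str, int]:
    """Model the execute() body from the report and return what got committed."""
    session = FakeSession()
    try:
        with create_session_like(session) as s:
            s.pending["max_tries"] = 1  # stands in for db_task_instance.max_tries += 1
            if deferrable:
                raise TaskDeferredLike()  # stands in for self.defer(...)
            # Non-deferred branch: sleep(60) omitted; the with-body ends normally.
    except TaskDeferredLike:
        pass  # in Airflow, the task runner handles the deferral signal
    return session.committed


print(run_execute(deferrable=False))  # {'max_tries': 1}
print(run_execute(deferrable=True))   # {}
```

Under this model only the non-deferred branch commits the change. Moving the `self.defer(...)` call after the `with create_session()` block, so that the session commits before deferring, would be one way to test whether this is what happens in the real DAG.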

Operating System

Linux

Versions of Apache Airflow Providers

No response

Deployment

Virtualenv installation

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 3 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.