apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Xcom support for reschedule sensors #39532

Closed j-stew closed 4 months ago

j-stew commented 4 months ago

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.6.1

What happened?

When a sensor in reschedule mode uses XCom, the XCom is cleared between reschedule checks.

Note: Deferrable operators are not viable here due to database slowness.

What you think should happen instead?

The XCom persists across sensor reschedule checks.

How to reproduce

import logging
from datetime import datetime

from airflow import DAG
from airflow.sensors.base import BaseSensorOperator  # airflow.sensors.base_sensor_operator is the deprecated path
from airflow.models import XCom

class TestAsyncSensor(BaseSensorOperator):
    def poke(self, context):
        number = context['ti'].xcom_pull(key="number", task_ids=self.task_id + "_original", dag_id=self.dag_id)
        logging.info(
            f"Start number: {number}, Date: {context['ti'].execution_date}"
        )
        if number is None:
            number = 1
            # context['ti'].xcom_push(key="number", value=number)
        elif number < 10:
            number += 1
        else:
            return True
        XCom.set(
            key="number",
            value=number,
            task_id=self.task_id + "_original",
            dag_id=self.dag_id,
            execution_date=context['ti'].execution_date)
        logging.info(
            f"End number: {number}"
        )
        return False

with DAG(
    "test_async_sensor",
    schedule_interval="@once",
    catchup=False,
    default_args={
        "depends_on_past": False,
        "start_date": datetime(2024, 5, 1),
    },
) as dag:
    ss = TestAsyncSensor(dag=dag,
                         task_id="async_sensor",
                         mode="reschedule",
                         do_xcom_push=True)
    ss

Operating System

Debian GNU/Linux 10 (buster)

Versions of Apache Airflow Providers

No response

Deployment

Other Docker-based deployment

Deployment details

No response

Anything else?

No response

boring-cyborg[bot] commented 4 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

Taragolis commented 4 months ago

> When a sensor in reschedule mode uses XCom, the XCom is cleared between reschedule checks.

This is by design of XComs and cannot be considered a bug. Adding this feature would require significant changes to the Airflow codebase.

j-stew commented 4 months ago

@Taragolis Thanks for your reply. Is there a specific reason we do not want reschedule mode with XComs, or is the barrier just the significant code changes required?

potiuk commented 4 months ago

@j-stew It's a basic design principle of Airflow tasks, namely idempotency. In Airflow, every run of a task is idempotent, which means that re-running the task is equivalent to running it anew, and Airflow assumes the previous run did not happen. This assumption has many consequences for backfill and DAG dependency scenarios.
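To make the idempotency point concrete, here is a toy model in plain Python. It is not Airflow code: `ToyXComStore` and `run_sensor_check` are hypothetical names, and the sketch assumes that each new run of a task first clears that task's XComs, which is the behavior reported in this issue for reschedule checks.

```python
class ToyXComStore:
    """Hypothetical in-memory stand-in for the XCom table (not an Airflow API)."""

    def __init__(self):
        self._rows = {}

    def set(self, task_id, key, value):
        self._rows[(task_id, key)] = value

    def get(self, task_id, key):
        return self._rows.get((task_id, key))

    def clear(self, task_id):
        # Drop every row that belongs to the given task.
        for row_key in [k for k in self._rows if k[0] == task_id]:
            del self._rows[row_key]


def run_sensor_check(store, task_id):
    """One reschedule check: start clean, then poke once."""
    # Assumption: a new run behaves as if no previous run happened,
    # so the task's own XComs are cleared before the poke.
    store.clear(task_id)
    number = store.get(task_id, "number")
    number = 1 if number is None else number + 1
    store.set(task_id, "number", number)
    return number


store = ToyXComStore()
seen = [run_sensor_check(store, "async_sensor") for _ in range(3)]
print(seen)
```

Under that assumption the counter never advances past its initial value, because every check starts from an empty store. This matches the symptom described in the report.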

> Note: Deferrable operators are not viable here due to database slowness.

If you see any slowness there, please open a new discussion describing the scenario and the slowness you see (but please, unless you are sure this is an Airflow issue, open a discussion, not an issue). And make sure to explain what kind of "slowness" is the problem. Deferrable operators should not add any extra slowness if they are correctly implemented, especially compared to rescheduling tasks, which also depend heavily on the database.
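For context on the deferrable alternative: a deferrable sensor hands its waiting to an asynchronous trigger, which typically polls in a loop inside one long-lived coroutine. The sketch below is a plain `asyncio` model of that pattern, not Airflow's trigger API; `poll_until_ready` and `ready_on_tenth` are hypothetical names. It only illustrates that a counter like the one in the reproduction can live in local state between polls.

```python
import asyncio


async def poll_until_ready(check, poll_interval=0.01):
    """Trigger-style loop: call `check` until it reports readiness."""
    attempts = 0  # kept in the coroutine's local state between polls
    while True:
        attempts += 1
        if await check(attempts):
            return attempts
        # Yield control so other coroutines can run while we wait.
        await asyncio.sleep(poll_interval)


async def ready_on_tenth(attempt):
    """Hypothetical condition: becomes true on the tenth check."""
    return attempt >= 10


result = asyncio.run(poll_until_ready(ready_on_tenth))
print(result)
```

Because the loop is never torn down between checks, the attempt counter does not need to be written to or read from XCom.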