apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

XCom is cleared when a task resumes from deferral #41098

Closed Sanketp1997 closed 3 months ago

Sanketp1997 commented 3 months ago

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.8.4+astro.3

What happened?

A task's XCom value is cleared when a task is rescheduled after being deferred.

What you think should happen instead?

XCom should not be cleared in this case, as it is still the same task run.

How to reproduce

```python
import logging
from datetime import datetime, timezone

from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import get_current_context
from airflow.sensors.base import PokeReturnValue

with DAG(
    dag_id="defer_behavior_test",
    schedule=None,
    start_date=datetime(2024, 7, 1, tzinfo=timezone.utc),
    catchup=False,
    concurrency=1,
    max_active_runs=1,
    default_view="graph",
):

    @task.sensor(
        poke_interval=10,
        timeout=60,
        mode="reschedule",
    )
    def should_defer_once():
        ctx = get_current_context()
        ti = ctx.get("ti")
        xcom_content = ti.xcom_pull(key="xcom_content")
        if xcom_content is not None:
            logging.info(f"xcom_content: {xcom_content}")
            return PokeReturnValue(True)
        else:
            logging.info("xcom_content not available")
            logging.info(f"xcom_content: {xcom_content}")
        ti.xcom_push(key="xcom_content", value="Dummy XCOM content")
        logging.info("Pushed dummy XCOM content")
        return PokeReturnValue(False)

    @task(trigger_rule="all_done")
    def check_xcom_content():
        ctx = get_current_context()
        ti = ctx.get("ti")
        xcom_content = ti.xcom_pull(key="xcom_content")
        logging.info(f"xcom_content: {xcom_content}")

    should_defer_once() >> check_xcom_content()
```

Operating System

macOS

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else?

No response

romsharon98 commented 3 months ago

First of all, your task doesn't enter deferred mode because of this statement: `if xcom_content is not None or xcom_content != "" or xcom_content != "None"`. After changing it to `if xcom_content is not None`, I was able to reproduce the issue.
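A quick way to see why the original condition never lets the sensor wait: the `or` chain is true for every possible value, since nothing can equal both `""` and `"None"` at once, so the sensor succeeds on its first poke. A minimal check in plain Python (no Airflow needed; the helper names are made up for illustration):

```python
def original_condition(xcom_content):
    # Condition from the first version of the snippet.
    return xcom_content is not None or xcom_content != "" or xcom_content != "None"


def fixed_condition(xcom_content):
    # Condition after the correction.
    return xcom_content is not None


samples = [None, "", "None", "Dummy XCOM content"]
original_results = [original_condition(value) for value in samples]
fixed_results = [fixed_condition(value) for value in samples]

print(original_results)  # every entry is True, even for None
print(fixed_results)     # only the None sample is False
```

With the fixed condition, the first poke sees `None`, pushes the XCom, and returns `PokeReturnValue(False)`, which is what triggers the reschedule.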

Here is the relevant part of the code that should handle this. Perhaps you could take a look and try to fix it.

https://github.com/apache/airflow/blob/8a1b84dc54b403985e64a9cdb88e9731e83c8b39/airflow/models/taskinstance.py#L3046-L3047

Sanketp1997 commented 3 months ago

@romsharon98 Oh yes! I was testing some other things and pasted the wrong snippet. Thanks for taking a look and correcting it!

Also, thanks for sharing the relevant code that causes the issue, but I am not very familiar with the complete workflows and architecture. I am not sure whether changing this behavior would break any other feature or workflow!

But sure, it's worth raising a PR so reviewers can have a look 👍🏼

romsharon98 commented 3 months ago

I think this issue is more a feature than a bug.

First let's clarify some definitions.

(taken from the documentation)

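Because sensors are primarily idle, they have two different modes of running so you can be a bit more efficient about using them: `poke` (the default), where the sensor takes up a worker slot for its entire runtime, and `reschedule`, where the sensor takes up a worker slot only while it is checking and sleeps for a set duration between checks.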

Because you are using `reschedule`, the whole execution is repeated on every poke, and part of that is clearing the XCom: https://github.com/apache/airflow/blob/8a1b84dc54b403985e64a9cdb88e9731e83c8b39/airflow/models/taskinstance.py#L3045-L3047

If you use `mode="poke"`, the XCom will be kept. (You can read about the tradeoff between those two here.)

About deferrable: as mentioned here, it is NOT a built-in parameter or mode in Airflow. When a task resumes from deferral, the XCom should be kept (according to the comment above the code). If you want, you can implement a deferrable sensor.

Here you can read more about the difference between deferrable and reschedule.
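To make the difference between the three cases concrete, here is a toy model in plain Python. It is only a sketch, assuming (as described above) that XCom is cleared at the start of every execution unless the task is resuming from deferral. `ToyTaskInstance`, `resuming_from_deferral`, `poke`, and `run` are hypothetical names for illustration, not Airflow's real classes or attributes.

```python
from dataclasses import dataclass, field


@dataclass
class ToyTaskInstance:
    """Hypothetical stand-in for a task instance; not Airflow's real class."""

    xcom: dict = field(default_factory=dict)
    resuming_from_deferral: bool = False

    def start_execution(self):
        # Assumed behavior: clear XCom unless resuming from deferral.
        if not self.resuming_from_deferral:
            self.xcom.clear()


def poke(ti):
    """Same logic as the sensor in the report: succeed once the XCom exists."""
    if ti.xcom.get("xcom_content") is not None:
        return True
    ti.xcom["xcom_content"] = "Dummy XCOM content"
    return False


def run(mode, max_pokes=3):
    """Return the result of each poke for the given mode."""
    ti = ToyTaskInstance()
    ti.start_execution()
    results = []
    for _ in range(max_pokes):
        done = poke(ti)
        results.append(done)
        if done:
            break
        if mode == "reschedule":
            # The next poke is a fresh execution, so XCom is cleared again.
            ti.start_execution()
        elif mode == "deferrable":
            # Resuming from deferral skips the clearing step.
            ti.resuming_from_deferral = True
            ti.start_execution()
        # mode == "poke": the same execution keeps running, nothing is cleared.
    return results


for mode in ("poke", "reschedule", "deferrable"):
    print(mode, run(mode))
```

In this model the `reschedule` run never sees its own XCom, which matches the behavior reported in this issue, while `poke` and `deferrable` succeed on the second check.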

Let me know if this was helpful or if you have more questions before closing this issue 😃

Sanketp1997 commented 3 months ago

Hi @romsharon98,

After a detailed look, we came to the same conclusion you described above, and we will consider implementing custom deferrable operators for our needs.

Thank you for the explanation. I will close the issue now.