apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Deferred operators do not preserve attribute values set during execution #40450

Closed kacpermuda closed 4 months ago

kacpermuda commented 4 months ago

Apache Airflow version

main (development)

If "Other Airflow 2 version" selected, which one?

No response

What happened?

Maybe it's not a bug but a result of some limitation: values assigned to an operator instance during execution are not accessible after the operator has been deferred.

So if I set self.job_id = "999" in __init__ and then assign self.job_id = "1234" in execute(), the value is still "999" when I access it from execute_complete() after the operator has been deferred. The listener retrieves the same stale value (which is expected, as it is called shortly after execute_complete()).

Please see the example code in the "How to reproduce" section, to better understand what's happening.

What you think should happen instead?

I would expect all modifications made to an operator instance to be preserved and accessible after the operator has been deferred, both in execute_complete() (or any other method name passed to the triggerer) and in the listener (e.g. the OpenLineage one).

I thought this behaviour was already expected and assumed by the community, judging by the code of BigQueryInsertJobOperator, which returns self.job_id after overwriting it within the execute() method. Other operators, e.g. some AWS operators such as BatchOperator, simply get the needed value from the triggerer and then return it, so maybe the current behaviour is correct and expected.
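As a rough illustration of that second pattern, here is a minimal sketch in plain Python (no Airflow imports) of reading the value from the trigger event instead of from self. The helper name `job_id_from_event` and the event keys `status` and `job_id` are assumptions made for this example; they are not taken from BatchOperator or any other provider.

```python
from __future__ import annotations

from typing import Any


def job_id_from_event(event: dict[str, Any] | None) -> str:
    """Return the job id carried by a trigger event (hypothetical event shape)."""
    # The "status" and "job_id" keys are assumptions for this sketch only.
    if event is None or event.get("status") != "success":
        raise RuntimeError(f"Trigger did not report success: {event!r}")
    return event["job_id"]


# An execute_complete(context, event) method could return this value
# rather than relying on self.job_id, which is reset on resume.
resumed_job_id = job_id_from_event({"status": "success", "job_id": "1234"})
```

The point of the design is that the value travels inside the event payload, so it does not matter that the resumed operator is a fresh instance.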

How to reproduce

You can run my sample DAG below and investigate the error logs, or look at any operator that tries to access a value set within execute(), e.g. BigQueryInsertJobOperator trying to return self.job_id in execute_complete(): it will always be None, as set in __init__.

DAG code

```
from __future__ import annotations

import datetime as dt
from typing import Any

from airflow import DAG
from airflow.models.baseoperator import BaseOperator
from airflow.triggers.temporal import TimeDeltaTrigger
from airflow.utils.context import Context
from airflow.utils.log.logging_mixin import LoggingMixin


class TestOperator(BaseOperator, LoggingMixin):
    def __init__(
        self,
        predefined,
        deferrable,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.predefined = predefined
        self.job_id = None
        self.deferrable = deferrable

    def execute(self, context: Context) -> Any:
        # it's usually random value generated by some service / API, that we can only get inside of execute()
        self.job_id = "1234"

        if self.deferrable:
            self.defer(
                trigger=TimeDeltaTrigger(dt.timedelta(seconds=5)),
                method_name="execute_complete",
            )
        else:
            self.log.error("Not in deferrable mode, returning self.job_id = %s", self.job_id)  # 1234
            self.log.error("Value of self.predefined = %s", self.predefined)  # OK
            return self.job_id

    def execute_complete(
        self,
        context: Context,
        event: dict[str, Any] | None = None,
    ) -> None:
        self.log.error("Job id from context: %s", str(context["task"].job_id))  # None
        self.log.error("Done after deferred, returning self.job_id = %s", self.job_id)  # None
        self.log.error("Value of self.predefined = %s", self.predefined)  # OK
        return self.job_id

    def get_openlineage_facets_on_complete(self, task_instance):
        from airflow.providers.openlineage.extractors import OperatorLineage

        self.log.error("Value of self.predefined = %s", self.predefined)  # OK
        self.log.error("Value of self.job_id = %s", self.job_id)  # None
        return OperatorLineage()


with DAG(
    "test_deferrable",
    start_date=dt.datetime(2024, 1, 1),
    schedule_interval=None,
) as dag:
    not_deferred = TestOperator(task_id="not_deferred", predefined="my_predefined_value", deferrable=False)
    deferred = TestOperator(task_id="deferred", predefined="my_predefined_value", deferrable=True)
```

Operating System

MacOS 14.5

Versions of Apache Airflow Providers

main branch for all providers.

At the time of testing, the latest versions are: amazon==8.25.0 google==10.20.0 openlineage==1.8.0

Deployment

Astronomer

Deployment details

Tested on Astro and breeze.

Anything else?

If passing this value to and from the triggerer, or using XCom, is the only way to do this, and it's not a core Airflow bug, then please confirm that for me. I'll add some clear notes to the documentation and adjust the behaviour of the OpenLineage listener to account for it (as then it's simply a bug in some providers that did not account for core Airflow behaviour).

From the listener's perspective, the current scenario requires some extra effort: to use values obtained during execution, we need a way to make them available in the listener's methods. When the operator is not in deferrable mode they are easily accessible; otherwise we need to somehow pass all the necessary information and recreate some resources (e.g. hooks).

I think issue #40329 is somehow related to this as well, but it's more about refreshing information from the db, so I decided to create a separate one.

#40432 also seems related.

kacpermuda commented 4 months ago

Sorry, I think I missed this part of the docs. It's not a bug, it's the desired behaviour.

When you opt to defer, your operator will stop executing at that point and be removed from its current worker. No state will persist, such as local variables or attributes set on self. When your operator resumes, it resumes as a new instance of it. The only way you can pass state from the old instance of the operator to the new one is with method_name and kwargs.
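To make the quoted behaviour concrete, here is a toy model in plain Python, not Airflow's actual implementation. `ToyDeferral`, `ToyOperator` and `run_with_deferral` are hypothetical names invented for this sketch. It only mimics the idea that the operator resumes as a new instance, so a value set on self is lost while a value passed through kwargs survives.

```python
from __future__ import annotations

from typing import Any, Callable


class ToyDeferral(Exception):
    """Hypothetical stand-in for the signal an operator raises when it defers."""

    def __init__(self, method_name: str, kwargs: dict[str, Any] | None = None) -> None:
        super().__init__(method_name)
        self.method_name = method_name
        self.kwargs = kwargs or {}


class ToyOperator:
    def __init__(self) -> None:
        self.job_id: str | None = None

    def execute(self) -> None:
        # This attribute lives only on the current instance.
        self.job_id = "1234"
        # Hand the value over explicitly so it survives the deferral.
        raise ToyDeferral("execute_complete", kwargs={"job_id": self.job_id})

    def execute_complete(
        self,
        event: dict[str, Any] | None = None,
        job_id: str | None = None,
    ) -> dict[str, str | None]:
        # self.job_id comes from __init__ of the new instance; job_id from kwargs.
        return {"from_self": self.job_id, "from_kwargs": job_id}


def run_with_deferral(factory: Callable[[], ToyOperator]) -> dict[str, str | None] | None:
    """Run execute(), then resume on a freshly built instance (toy scheduler)."""
    first = factory()
    try:
        first.execute()
    except ToyDeferral as deferral:
        resumed = factory()  # a new instance: attributes set on `first` are gone
        method = getattr(resumed, deferral.method_name)
        return method(event={"status": "success"}, **deferral.kwargs)
    return None


result = run_with_deferral(ToyOperator)
```

Here `result["from_self"]` is None while `result["from_kwargs"]` is "1234", which mirrors what the sample DAG logs. In a real operator the equivalent step would be passing the value through the kwargs argument of self.defer() and accepting it as a keyword argument in execute_complete().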