apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Operator deepcopy does not honor `shallow_copy_attrs` if value also referenced in `__init_kwargs` #25431

Open cansjt opened 2 years ago

cansjt commented 2 years ago

Apache Airflow version

2.3.3 (latest released)

What happened

When implementing a custom operator I stumbled on the following issue:

Note that there is an additional difficulty to this problem: the names of the kwargs do not necessarily match the names of the instance attributes. This can easily be worked around by adding both names to the class' `shallow_copy_attrs` list, but that makes things a bit redundant.

I am not sure what `_BaseOperator__init_kwargs` is used for, but if one can deepcopy an operator anyway, I cannot help but wonder why it is needed.

What you think should happen instead

The arguments passed to the `__init__()` method should be shallow copied as expected / requested by the operator implementer, following the contract that attribute values listed in `shallow_copy_attrs` should be shallow copied.
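The mismatch can be demonstrated outside Airflow with a minimal sketch (class and attribute names here are illustrative, not Airflow's): the attribute listed in `shallow_copy_attrs` is shallow copied as requested, but the captured kwargs dict is deep copied, and the same value is deep copied with it:

```python
import copy

class Op:
    """Minimal stand-in for an operator that captures its __init__
    kwargs, roughly the way BaseOperator does (names illustrative)."""

    shallow_copy_attrs = ('_param',)

    def __init__(self, param):
        self._param = param
        self._init_kwargs = {'param': param}  # captured reference to the same value

    def __deepcopy__(self, memo):
        cls = self.__class__
        result = cls.__new__(cls)
        memo[id(self)] = result
        for k, v in self.__dict__.items():
            if k in self.shallow_copy_attrs:
                setattr(result, k, copy.copy(v))            # shallow, as requested
            else:
                setattr(result, k, copy.deepcopy(v, memo))  # kwargs dict: deep copy
        return result

inner = object()
clone = copy.deepcopy(Op({'inner': inner}))
print(clone._param['inner'] is inner)                 # True: shallow copy kept the reference
print(clone._init_kwargs['param']['inner'] is inner)  # False: deep-copied anyway
```

So even though the attribute itself honors the contract, the value still gets deep copied through the kwargs dict, which is exactly what bites a misbehaved object.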

How to reproduce

I discovered the issue because, in a custom operator, I was passing an object (from a third-party package) that has a mis-implemented `__getattr__()`. The object was getting deep copied anyway, making my DAG fall into infinite recursion when attempting to copy it.

Note that for brevity I also took a small shortcut: in the real case, the faulty instance is not directly attached to the operator, but is an attribute of another object that is itself attached to the operator. That indirection could make `shallow_copy_attrs` effective where, in the first example below, it is ineffective. One can consider that the adapter class in the examples below plays the role of that intermediate object. Still, the intermediate object being deep copied, when it shouldn't be, makes setting `shallow_copy_attrs` ineffective.

This first example shows the initial situation:

import copy

from airflow.operators.python import PythonOperator

class ThirdPartyClassWithMisbehavedGetAttr:

    def __getattr__(self, item):
        if attr := getattr(self, f'_{item}'):
            return attr
        raise AttributeError

class CustomOperator(PythonOperator):

    shallow_copy_attrs = ('_misbehavedparam', 'misbehavedparam')

    def __init__(self,
                 *,
                 misbehavedparam: None | ThirdPartyClassWithMisbehavedGetAttr = None,
                 **kwargs):
        super().__init__(**kwargs)
        self._misbehavedparam = misbehavedparam or ThirdPartyClassWithMisbehavedGetAttr()

    def __deepcopy__(self, *args):  # Only here to intercept the call and allow debugging
        breakpoint()
        return super().__deepcopy__(*args)

def f(**kwargs):
    print('here')

operator1 = CustomOperator(python_callable=f, task_id='doit')
print(operator1._BaseOperator__init_kwargs)
result = copy.deepcopy(operator1)  # Infinite recursion loop.

Running the code above fails with the exception:

Traceback (most recent call last):
  File "/home/nicolas/src/gitco/jobteaser/aggregator/mre.py", line 11, in __getattr__
    if attr := getattr(self, f'_{item}'):
  File "/home/nicolas/src/gitco/jobteaser/aggregator/mre.py", line 11, in __getattr__
    if attr := getattr(self, f'_{item}'):
  File "/home/nicolas/src/gitco/jobteaser/aggregator/mre.py", line 11, in __getattr__
    if attr := getattr(self, f'_{item}'):
  [Previous line repeated 997 more times]
RecursionError: maximum recursion depth exceeded
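The recursion can be reproduced without Airflow at all: `copy.deepcopy()` probes for hooks such as `__deepcopy__` via `getattr`, and on this class every failed lookup re-enters `__getattr__` with one more leading underscore, forever. A minimal, self-contained sketch (class name illustrative):

```python
import copy

class MisbehavedGetAttr:
    """A __getattr__ that probes for '_' + name via getattr: if neither
    'foo' nor '_foo' exists, getattr(self, '_foo') re-enters
    __getattr__ with '_foo', then '__foo', and so on without end."""

    def __getattr__(self, item):
        if attr := getattr(self, f'_{item}'):
            return attr
        raise AttributeError(item)

obj = MisbehavedGetAttr()
try:
    copy.deepcopy(obj)  # probes obj.__deepcopy__ -> unbounded recursion
except RecursionError:
    print("RecursionError, as in the traceback above")
```

Note that the default argument of `getattr(x, "__deepcopy__", None)` inside the `copy` module only swallows `AttributeError`; a `RecursionError` propagates out.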

Here is a way to work around the problem, assuming the adapter class below can somehow reconstruct the faulty instance. In my case it is possible (not shown here for brevity):

class CustomOperatorWithWorkAround(PythonOperator):

    def __init__(self,
                 *,
                 misbehavedparam: None | ThirdPartyClassWithMisbehavedGetAttr = None,
                 **kwargs):
        super().__init__(**kwargs)
        self._misbehavedparam = MisbehavedAdapter(misbehavedparam
                                                  or ThirdPartyClassWithMisbehavedGetAttr())

    def __deepcopy__(self, *args):
        breakpoint()
        return super().__deepcopy__(*args)

class MisbehavedAdapter:

    def __init__(self, adaptee):
        self._adaptee = adaptee

    @classmethod
    def copy(cls) -> 'MisbehavedAdapter':
        # Reconstruct the faulty instance somehow
        return cls(ThirdPartyClassWithMisbehavedGetAttr())

    def __getattr__(self, item):
        return getattr(self._adaptee, item)

    def __reduce_ex__(self, version):
        return (MisbehavedAdapter.copy, tuple())

operator2 = CustomOperatorWithWorkAround(  # misbehavedparam=ThirdPartyClassWithMisbehavedGetAttr(),
                                         python_callable=f,
                                         task_id='doit',
                                         )
print(operator2._BaseOperator__init_kwargs)
result = copy.deepcopy(operator2)

It's a bit of work, but until the third-party library is fixed, that works for me.
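The same `__reduce_ex__` trick can be shown in isolation (all names illustrative). One subtlety worth flagging: `copy.deepcopy()` probes attributes such as `__deepcopy__` through `getattr`, so an adapter that blindly forwards every lookup would hand those probes to the faulty adaptee and reintroduce the recursion; the sketch below therefore declines to forward dunder names:

```python
import copy

class Faulty:
    # Stand-in for the third-party object that cannot survive a deepcopy.
    def __getattr__(self, item):
        return getattr(self, f'_{item}')  # unbounded recursion on any miss

class Adapter:
    """Wraps the faulty object; __reduce_ex__ tells copy (and pickle)
    to rebuild a fresh wrapper instead of copying the adaptee."""

    def __init__(self, adaptee):
        self._adaptee = adaptee

    @classmethod
    def rebuild(cls) -> 'Adapter':
        return cls(Faulty())  # reconstruct the adaptee somehow

    def __getattr__(self, item):
        if item.startswith('__') and item.endswith('__'):
            raise AttributeError(item)  # keep copy/pickle probes away from the adaptee
        return getattr(self._adaptee, item)

    def __reduce_ex__(self, protocol):
        return (Adapter.rebuild, ())

wrapped = Adapter(Faulty())
clone = copy.deepcopy(wrapped)   # no recursion: copy goes through rebuild()
print(type(clone).__name__)      # Adapter
print(clone is wrapped)          # False
```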

Now if I uncomment the misbehavedparam kwarg in the above example:

operator3 = CustomOperatorWithWorkAround(misbehavedparam=ThirdPartyClassWithMisbehavedGetAttr(),
                                         python_callable=f,
                                         task_id='doit',
                                         )
print(operator3._BaseOperator__init_kwargs)
result = copy.deepcopy(operator3)

The infinite recursion is back, for the reasons exposed above (the copy of the `_BaseOperator__init_kwargs` attribute, which has captured a reference to the faulty instance).

So for now, to work around the problem, I have to set the instance attribute outside of the constructor; I added a setter (:cry:) to wrap the value with the adapter:

class CustomOperatorWithWorkAround(PythonOperator):

    @property
    def misbehavedparam(self):
        return self._misbehavedparam

    @misbehavedparam.setter
    def misbehavedparam(self, value):
        self._misbehavedparam = MisbehavedAdapter(value)

    def __deepcopy__(self, *args):
        # breakpoint()
        return super().__deepcopy__(*args)

operator4 = CustomOperatorWithWorkAround(python_callable=f,
                                         task_id='doit',
                                         )
operator4.misbehavedparam = ThirdPartyClassWithMisbehavedGetAttr()
print(operator4._BaseOperator__init_kwargs)
result = copy.deepcopy(operator4)
print(result)

Operating System

Debian

Versions of Apache Airflow Providers

not relevant.

Deployment

Other

Deployment details

not relevant

Anything else

We could make a special case for the copy of that attribute. There is already one for the copy of the `_BaseOperator__instantiated` attribute. That leaves the question of how we want to handle this special case.

But then, not copying the `_BaseOperator__init_kwargs` attribute first breaks `__setattr__()`. I find [that behavior weird](https://github.com/apache/airflow/blob/main/airflow/models/baseoperator.py#L1007) because it kind of assumes that the name of a formal parameter is also the name of an attribute and, if so, that the two are necessarily related (which is likely, but nothing actually makes sure of it, so to me it seems more like a broken assumption than anything). And why would setting an attribute change the arguments of the `__init__()` method? But that is yet another issue.
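For reference, the questioned behavior can be sketched outside Airflow (a simplified model, not Airflow's actual code): assignments whose name matches a captured `__init__` kwarg also mutate the stored kwargs dict:

```python
class KwargCapturing:
    """Simplified model of an operator that captures its __init__
    kwargs and keeps them in sync from __setattr__."""

    def __init__(self, **kwargs):
        object.__setattr__(self, '_init_kwargs', dict(kwargs))
        for key, value in kwargs.items():
            setattr(self, key, value)

    def __setattr__(self, key, value):
        if key in self._init_kwargs:
            # Assumes the kwarg name is also an attribute name, and that
            # the two are related -- the assumption questioned above.
            self._init_kwargs[key] = value
        object.__setattr__(self, key, value)

op = KwargCapturing(retries=3)
op.retries = 5
print(op._init_kwargs)  # {'retries': 5}: the captured kwargs changed too
```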

We can get a working version by moving up the initialization of the `__init_kwargs` attribute:
```diff
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 2795a0f53..6507dd6c1 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -1164,14 +1164,26 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):

         shallow_copy = cls.shallow_copy_attrs + cls._base_operator_shallow_copy_attrs

+        shallow_map = {}
+        result.__init_kwargs = init_kwargs = {}
         for k, v in self.__dict__.items():
-            if k == "_BaseOperator__instantiated":
+            if k in ("_BaseOperator__instantiated", "_BaseOperator__init_kwargs"):
                 # Don't set this until the _end_, as it changes behaviour of __setattr__
                 continue
             if k not in shallow_copy:
                 setattr(result, k, copy.deepcopy(v, memo))
             else:
                 setattr(result, k, copy.copy(v))
+                shallow_map[id(v)] = getattr(result, k)
+        for k, v in self.__dict__["_BaseOperator__init_kwargs"].items():
+            id_ = id(v)
+            if id_ in shallow_map:
+                init_kwargs[k] = shallow_map[id_]
+            elif id_ in memo:
+                init_kwargs[k] = memo[id_]
+            else:
+                init_kwargs[k] = copy.deepcopy(v, memo)
+
         result.__instantiated = self.__instantiated
         return result
```

Shall we use the `memo` dict instead of the additional `shallow_map` dict? That could prevent the same issue from happening further down the line, if the same value is somehow referenced elsewhere.
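For what it's worth, routing everything through the shared `memo` dict is how `copy.deepcopy()` itself keeps multiple references to one object pointing at a single copy; copying the same value in separate passes forks it. A quick illustration (names illustrative):

```python
import copy

shared = {'conn_id': 'db'}
holder = {'attr': shared, 'init_kwargs': {'param': shared}}

# One deepcopy pass, one memo dict: the two references stay one object.
clone = copy.deepcopy(holder)
print(clone['attr'] is clone['init_kwargs']['param'])  # True

# Two separate passes (two memo dicts): the references silently fork.
a, b = copy.deepcopy(shared), copy.deepcopy(shared)
print(a is b)  # False
```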

We should also note that keeping the copy of this `__init_kwargs` dict means leaving the burden of "fencing" against misbehaved objects outside of the operator's `__init__()`. Meaning you force users to do something like:

op = CustomOperatorWithWorkAround(misbehavedparam=MisbehavedAdapter(ThirdPartyClassWithMisbehavedGetAttr()),
                                  python_callable=f,
                                  task_id='doit',
                                  ) 

Instead of letting the operator deal with it internally:

class CustomOperatorWithWorkAround(PythonOperator):
    def __init__(self, *, misbehavedparam, **kwargs):
        super().__init__(**kwargs)
        self._misbehavedparam = MisbehavedAdapter(misbehavedparam)

op = CustomOperatorWithWorkAround(misbehavedparam=ThirdPartyClassWithMisbehavedGetAttr(),
                                  python_callable=f,
                                  task_id='doit',
                                  ) 

Are you willing to submit PR?

Code of Conduct

cansjt commented 2 years ago

Grepping through the code on the main branch, I cannot find any place where the `__init_kwargs` attribute is actually ever read. It is set in different places, but not read:

$ git grep _init_kwargs
airflow/models/baseoperator.py:406:            if not hasattr(self, '_BaseOperator__init_kwargs'):
airflow/models/baseoperator.py:407:                self._BaseOperator__init_kwargs = {}
airflow/models/baseoperator.py:413:            self._BaseOperator__init_kwargs.update(kwargs)  # type: ignore
airflow/models/baseoperator.py:680:    __init_kwargs: Dict[str, Any]
airflow/models/baseoperator.py:755:        self.__init_kwargs = {}
airflow/models/baseoperator.py:1007:        if key in self.__init_kwargs:
airflow/models/baseoperator.py:1008:            self.__init_kwargs[key] = value
airflow/models/baseoperator.py:1168:        result.__init_kwargs = init_kwargs = {}
airflow/models/baseoperator.py:1170:            if k in ("_BaseOperator__instantiated", "_BaseOperator__init_kwargs"):
airflow/models/baseoperator.py:1178:        for k, v in self.__dict__["_BaseOperator__init_kwargs"]:
airflow/models/baseoperator.py:1466:                    '_BaseOperator__init_kwargs',

What am I missing?

potiuk commented 2 years ago

I think if you want to propose something, it's better to open a PR and discuss it there, adding your explanation over the code. That will make for a far more productive discussion than trying to wrap one's head around code copied and pasted from various places. That would take a long time for anyone looking at it, so making a draft PR with what you propose to do is far better IMHO. I have no big knowledge about this part, but I kinda dread having to take a look and try to understand what you want to do and why, because of all the copied and pasted code.

Just sayin'.

eladkal commented 2 years ago

@cansjt do you plan to open a PR?

boushphong commented 1 year ago

I'm facing the same problem in 2.5.0. Can I work on this?

eladkal commented 1 year ago

sure. assigned

boushphong commented 1 year ago

I have noticed that this issue also leads to a slightly more severe problem when Apache Airflow is deployed on Kubernetes. The problem seems to always result in the first pod of a task being caught in the Error state even though the task still finishes. I've been debugging this, but to no avail. Shall I raise a new issue? Perhaps a better description would make it easier for someone to pick this up.

potiuk commented 1 year ago

Yes. If you can refer to this one and have a super-easy reproduction path, creating a new issue and marking it as "Related to:" is a good idea. We can then close this one as a duplicate.

github-actions[bot] commented 7 months ago

This issue has been automatically marked as stale because it has been open for 365 days without any activity. There have been several Airflow releases since the last activity on this issue. Kindly recheck the report against the latest Airflow version and let us know if the issue is reproducible. The issue will be closed in the next 30 days if no further activity occurs from the issue author.