bryzgaloff / airflow-clickhouse-plugin

The most popular ClickHouse plugin for Airflow. 🔝 Top-1% downloads on PyPI: https://pypi.org/project/airflow-clickhouse-plugin! Based on mymarilyn/clickhouse-driver.
MIT License

ClickHouseBranchSQLOperator: AirflowException, Invalid arguments #87

Closed cra closed 3 months ago

cra commented 3 months ago

Hello! I've been using the regular ClickHouseOperator for my DAGs for a while now, and I noticed that you support DB API 2.0, so I tried using ClickHouseBranchSQLOperator and ran into an issue:

check_tbl_exists = ClickHouseBranchSQLOperator(
    task_id='check_if_table_exists',
    sql='check_if_table_exists.sql',
    conn_id='ch_default',
    follow_task_ids_if_true='check_if_table_empty',
    follow_task_ids_if_false='create_agg_table',
)

It seems to match the way BranchSQLOperator is used, but I get an AirflowException when trying to use this task:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 484, in apply_defaults
    result = func(self, **kwargs, default_args=default_args)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 881, in __init__
    raise AirflowException(
airflow.exceptions.AirflowException: Invalid arguments were passed to ClickHouseBranchSQLOperator (task_id: check_if_table_exists). Invalid arguments were:
**kwargs: {'sql': 'check_if_table_exists.sql', 'follow_task_ids_if_true': 'check_if_table_empty', 'follow_task_ids_if_false': 'create_agg_table'}

Could you please provide an example of how this operator is supposed to be used? I'm probably missing something silly, but I cannot figure out what.

cra commented 3 months ago

Changing the order of base classes in inheritance solved my issue

from airflow.providers.common.sql.operators import sql
from airflow_clickhouse_plugin.operators.clickhouse_dbapi import ClickHouseBaseDbApiOperator

class ClickHouseBranchSQLOperator(
    sql.BranchSQLOperator,
    ClickHouseBaseDbApiOperator,
):
    pass

bryzgaloff commented 3 months ago

Hi @cra and thank you for reporting this!

TL;DR The behaviour looks strange. Before proceeding with the change, I suggest understanding the issue first.


Though I was able to reproduce it, this behaviour looks strange to me. This is what ClickHouseBranchSQLOperator.__mro__ shows:

Since the first 3 classes do not define an __init__ method at all, the ClickHouseBranchSQLOperator(…) call should resolve to BranchSQLOperator.__init__ first. That should handle all the defined kwargs properly, leaving no unprocessed ones for Airflow's BaseOperator, which is what raises the AirflowException.

I have also created a small code snippet to reproduce this class hierarchy:

# Airflow base operator
class BaseOperator(object):
    def __init__(self, **kwargs):
        print(f'BaseOperator.__init__ called: {kwargs=}')
        if kwargs:
            raise AssertionError(f'unprocessed {kwargs=}')

# Common SQL operators
class BaseSQLOperator(BaseOperator):
    def __init__(self, **kwargs):
        print(f'BaseSQLOperator.__init__ called: {kwargs=}')
        super().__init__(**kwargs)

    def get_db_hook(self):
        print('BaseSQLOperator.get_db_hook called')

class BranchSQLOperator(BaseSQLOperator):
    def __init__(self, follow_task_ids_if_true, **kwargs):
        print(f'BranchSQLOperator.__init__ called: {kwargs=}')
        super().__init__(**kwargs)

# ClickHouse base operators
class ClickHouseDbApiHookMixin(object):
    def _get_clickhouse_db_api_hook(self):
        print('ClickHouseDbApiHookMixin._get_clickhouse_db_api_hook called')

class ClickHouseBaseDbApiOperator(ClickHouseDbApiHookMixin, BaseSQLOperator):
    def get_db_hook(self):
        print('ClickHouseBaseDbApiOperator.get_db_hook called')
        return self._get_clickhouse_db_api_hook()

# The target class
class ClickHouseBranchSQLOperator(ClickHouseBaseDbApiOperator, BranchSQLOperator):
    pass

print(ClickHouseBranchSQLOperator.__mro__)
print('\ncalling ClickHouseBranchSQLOperator()')
operator = ClickHouseBranchSQLOperator(follow_task_ids_if_true=['task_1'])
print('\ncalling get_db_hook()')
operator.get_db_hook()

And it works perfectly fine, here is the output:

(<class '__main__.ClickHouseBranchSQLOperator'>, <class '__main__.ClickHouseBaseDbApiOperator'>, <class '__main__.ClickHouseDbApiHookMixin'>, <class '__main__.BranchSQLOperator'>, <class '__main__.BaseSQLOperator'>, <class '__main__.BaseOperator'>, <class 'object'>)

calling ClickHouseBranchSQLOperator()
BranchSQLOperator.__init__ called: kwargs={} 
BaseSQLOperator.__init__ called: kwargs={}
BaseOperator.__init__ called: kwargs={}

calling get_db_hook()
ClickHouseBaseDbApiOperator.get_db_hook called
ClickHouseDbApiHookMixin._get_clickhouse_db_api_hook called

BranchSQLOperator.__init__ is called first, and no unknown kwargs are left over.
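
To check this kind of expectation directly, one option is a small helper that reports which class in the MRO actually supplies __init__. This is only a sketch on plain Python classes: `init_provider` and the short class names are made up for illustration, and nothing here uses Airflow. If a metaclass assigns __init__ on every class it creates, the helper would report the class itself instead.

```python
def init_provider(cls):
    """Return the first class in cls.__mro__ whose own namespace defines __init__."""
    for klass in cls.__mro__:
        if "__init__" in vars(klass):
            return klass


# Hypothetical stand-ins mirroring the shape of the hierarchy above.
class Base:
    def __init__(self, **kwargs):
        self.leftover = kwargs


class Branch(Base):
    def __init__(self, follow=None, **kwargs):
        self.follow = follow
        super().__init__(**kwargs)


class Mixin:
    pass


class Middle(Mixin, Base):
    pass


class Target(Middle, Branch):
    pass


print([c.__name__ for c in Target.__mro__])
print(init_provider(Target).__name__)  # Branch
```

With plain classes, Target, Middle and Mixin define no __init__ of their own, so the lookup lands on Branch, as in the snippet above.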

Thank you for sharing your solution. But before we proceed with the code change, I would like to understand the behaviour, because it might not be the plugin's issue.

Do you have any clue as to what happens in Airflow and why its behaviour differs from the regular Python MRO shown in the code snippet above? Maybe the snippet misses something significant that distinguishes it from Airflow's implementation (metaclasses, perhaps, though I have checked them and spotted no crucial difference).

When we change the class definition to class ClickHouseBranchSQLOperator(sql.BranchSQLOperator, ClickHouseBaseDbApiOperator), the MRO is changed to:

This suggests that ClickHouseBaseDbApiOperator somehow breaks the MRO and calls the __init__ of its base class, BaseSQLOperator, skipping BranchSQLOperator's. But the simplified code sample does not reproduce this, even though the same change in the MRO happens when the order of ClickHouseBranchSQLOperator's base classes is switched.

bryzgaloff commented 3 months ago

As a quicker option, you may also proceed with a PR. Ideally, please start a PR with tests only. They should fail for the reported case. Once we confirm the tests fail, you may proceed with the code change fixing it by ~switching the order of the base classes~ (please see the new proper way to fix below). After that the tests should pass.

grihabor commented 3 months ago

Here is another solution:

 class ClickHouseDbApiHookMixin(object):
     # these attributes are defined in both BaseSQLOperator and SqlSensor
     conn_id: str
     hook_params: t.Optional[dict]

+    def __init__(self, **kwargs) -> None:
+        super().__init__(**kwargs)

Apparently just adding __init__ fixes the problem.

I think I know what's going on.

You're right, something is funky, so the first thing I checked is the metaclass of the base class. And indeed BaseOperator uses a custom metaclass: https://github.com/apache/airflow/blob/81845de9d95a733b4eb7826aaabe23ba9813eba3/airflow/models/baseoperator.py#L520

And here is the line that breaks everything: https://github.com/apache/airflow/blob/81845de9d95a733b4eb7826aaabe23ba9813eba3/airflow/models/baseoperator.py#L515

If your class does not define __init__, the metaclass sets one on it anyway, using whatever __init__ the class inherits at the moment of class declaration. This is not the same as resolving it through the MRO at call time:

setting new_cls=<class 'airflow.models.baseoperator.BaseOperator'>.__init__ to new_cls.__init__=<function BaseOperator.__init__ at 0x765141ae2d40>
setting new_cls=<class 'airflow.providers.common.sql.operators.sql.BaseSQLOperator'>.__init__ to new_cls.__init__=<function BaseSQLOperator.__init__ at 0x7651419a9bc0>
setting new_cls=<class 'airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator'>.__init__ to new_cls.__init__=<function SQLExecuteQueryOperator.__init__ at 0x7651419a9f80>
setting new_cls=<class 'airflow.providers.common.sql.operators.sql.SQLColumnCheckOperator'>.__init__ to new_cls.__init__=<function SQLColumnCheckOperator.__init__ at 0x7651419aa5c0>
setting new_cls=<class 'airflow.providers.common.sql.operators.sql.SQLTableCheckOperator'>.__init__ to new_cls.__init__=<function SQLTableCheckOperator.__init__ at 0x7651419aaa20>
setting new_cls=<class 'airflow.providers.common.sql.operators.sql.SQLCheckOperator'>.__init__ to new_cls.__init__=<function SQLCheckOperator.__init__ at 0x7651419aad40>
setting new_cls=<class 'airflow.providers.common.sql.operators.sql.SQLValueCheckOperator'>.__init__ to new_cls.__init__=<function SQLValueCheckOperator.__init__ at 0x7651419aafc0>
setting new_cls=<class 'airflow.providers.common.sql.operators.sql.SQLIntervalCheckOperator'>.__init__ to new_cls.__init__=<function SQLIntervalCheckOperator.__init__ at 0x7651419ab600>
setting new_cls=<class 'airflow.providers.common.sql.operators.sql.SQLThresholdCheckOperator'>.__init__ to new_cls.__init__=<function SQLThresholdCheckOperator.__init__ at 0x7651419ab7e0>
setting new_cls=<class 'airflow.providers.common.sql.operators.sql.BranchSQLOperator'>.__init__ to new_cls.__init__=<function BranchSQLOperator.__init__ at 0x7651419abb00>
setting new_cls=<class 'airflow_clickhouse_plugin.operators.clickhouse_dbapi.ClickHouseBaseDbApiOperator'>.__init__ to new_cls.__init__=<function BaseSQLOperator.__init__ at 0x7651419aa020>
setting new_cls=<class 'airflow_clickhouse_plugin.operators.clickhouse_dbapi.ClickHouseSQLExecuteQueryOperator'>.__init__ to new_cls.__init__=<function BaseSQLOperator.__init__ at 0x7651409d4860>
setting new_cls=<class 'airflow_clickhouse_plugin.operators.clickhouse_dbapi.ClickHouseSQLColumnCheckOperator'>.__init__ to new_cls.__init__=<function BaseSQLOperator.__init__ at 0x7651409d4860>
setting new_cls=<class 'airflow_clickhouse_plugin.operators.clickhouse_dbapi.ClickHouseSQLTableCheckOperator'>.__init__ to new_cls.__init__=<function BaseSQLOperator.__init__ at 0x7651409d4860>
setting new_cls=<class 'airflow_clickhouse_plugin.operators.clickhouse_dbapi.ClickHouseSQLCheckOperator'>.__init__ to new_cls.__init__=<function BaseSQLOperator.__init__ at 0x7651409d4860>
setting new_cls=<class 'airflow_clickhouse_plugin.operators.clickhouse_dbapi.ClickHouseSQLValueCheckOperator'>.__init__ to new_cls.__init__=<function BaseSQLOperator.__init__ at 0x7651409d4860>
setting new_cls=<class 'airflow_clickhouse_plugin.operators.clickhouse_dbapi.ClickHouseSQLIntervalCheckOperator'>.__init__ to new_cls.__init__=<function BaseSQLOperator.__init__ at 0x7651409d4860>
setting new_cls=<class 'airflow_clickhouse_plugin.operators.clickhouse_dbapi.ClickHouseSQLThresholdCheckOperator'>.__init__ to new_cls.__init__=<function BaseSQLOperator.__init__ at 0x7651409d4860>
setting new_cls=<class 'airflow_clickhouse_plugin.operators.clickhouse_dbapi.ClickHouseBranchSQLOperator'>.__init__ to new_cls.__init__=<function BaseSQLOperator.__init__ at 0x7651409d4860>

So when you run your class, the method that actually gets called is BaseSQLOperator.__init__ and not BranchSQLOperator.__init__.
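
The effect can be reproduced without Airflow. The sketch below is a simplified model, not Airflow's code: `PinInitMeta` is a hypothetical stand-in that only re-assigns the inherited __init__ at class creation (Airflow also wraps it), and the operator classes are bare stand-ins that share names with the real ones.

```python
class PinInitMeta(type):
    """Hypothetical stand-in for the metaclass behaviour described above."""

    def __new__(mcls, name, bases, namespace, **kwargs):
        new_cls = super().__new__(mcls, name, bases, namespace, **kwargs)
        # Look __init__ up through the MRO *now* and store it on the new class.
        new_cls.__init__ = new_cls.__init__
        return new_cls


class BaseOperator(metaclass=PinInitMeta):
    def __init__(self, **kwargs):
        if kwargs:
            raise TypeError(f"invalid arguments: {sorted(kwargs)}")


class BaseSQLOperator(BaseOperator):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)


class BranchSQLOperator(BaseSQLOperator):
    def __init__(self, follow_task_ids_if_true, **kwargs):
        self.follow_task_ids_if_true = follow_task_ids_if_true
        super().__init__(**kwargs)


class ClickHouseDbApiHookMixin:
    pass


class ClickHouseBaseDbApiOperator(ClickHouseDbApiHookMixin, BaseSQLOperator):
    pass  # no __init__ of its own, so BaseSQLOperator.__init__ gets pinned here


class ClickHouseBranchSQLOperator(ClickHouseBaseDbApiOperator, BranchSQLOperator):
    pass


pinned_name = ClickHouseBranchSQLOperator.__init__.__qualname__
print(pinned_name)  # BaseSQLOperator.__init__

try:
    ClickHouseBranchSQLOperator(follow_task_ids_if_true=["task_1"])
    error = None
except TypeError as exc:
    error = exc
print(error)
```

In this model the branch argument reaches the stand-in BaseOperator unconsumed, which mirrors the "Invalid arguments" error from the report.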

I opened an issue in airflow: https://github.com/apache/airflow/issues/41085 And the fix: https://github.com/apache/airflow/pull/41086

bryzgaloff commented 3 months ago

Hi @grihabor thank you for your proactive participation! And the PR to Airflow in particular 🔥

For my better understanding, please confirm:

When ClickHouseBaseDbApiOperator is created, its BaseOperatorMeta metaclass (inherited from BaseSQLOperator) assigns new_cls.__init__ in BaseOperatorMeta.__new__, and based on the MRO at the moment of creation that is BaseSQLOperator.__init__.

When BranchSQLOperator.__init__ is executed, the first class in the MRO with an actual implementation is ClickHouseBaseDbApiOperator. It means that it executes BaseSQLOperator.__init__ as if it were ClickHouseBaseDbApiOperator.__init__.

But doesn't that mean that, because BaseSQLOperator.__init__ contains a super().__init__(…) call, it should also call the next class in the MRO, which is BranchSQLOperator?

Or does the new_cls.__init__ = cls._apply_defaults(new_cls.__init__) assignment keep the __init__ method class-bound (bound to BaseSQLOperator), so that its super() call resolves to BaseSQLOperator's parent, which is BaseOperator?

Just trying to gather some keywords for me to know which concepts I have to refresh in memory 😅

bryzgaloff commented 3 months ago

I agree that a quick fix would be to add an __init__ method to the ClickHouseBaseDbApiOperator class that simply calls super().__init__(**kwargs).
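
A minimal sketch of why that quick fix should work, again on a simplified model rather than on Airflow itself. `PinInitMeta` and the `calls` list are hypothetical helpers for illustration, and the operator classes are bare stand-ins.

```python
class PinInitMeta(type):
    """Hypothetical stand-in: stores the currently resolved __init__ on each new class."""

    def __new__(mcls, name, bases, namespace, **kwargs):
        new_cls = super().__new__(mcls, name, bases, namespace, **kwargs)
        new_cls.__init__ = new_cls.__init__
        return new_cls


calls = []  # records the order in which the __init__ methods run


class BaseOperator(metaclass=PinInitMeta):
    def __init__(self, **kwargs):
        calls.append("BaseOperator")
        if kwargs:
            raise TypeError(f"invalid arguments: {sorted(kwargs)}")


class BaseSQLOperator(BaseOperator):
    def __init__(self, **kwargs):
        calls.append("BaseSQLOperator")
        super().__init__(**kwargs)


class BranchSQLOperator(BaseSQLOperator):
    def __init__(self, follow_task_ids_if_true, **kwargs):
        calls.append("BranchSQLOperator")
        self.follow_task_ids_if_true = follow_task_ids_if_true
        super().__init__(**kwargs)


class ClickHouseDbApiHookMixin:
    pass


class ClickHouseBaseDbApiOperator(ClickHouseDbApiHookMixin, BaseSQLOperator):
    def __init__(self, **kwargs):
        # Defined in this class body, so super() continues from
        # ClickHouseBaseDbApiOperator along the MRO of the instance.
        calls.append("ClickHouseBaseDbApiOperator")
        super().__init__(**kwargs)


class ClickHouseBranchSQLOperator(ClickHouseBaseDbApiOperator, BranchSQLOperator):
    pass


operator = ClickHouseBranchSQLOperator(follow_task_ids_if_true=["task_1"])
print(calls)
```

Because the explicit __init__ belongs to ClickHouseBaseDbApiOperator, its super() call continues with the classes that follow it in the instance's MRO, so the stand-in BranchSQLOperator.__init__ runs and consumes its argument before BaseOperator sees the kwargs.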

grihabor commented 3 months ago

When ClickHouseBaseDbApiOperator is created, its BaseOperatorMeta metaclass (inherited from BaseSQLOperator) assigns new_cls.__init__ in BaseOperatorMeta.__new__, and based on the MRO at the moment of creation that is BaseSQLOperator.__init__.

Correct.

When BranchSQLOperator.__init__ is executed, the first class in the MRO with an actual implementation is ClickHouseBaseDbApiOperator. It means that it executes BaseSQLOperator.__init__ as if it were ClickHouseBaseDbApiOperator.__init__.

The thing is, when you create an instance of ClickHouseBranchSQLOperator, its __init__ gets called, and that is BaseSQLOperator.__init__, so BranchSQLOperator.__init__ is never called. As a result, follow_task_ids_if_true and follow_task_ids_if_false are not consumed by BranchSQLOperator.__init__, which eventually leads to the AirflowException.

But doesn't that mean that, because BaseSQLOperator.__init__ contains a super().__init__(…) call, it should also call the next class in the MRO, which is BranchSQLOperator?

Nope. It calls the next-in-the-MRO class after BaseSQLOperator which is BaseOperator. Here is a small example:

class InitMeta(type):
    def __new__(cls, name, bases, namespace, **kwargs):
        new_cls = super().__new__(cls, name, bases, namespace, **kwargs)
        new_cls.__init__ = new_cls.__init__
        return new_cls

class A(metaclass=InitMeta):
    def __init__(self):
        print("A", super())
        super().__init__()

class B(A):
    pass

class C(A):
    def __init__(self):
        print("C", super())
        super().__init__()

class D(B, C):
    pass

D()

The output is

A <super: <class 'A'>, <D object>>

So in class A, the super() call is equivalent to super(A, self).__init__().

Or does the new_cls.__init__ = cls._apply_defaults(new_cls.__init__) assignment keep the __init__ method class-bound (bound to BaseSQLOperator), so that its super() call resolves to BaseSQLOperator's parent, which is BaseOperator?

Yep.
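
The "class-bound" part can be checked in plain Python. A function that uses zero-argument super() carries a `__class__` closure cell pointing at the class whose body defined it, and storing the same function on a subclass does not change that cell. The sketch below uses throwaway classes A and B and a plain assignment in place of the metaclass.

```python
class A:
    def __init__(self):
        super().__init__()


class B(A):
    pass


# Store the inherited function on B, roughly what the metaclass ends up doing.
B.__init__ = B.__init__

free_vars = A.__init__.__code__.co_freevars
cell_owner = B.__init__.__closure__[0].cell_contents

print(free_vars)                                   # ('__class__',)
print(cell_owner is A)                             # True
print(vars(B)["__init__"] is vars(A)["__init__"])  # True
```

So even when the function is reached through B, its super() still starts from A, which is the super(A, self) behaviour shown earlier.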

bryzgaloff commented 3 months ago

Thank you @grihabor for the explanation and your contribution to Airflow main repo!

@cra I believe you may expect the fix in Airflow 2.9.4

cra commented 3 months ago

Could you add a warning or mark supported versions in the table then? In the README https://github.com/bryzgaloff/airflow-clickhouse-plugin?tab=readme-ov-file#python-and-airflow-versions-support

bryzgaloff commented 3 months ago

Mentioned in README ✅ Also, you have been added to the contributors list, thanks! 🤝