astronomer / astronomer-providers

Airflow Providers containing Deferrable Operators & Sensors from Astronomer
https://astronomer-providers.rtfd.io/
Apache License 2.0

SnowflakeOperatorAsync calls super with incorrect `conn_id` argument #1545

Closed tzachz closed 1 week ago

tzachz commented 3 weeks ago

Describe the bug: SnowflakeOperatorAsync is inconsistent with its superclass, SnowflakeOperator. SnowflakeOperator expects a snowflake_conn_id argument, but the SnowflakeOperatorAsync constructor calls the superclass constructor with conn_id instead:

super().__init__(conn_id=snowflake_conn_id, **kwargs)

In practice, the conn_id argument is not a named parameter of SnowflakeOperator.__init__, so it gets bundled into kwargs there. SnowflakeOperator then calls its own parent's __init__ method (SQLExecuteQueryOperator.__init__) with an explicit conn_id= argument while also passing along kwargs, which still contains conn_id. The same keyword is therefore supplied twice, which results in the following DAG import error:

SQLExecuteQueryOperator.__init__() got multiple values for keyword argument 'conn_id'
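The failure mode described above can be reproduced without Airflow installed. The following is a minimal sketch using hypothetical stand-in classes (BaseSqlOperator, ParentOperator, and AsyncOperator are illustrative names, not the real Airflow or astronomer-providers classes), assuming the constructors forward arguments the way this report describes:

```python
class BaseSqlOperator:
    """Hypothetical stand-in for SQLExecuteQueryOperator."""

    def __init__(self, *, conn_id=None, sql=None, **kwargs):
        self.conn_id = conn_id
        self.sql = sql


class ParentOperator(BaseSqlOperator):
    """Hypothetical stand-in for SnowflakeOperator."""

    def __init__(self, *, snowflake_conn_id="snowflake_default", **kwargs):
        # A conn_id passed by the caller is not a named parameter here,
        # so it is still inside kwargs when this call is made.
        super().__init__(conn_id=snowflake_conn_id, **kwargs)


class AsyncOperator(ParentOperator):
    """Hypothetical stand-in for SnowflakeOperatorAsync (as reported)."""

    def __init__(self, *, snowflake_conn_id="snowflake_default", **kwargs):
        # Mirrors the reported call: the parent is given conn_id,
        # not snowflake_conn_id.
        super().__init__(conn_id=snowflake_conn_id, **kwargs)


def try_build():
    """Return the TypeError message raised on construction, or None."""
    try:
        AsyncOperator(snowflake_conn_id="my_conn", sql="select 1")
    except TypeError as exc:
        return str(exc)
    return None


error_message = try_build()
print(error_message)
```

The exact prefix of the message depends on the Python version and the class names, but it should contain the same "multiple values for keyword argument 'conn_id'" text as the import error above.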

I'm using versions:

To Reproduce: Create the following simple operator:

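from astronomer.providers.snowflake.operators.snowflake import SnowflakeOperatorAsync


class MySnowflakeOperator(SnowflakeOperatorAsync):
    def __init__(self, snowflake_conn_id: str, sql: str, **kwargs):
        # Forward the connection ID under the name SnowflakeOperatorAsync accepts
        super().__init__(sql=sql, snowflake_conn_id=snowflake_conn_id, **kwargs)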

Use it in a DAG:

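from airflow import DAG
from airflow.operators.empty import EmptyOperator

# SNOWFLAKE_CONN_ID is assumed to be defined elsewhere in the DAG file
with DAG(
    dag_id="dv-dummy-pipeline",
    schedule_interval=None
) as dag:

    task = MySnowflakeOperator(
        task_id='t1',
        sql="select 1",
        snowflake_conn_id=SNOWFLAKE_CONN_ID,
        dag=dag
    )

    other_task = EmptyOperator(task_id="t2")

    task >> other_task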

Deploy this DAG.

Expected behavior: The DAG is imported successfully.

Screenshots: Instead, the DAG cannot be imported: [image]

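Potential fix: The fix seems to be trivial. In SnowflakeOperatorAsync.__init__, replace conn_id=snowflake_conn_id with snowflake_conn_id=snowflake_conn_id in the call to SnowflakeOperator's __init__ method, so that the argument binds to the parameter SnowflakeOperator actually declares instead of landing in kwargs.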
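As a rough illustration of the proposed change, here is a sketch with hypothetical stand-in classes (SqlBase, SnowflakeParent, and FixedAsyncChild are illustrative names, not the real library classes). It assumes the parent maps snowflake_conn_id onto conn_id the way the report describes, and is not the actual patch:

```python
class SqlBase:
    """Hypothetical stand-in for SQLExecuteQueryOperator."""

    def __init__(self, *, conn_id=None, sql=None, **kwargs):
        self.conn_id = conn_id
        self.sql = sql


class SnowflakeParent(SqlBase):
    """Hypothetical stand-in for SnowflakeOperator."""

    def __init__(self, *, snowflake_conn_id="snowflake_default", **kwargs):
        # Translates the Snowflake-specific name into the generic conn_id.
        super().__init__(conn_id=snowflake_conn_id, **kwargs)


class FixedAsyncChild(SnowflakeParent):
    """Hypothetical stand-in for SnowflakeOperatorAsync with the proposed fix."""

    def __init__(self, *, snowflake_conn_id="snowflake_default", **kwargs):
        # Proposed change: use the keyword the parent actually declares,
        # so nothing named conn_id ends up in kwargs.
        super().__init__(snowflake_conn_id=snowflake_conn_id, **kwargs)


fixed_op = FixedAsyncChild(snowflake_conn_id="my_conn", sql="select 1")
print(fixed_op.conn_id, fixed_op.sql)
```

With the keyword renamed, conn_id is supplied only once, by the parent, and construction succeeds.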

phanikumv commented 3 weeks ago

Thank you, @tzachz, for creating the issue. We will look into it and fix it in an upcoming release.

pankajastro commented 3 weeks ago

Unfortunately, SnowflakeOperatorAsync currently extends SnowflakeOperator, which has been deprecated. We're using conn_id in the superclass to initialize the connection parameter in SQLExecuteQueryOperator. Based on the example you have shared, it seems you're attempting to extend SnowflakeOperatorAsync to create a new custom operator and not directly using it. If you're planning to create the operator MySnowflakeOperator, I recommend considering SQLExecuteQueryOperator as the appropriate class to extend, rather than SnowflakeOperatorAsync.

phanikumv commented 1 week ago

Closing as per @pankajastro's suggestion. Please re-open if required.