apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

`SQLColumnCheckOperator` failures after upgrading to `common-sql==1.3.0` #27976

Closed mag3141592 closed 1 year ago

mag3141592 commented 1 year ago

Apache Airflow Provider(s)

common-sql

Versions of Apache Airflow Providers

apache-airflow-providers-google==8.2.0
apache-airflow-providers-http==4.0.0
apache-airflow-providers-salesforce==5.0.0
apache-airflow-providers-slack==5.1.0
apache-airflow-providers-snowflake==3.2.0

Issue: apache-airflow-providers-common-sql==1.3.0

Apache Airflow version

2.4.3

Operating System

Debian GNU/Linux 11 (bullseye)

Deployment

Astronomer

Deployment details

No response

What happened

The problem occurred when upgrading from common-sql==1.2.0 to common-sql==1.3.0.

We now get a KeyError when running a unique_check and null_check on a column.

1.3.0 log:

[Screenshot: task log under common-sql 1.3.0 showing the KeyError (Screen Shot 2022-11-28 at 2 01 20 PM)]

1.2.0 log:

[Screenshot: task log under common-sql 1.2.0 (Screen Shot 2022-11-28 at 2 00 15 PM)]

What you think should happen instead

Potential causes:

Expected behavior: the column checks should run and pass, as they did on common-sql==1.2.0.

How to reproduce

from datetime import datetime
from airflow import DAG

from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.providers.common.sql.operators.sql import SQLColumnCheckOperator

my_conn_id = "snowflake_default"

default_args = {"conn_id": my_conn_id}

with DAG(
    dag_id="airflow_providers_example",
    schedule=None,
    start_date=datetime(2022, 11, 27),
    default_args=default_args,
) as dag:

    # Build a small three-row test table to run the checks against
    create_table = SnowflakeOperator(
        task_id="create_table",
        sql=""" CREATE OR REPLACE TABLE testing AS (
                        SELECT
                            1 AS row_num,
                            'not null' AS field

                        UNION ALL

                        SELECT
                            2 AS row_num,
                            'test' AS field

                        UNION ALL

                        SELECT
                            3 AS row_num,
                            'test 2' AS field
                    )""",
    )

    # Expect zero duplicate values and zero NULLs in the "field" column
    column_checks = SQLColumnCheckOperator(
        task_id="column_checks",
        table="testing",
        column_mapping={
            "field": {"unique_check": {"equal_to": 0}, "null_check": {"equal_to": 0}}
        },
    )

    create_table >> column_checks

Anything else

No response

Are you willing to submit PR?

Code of Conduct

I agree to follow this project's Code of Conduct

boring-cyborg[bot] commented 1 year ago

Thanks for opening your first issue here! Be sure to follow the issue template!

denimalpaca commented 1 year ago

@potiuk it seems like there is another dependency issue here. I noticed the DBApiHook changed some behavior: get_all() is now calling run() to make these calls. Not sure if that is the issue or something Snowflake-specific, but the return type of the records has changed.

@mag3141592 the .get("tolerance") is fine as it is, as the value param defaults to None. All previous versions of the operator have been written like this and they work fine!
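For illustration, a minimal standalone sketch of the dict semantics being discussed (plain Python, not the operator's actual code):

# .get() returns a default (None) for a missing key instead of raising KeyError
checks = {"equal_to": 0}
tolerance = checks.get("tolerance")  # -> None, no exception
assert tolerance is None
# checks["tolerance"] would instead raise KeyError: 'tolerance'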

potiuk commented 1 year ago

It seems we also have the same (or a similar) problem in https://github.com/apache/airflow/issues/27978

We might need a 1.3.2 rather quickly, I am afraid.

denimalpaca commented 1 year ago

Interesting. Is this at the hook level and something that needs to be changed for the operators? If so, I can open a PR today with the fix for a dict return type.

Still not sure if this is a Snowflake-specific problem or a general SQL hook issue. It doesn't seem like run() is necessarily returning a dict.

potiuk commented 1 year ago

I do not know yet :). I am travelling most of today and had no time to look closely, as I was releasing 1.3.1 + other providers. I will look later today, but maybe you can see in the meantime what the possible reason could be.

kaxil commented 1 year ago

We had the same issue on the Astro SDK side, which we had to fix by https://github.com/astronomer/astro-sdk/commit/0c64f486be2dd7c0a176ef3b43fed7544844b87d

potiuk commented 1 year ago

We had the same issue on the Astro SDK side, which we had to fix by astronomer/astro-sdk@0c64f48

Thanks @kaxil - I think the root cause of this problem is different. That one was a problem with the "unifying" of the run() methods' behaviour that I did in https://github.com/apache/airflow/pull/27912.

The fix for the problem you found in astro is here: https://github.com/apache/airflow/pull/27997

I am still looking into the other problems - they look similar, but they were introduced in 1.3.0, so this is not the same cause.

potiuk commented 1 year ago

OK. @denimalpaca @kazanzhy.

I know why it is happening - it is a Snowflake-only thing. And I am not 100% sure yet how to fix it in a good way.

The root cause of the problem is this line in Snowflake Hook's run() method:

with closing(conn.cursor(DictCursor)) as cur:

The result is that, after the #26944 change, get_records() and get_first() changed their result type to return dictionaries rather than sequences (previously each of those methods had its own implementation and did not use the run() method, so they used a "standard" cursor).

Simply speaking: SnowflakeHook's run() method is not standard - instead of a sequence of sequences, it returns a sequence of dicts. This is absolutely not standard behaviour for the DB API - it is simply a creative approach of the Snowflake Python connector to return dicts: https://docs.snowflake.com/en/user-guide/python-connector-api.html#cursor
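To make the difference concrete, a small standalone sketch of the two row shapes (illustrative values only, not hook code):

# Standard DB-API cursor: each row is a sequence, accessed by position
standard_rows = [(3,)]                  # e.g. result of SELECT COUNT(*) FROM testing
assert standard_rows[0][0] == 3

# Snowflake DictCursor: each row is a dict, keyed by column name
dict_rows = [{"COUNT(*)": 3}]
assert dict_rows[0]["COUNT(*)"] == 3
# dict_rows[0][0] raises KeyError: 0 - the kind of KeyError reported above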

I think the best fix is to make SnowflakeHook's run() method conform to the standard DBApiHook semantics - return plain sequences rather than dicts, and expose the cursor description separately on the hook.

This would be an almost identical change to the one I made for Databricks last week (only the Databricks internals were different - there was no DictCursor, and it returned Tuples rather than Dicts):

https://airflow.apache.org/docs/apache-airflow-providers-databricks/stable/index.html#breaking-changes

The DatabricksSqlHook is now conforming to the same semantics as all the other DBApiHook implementations and returns the same kind of response in its run method. Previously (in pre-4.* versions of the provider), the Hook returned a Tuple of ("cursor description", "results"), which was not compatible with other DBApiHooks that return just "results". After this change (and the dependency on common.sql >= 1.3.1), the DatabricksSqlHook now returns "results" only. The description can be retrieved via the last_description field of the hook after the run method completes.
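Under those standardized semantics, hook usage looks roughly like this (a sketch assuming the documented API, not verbatim provider code):

from airflow.providers.common.sql.hooks.sql import fetch_all_handler
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook

hook = DatabricksSqlHook(databricks_conn_id="databricks_default")
results = hook.run("SELECT 1", handler=fetch_all_handler)  # "results" only
description = hook.last_description  # cursor description, available after run() completes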

That would require a Snowflake 5.0.0 breaking-change release, and basically yanking the whole 4.0.* line. Or, probably better, we could release 4.0.2 adding this breaking change and yank 4.0.0 and 4.0.1 as "wrong", making 4.0.2 the "real breaking" change we actually wanted (or started to want) to make in 4.0. The 4.0.* line is very fresh and has not been released yet in constraints.

The end result will be that we will finally (I hope) standardise the semantics of DBApiHook.run(). So far, each of the hooks could change it (and some did, as we see) - this change should finally get all the run() methods to behave in the same way.

WDYT Everyone?

potiuk commented 1 year ago

The same fix will work for #27978 - standardising the run() methods to behave the same for all operators will also fix #27978, because SqlSensor uses get_records() and gets a sequence of dicts instead of a sequence of sequences, so key 0 is missing in the dictionary.
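A simplified sketch of that failure mode (not the exact SqlSensor code):

records = [{"ROW_NUM": 1}]      # what a DictCursor-backed get_records() yields
try:
    first_cell = records[0][0]  # positional access, as with a standard cursor row
except KeyError as err:
    print(f"KeyError: {err}")   # KeyError: 0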

CC: @uranusjr @kaxil I am also summoning you for opinion on that one

potiuk commented 1 year ago

This is how the fix can look: #28006 - so it is not big. If others think this is a good idea (@mik-laj - maybe you can also take a look and see if it's OK to add this breaking change), then I will add tests similar to the ones we now have for the Databricks Hook: https://github.com/apache/airflow/blob/main/tests/providers/databricks/hooks/test_databricks_sql.py

potiuk commented 1 year ago

Those involved: please let me know if you think adding this breaking change is a good idea (cc: @eladkal?)

potiuk commented 1 year ago

I've added full test coverage in #28006

denimalpaca commented 1 year ago

My vote is towards standardizing Snowflake's hook with other DBApi Hooks, and having 4.0.2 being "the" breaking change for deep usages.

potiuk commented 1 year ago

My vote is towards standardizing Snowflake's hook with other DBApi Hooks, and having 4.0.2 being "the" breaking change for deep usages.

Yeah, I think it makes the most sense. This is a one-time pain we have to go through to finally get "common" behaviours and be able to use them in lineage, common.sql operators, transfers, and a number of possible future usages.

potiuk commented 1 year ago

And users can still use the 3.* line for a long time if they want to keep compatibility.

potiuk commented 1 year ago

@mag3141592 - can you please check if 4.0.2rc1 solves the problem and report it in https://github.com/apache/airflow/issues/28036