apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

SQLExecuteQueryOperator AttributeError exception when returning result to XCom #31080

Closed - Stormhand closed this 1 year ago

Stormhand commented 1 year ago

Apache Airflow version

2.6.0

What happened

I am using DatabricksSqlOperator, which writes the result to a file. When the task finishes, it writes all the data correctly to the file, then throws the following exception:

[2023-05-05, 07:56:22 UTC] {taskinstance.py:1847} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 73, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 2377, in xcom_push
    XCom.set(
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 73, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/xcom.py", line 237, in set
    value = cls.serialize_value(
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/xcom.py", line 632, in serialize_value
    return json.dumps(value, cls=XComEncoder).encode("UTF-8")
  File "/usr/local/lib/python3.9/json/__init__.py", line 234, in dumps
    return cls(
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/json.py", line 102, in encode
    o = self.default(o)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/json.py", line 91, in default
    return serialize(o)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/serialization/serde.py", line 144, in serialize
    return encode(classname, version, serialize(data, depth + 1))
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/serialization/serde.py", line 123, in serialize
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/serialization/serde.py", line 123, in <listcomp>
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/serialization/serde.py", line 123, in serialize
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/serialization/serde.py", line 123, in <listcomp>
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/serialization/serde.py", line 132, in serialize
    qn = qualname(o)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/module_loading.py", line 47, in qualname
    return f"{o.__module__}.{o.__name__}"
  File "/home/airflow/.local/lib/python3.9/site-packages/databricks/sql/types.py", line 161, in __getattr__
    raise AttributeError(item)
AttributeError: __name__
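
As far as I can tell from the last frames, Airflow's serde tries to build a qualified name for each value it serializes, and the databricks Row type's __getattr__ raises AttributeError for any attribute it doesn't know, including dunders. A minimal sketch of that mechanism (illustrative class, not the actual databricks code):

# Illustrative only -- mimics how databricks.sql.types.Row rejects
# unknown attributes, which breaks attribute probing during serialization.
class FakeRow:
    def __getattr__(self, item):
        raise AttributeError(item)

def qualname(o):
    # simplified from airflow.utils.module_loading.qualname:
    # __module__ resolves via the class, but the instance has no
    # __name__, so __getattr__ fires and raises
    return f"{o.__module__}.{o.__name__}"

qualname(FakeRow())  # AttributeError: __name__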

I found that SQLExecuteQueryOperator always returns the result (and so pushes an XCom) from its execute() method, except when the parameter do_xcom_push is set to False. But if do_xcom_push is False, then the method _process_output() is not executed, and DatabricksSqlOperator won't write the results to a file.

What you think should happen instead

I am not sure if the problem should be fixed in DatabricksSqlOperator or in SQLExecuteQueryOperator. In any case, setting do_xcom_push to False shouldn't automatically prevent the execution of _process_output():

        if not self.do_xcom_push:
            return None
        if return_single_query_results(self.sql, self.return_last, self.split_statements):
            # For simplicity, we pass always list as input to _process_output, regardless if
            # single query results are going to be returned, and we return the first element
            # of the list in this case from the (always) list returned by _process_output
            return self._process_output([output], hook.descriptions)[-1]
        return self._process_output(output, hook.descriptions)
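
For context, the file output in DatabricksSqlOperator happens inside _process_output(). A simplified sketch of that pattern (my own illustration, not the actual provider code), showing why the early return above also skips the file:

# Hypothetical subclass following the same pattern as the provider:
# the file is only written inside _process_output(), so if execute()
# returns early on do_xcom_push=False, no file is produced.
import csv

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

class FileWritingSqlOperator(SQLExecuteQueryOperator):
    def __init__(self, *args, output_path=None, **kwargs):
        super().__init__(*args, **kwargs)
        self._output_path = output_path

    def _process_output(self, results, descriptions):
        if self._output_path:
            with open(self._output_path, "w", newline="") as f:
                writer = csv.writer(f)
                for rows, description in zip(results, descriptions):
                    if description:
                        writer.writerow([col[0] for col in description])  # column names
                    writer.writerows(rows)
        return list(results)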

What happens now is that I have the big result in a file AND in the XCom at the same time.

How to reproduce

I suspect that the actual exception is related to writing the XCom to the meta database, and it might not fail in other scenarios.
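
A minimal task that should reproduce it (a sketch; the connection id, endpoint name, table, and path are placeholders):

# output_path makes the operator write the rows to a file, while the
# default do_xcom_push=True makes execute() also return them, so the
# XCom JSON serialization hits the databricks Row objects.
from datetime import datetime

from airflow import DAG
from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator

with DAG("repro_xcom_serialize", start_date=datetime(2023, 5, 1), schedule=None) as dag:
    dump = DatabricksSqlOperator(
        task_id="dump_to_file",
        databricks_conn_id="databricks_default",
        sql_endpoint_name="sql_endpoint",
        sql="SELECT * FROM some_big_table",
        output_path="/tmp/result.csv",
        output_format="csv",
    )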

Operating System

Debian GNU/Linux 11 (bullseye) docker image

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==8.0.0
apache-airflow-providers-apache-spark==4.0.1
apache-airflow-providers-celery==3.1.0
apache-airflow-providers-cncf-kubernetes==6.1.0
apache-airflow-providers-common-sql==1.4.0
apache-airflow-providers-databricks==4.1.0
apache-airflow-providers-docker==3.6.0
apache-airflow-providers-elasticsearch==4.4.0
apache-airflow-providers-ftp==3.3.1
apache-airflow-providers-google==10.0.0
apache-airflow-providers-grpc==3.1.0
apache-airflow-providers-hashicorp==3.3.1
apache-airflow-providers-http==4.3.0
apache-airflow-providers-imap==3.1.1
apache-airflow-providers-microsoft-azure==6.0.0
apache-airflow-providers-microsoft-mssql==3.3.2
apache-airflow-providers-mysql==5.0.0
apache-airflow-providers-odbc==3.2.1
apache-airflow-providers-oracle==3.6.0
apache-airflow-providers-postgres==5.4.0
apache-airflow-providers-redis==3.1.0
apache-airflow-providers-samba==4.1.0
apache-airflow-providers-sendgrid==3.1.0
apache-airflow-providers-sftp==4.2.4
apache-airflow-providers-slack==7.2.0
apache-airflow-providers-snowflake==4.0.5
apache-airflow-providers-sqlite==3.3.2
apache-airflow-providers-ssh==3.6.0
apache-airflow-providers-telegram==4.0.0

Deployment

Docker-Compose

Deployment details

Using an extended Airflow image, LocalExecutor, and a Postgres 13 meta DB as a container in the same stack.
docker-compose version 1.29.2, build 5becea4c
Docker version 23.0.5, build bc4487a

Anything else

No response


potiuk commented 1 year ago

Interesting one. Proposed a fix in #31136, @Stormhand.

Stormhand commented 1 year ago

Hi @potiuk, unfortunately it happened again. This time I need do_xcom_push:

[2023-06-06, 08:52:24 UTC] {sql.py:375} INFO - Running statement: SELECT cast(max(id) as STRING) FROM prod.unified.sessions, parameters: None
[2023-06-06, 08:52:25 UTC] {taskinstance.py:1824} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 73, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 2354, in xcom_push
    XCom.set(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 73, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/xcom.py", line 237, in set
    value = cls.serialize_value(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/xcom.py", line 632, in serialize_value
    return json.dumps(value, cls=XComEncoder).encode("UTF-8")
  File "/usr/local/lib/python3.10/json/__init__.py", line 238, in dumps
    **kw).encode(obj)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/json.py", line 102, in encode
    o = self.default(o)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/json.py", line 91, in default
    return serialize(o)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 144, in serialize
    return encode(classname, version, serialize(data, depth + 1))
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 123, in serialize
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 123, in <listcomp>
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 132, in serialize
    qn = qualname(o)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/module_loading.py", line 47, in qualname
    return f"{o.__module__}.{o.__name__}"
  File "/home/airflow/.local/lib/python3.10/site-packages/databricks/sql/types.py", line 161, in __getattr__
    raise AttributeError(item)
AttributeError: __name__. Did you mean: '__ne__'?

This is how I use it:

    get_max_id_task = DatabricksSqlOperator(
        databricks_conn_id=databricks_sql_conn_id,
        sql_endpoint_name='sql_endpoint',
        task_id='get_max_id',
        sql="SELECT cast(max(id) as STRING) FROM prod.unified.sessions",
        do_xcom_push=True
    )

For the Databricks providers, I use the latest:

apache-airflow-providers-common-sql==1.5.1
databricks-sql-connector==2.5.2
apache-airflow-providers-databricks==4.2.0

Airflow is 2.6.1 / Python 3.10.

UPDATE: Replacing the DatabricksSqlOperator with a simple PythonOperator and the DatabricksSqlHook works just fine:

from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook

def get_max_id(ti):
    hook = DatabricksSqlHook(databricks_conn_id=databricks_sql_conn_id, sql_endpoint_name='sql_endpoint')
    sql = "SELECT cast(max(id) as STRING) FROM prod.unified.sessions"
    # get_first() returns the first row; cast to str so the XCom value is plain JSON
    return str(hook.get_first(sql)[0])
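
It can be wired into the DAG like this (illustrative wiring; the plain string return value serializes to XCom without trouble):

from airflow.operators.python import PythonOperator

get_max_id_task = PythonOperator(
    task_id="get_max_id",
    python_callable=get_max_id,  # the returned string is pushed to XCom
)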
potiuk commented 1 year ago

Can you please open a new issue, @Stormhand? This is a quite different issue with a similar stacktrace (cc: @alexott - maybe you can take a look and implement a fix for that one).

Stormhand commented 1 year ago

> Can you please open a new issue, @Stormhand? This is a quite different issue with a similar stacktrace (cc: @alexott - maybe you can take a look and implement a fix for that one).

Thank you. Opened a new one: #31753