apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0
37.06k stars 14.29k forks source link

XCom unable to parse tuple response from DatabricksSQLOperator on SQL query execution #39448

Open KanikaAdik opened 6 months ago

KanikaAdik commented 6 months ago

Apache Airflow Provider(s)

databricks

Versions of Apache Airflow Providers

apache-airflow-providers-databricks==6.3.0

Apache Airflow version

2.7.3

Operating System

(airflow)cat /etc/os-release PRETTY_NAME="Debian GNU/Linux 11 (bullseye)" NAME="Debian GNU/Linux" VERSION_ID="11" VERSION="11 (bullseye)" VERSION_CODENAME=bullseye ID=debian HOME_URL="https://www.debian.org/" SUPPORT_URL="https://www.debian.org/support" BUG_REPORT_URL="https://bugs.debian.org/" (airflow)

Deployment

Other Docker-based deployment

Deployment details

No response

What happened

I am trying to execute a few queries using DatabricksSQLOperator in an airflow dag. Seems like the output received from the Operator cannot be handled by Xcom. Hence receiving error log - [2024-05-06T20:17:44.253+0000] {xcom.py:661} ERROR - Object of type tuple is not JSON serializable. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your config or make sure to decorate your object with attr. [2024-05-06T20:17:44.254+0000] {taskinstance.py:1937} ERROR - Task failed with exception Traceback (most recent call last): File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 91, in default return serialize(o) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 145, in serialize return encode(classname, version, serialize(data, depth + 1)) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in serialize return [serialize(d, depth + 1) for d in o] File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in return [serialize(d, depth + 1) for d in o] File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in serialize return [serialize(d, depth + 1) for d in o] File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in return [serialize(d, depth + 1) for d in o] File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 178, in serialize raise TypeError(f"cannot serialize object of type {cls}") TypeError: cannot serialize object of type <class '.providers.databricks.hooks.databricks_sql.Row'>

During handling of the above exception, another exception occurred:

Traceback (most recent call last): File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 76, in wrapper return func(*args, *kwargs) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 2479, in xcom_push XCom.set( File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 76, in wrapper return func(args, **kwargs) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 244, in set value = cls.serialize_value( File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 659, in serialize_value return json.dumps(value, cls=XComEncoder).encode("UTF-8") File "/usr/local/lib/python3.8/json/init.py", line 234, in dumps return cls( File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 102, in encode o = self.default(o) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 93, in default return super().default(o) File "/usr/local/lib/python3.8/json/encoder.py", line 179, in default raise TypeError(f'Object of type {o.class.name} ' TypeError: Object of type tuple is not JSON serializable [2024-05-06T20:17:44.259+0000] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=ct_sql_xcom, task_id=select_data, execution_date=20240506T170918, start_date=20240506T201741, end_date=20240506T201744 [2024-05-06T20:17:44.266+0000] {standard_task_runner.py:104} ERROR - Failed to execute job 238 for task select_data (Object of type tuple is not JSON serializable; 1850)

What you think should happen instead

Xcom should be able to handle the JSON serialization or have set a standard with DatabricksSQL providers on acceptable response type to handle any generic case.

How to reproduce

  1. create a DatabricksSQLOperator airflow dag
  2. set the do_xcom_push=True
  3. set a separate task to parse and use values sql query result

`from datetime import datetime from airflow import DAG from airflow.operators.python import PythonOperator from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator from airflow.models import Variable

env = Variable.get('AIRFLOW_VAR_ENV_NAME')

with DAG('ct_sql_xcom', start_date = datetime(2024, 1, 30), schedule_interval = None ) as dag:

create_file = DatabricksSqlOperator(
    databricks_conn_id='databricks',
    sql_endpoint_name='Serverless',
    task_id="create_and_populate_from_file",
    sql="select table_name from system.information_schema.tables where table_catalog = 'rsg_prod'",
    do_xcom_push=True,
)

def downstream_task(**kwargs):
    result = kwargs['task_instance'].xcom_pull(task_ids='create_file')
    print("Received result from XCom:", result)

create_file >> downstream_task `

Anything else

`[2024-05-06T19:13:57.614+0000] {xcom.py:661} ERROR - Object of type tuple is not JSON serializable. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your config or make sure to decorate your object with attr. [2024-05-06T19:13:57.615+0000] {taskinstance.py:1937} ERROR - Task failed with exception Traceback (most recent call last): File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 91, in default return serialize(o) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 145, in serialize return encode(classname, version, serialize(data, depth + 1)) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in serialize return [serialize(d, depth + 1) for d in o] File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in return [serialize(d, depth + 1) for d in o] File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in serialize return [serialize(d, depth + 1) for d in o] File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in return [serialize(d, depth + 1) for d in o] File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 178, in serialize raise TypeError(f"cannot serialize object of type {cls}") TypeError: cannot serialize object of type <class '.providers.databricks.hooks.databricks_sql.Row'>

During handling of the above exception, another exception occurred:

Traceback (most recent call last): File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 76, in wrapper return func(*args, *kwargs) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 2479, in xcom_push XCom.set( File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 76, in wrapper return func(args, **kwargs) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 244, in set value = cls.serialize_value( File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 659, in serialize_value return json.dumps(value, cls=XComEncoder).encode("UTF-8") File "/usr/local/lib/python3.8/json/init.py", line 234, in dumps return cls( File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 102, in encode o = self.default(o) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 93, in default return super().default(o) File "/usr/local/lib/python3.8/json/encoder.py", line 179, in default raise TypeError(f'Object of type {o.class.name} ' TypeError: Object of type tuple is not JSON serializable [2024-05-06T19:13:57.620+0000] {taskinstance.py:1400} INFO - Marking task as UP_FOR_RETRY. dag_id=ct_sql_xcom, task_id=create_file , execution_date=20240506T170918, start_date=20240506T191355, end_date=20240506T191357 [2024-05-06T19:13:57.627+0000] {standard_task_runner.py:104} ERROR - Failed to execute job 233 for task create_file (Object of type tuple is not JSON serializable; 842)`

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 6 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

iopiopi1 commented 4 months ago

I'd like to take this into work, can you please assign to me?

jroachgolf84 commented 4 months ago

If this isn't assigned, I'd love to have it assigned to myself.

avivshafir commented 2 months ago

Any update on this @jroachgolf84 ? I'm also getting a similar error

_pickle.PicklingError: Can't pickle <class 'airflow.providers.databricks.hooks.databricks_sql.Row'>: attribute lookup Row on airflow.providers.databricks.hooks.databricks_sql failed

using apache-airflow-providers-databricks==6.9.0