apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

AttributeError exception when returning result to XCom #31753

Closed Stormhand closed 1 year ago

Stormhand commented 1 year ago

Apache Airflow version

2.6.1

What happened

When I use do_xcom_push=True in DatabricksSqlOperator, an exception with the following stack trace is thrown:

[2023-06-06, 08:52:24 UTC] {sql.py:375} INFO - Running statement: SELECT cast(max(id) as STRING) FROM prod.unified.sessions, parameters: None
[2023-06-06, 08:52:25 UTC] {taskinstance.py:1824} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 73, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 2354, in xcom_push
    XCom.set(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 73, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/xcom.py", line 237, in set
    value = cls.serialize_value(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/xcom.py", line 632, in serialize_value
    return json.dumps(value, cls=XComEncoder).encode("UTF-8")
  File "/usr/local/lib/python3.10/json/__init__.py", line 238, in dumps
    **kw).encode(obj)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/json.py", line 102, in encode
    o = self.default(o)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/json.py", line 91, in default
    return serialize(o)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 144, in serialize
    return encode(classname, version, serialize(data, depth + 1))
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 123, in serialize
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 123, in <listcomp>
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 132, in serialize
    qn = qualname(o)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/module_loading.py", line 47, in qualname
    return f"{o.__module__}.{o.__name__}"
  File "/home/airflow/.local/lib/python3.10/site-packages/databricks/sql/types.py", line 161, in __getattr__
    raise AttributeError(item)
AttributeError: __name__. Did you mean: '__ne__'?

What you think should happen instead

In _process_output(), if self._output_path is not set, a list of tuples is returned:

def _process_output(self, results: list[Any], descriptions: list[Sequence[Sequence] | None]) -> list[Any]:
    if not self._output_path:
        return list(zip(descriptions, results))

I suspect this breaks the serialization somehow, which might be related to my own metadata database (Postgres).

Replacing the DatabricksSqlOperator with a simple PythonOperator and DatabricksSqlHook works just fine:

def get_max_id(ti):
    hook = DatabricksSqlHook(databricks_conn_id=databricks_sql_conn_id, sql_endpoint_name='sql_endpoint')
    sql = "SELECT cast(max(id) as STRING) FROM prod.unified.sessions"
    return str(hook.get_first(sql)[0])
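
For reference, a minimal sketch of how the function above might be wired into a task. The imports and the PythonOperator wiring are assumptions, not part of the original report, and databricks_sql_conn_id is assumed to be defined elsewhere in the DAG file:

from airflow.operators.python import PythonOperator
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook

get_max_id_task = PythonOperator(
    task_id="get_max_id",
    python_callable=get_max_id,  # the function defined above
    do_xcom_push=True,           # the returned str is JSON-serializable, so the XCom push succeeds
)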

How to reproduce

    get_max_id_task = DatabricksSqlOperator(
        databricks_conn_id=databricks_sql_conn_id,
        sql_endpoint_name='sql_endpoint',
        task_id='get_max_id',
        sql="SELECT cast(max(id) as STRING) FROM prod.unified.sessions",
        do_xcom_push=True
    )

Operating System

Debian GNU/Linux 11 (bullseye) docker image, python 3.10

Versions of Apache Airflow Providers

apache-airflow-providers-common-sql==1.5.1
databricks-sql-connector==2.5.2
apache-airflow-providers-databricks==4.2.0

Deployment

Docker-Compose

Deployment details

Using an extended Airflow image, LocalExecutor, and a Postgres 13 metadata DB as a container in the same stack. docker-compose version 1.29.2, build 5becea4c; Docker version 23.0.5, build bc4487a

Anything else

No response

Are you willing to submit PR?

Code of Conduct

phanikumv commented 1 year ago

@Stormhand can you please mention the value that you are trying to cast? Basically, I need the value of max(id) which you are trying to cast to STRING so that I can try to reproduce this on my end.

Stormhand commented 1 year ago

@Stormhand can you please mention the value that you are trying to cast? Basically, I need the value of max(id) which you are trying to cast to STRING so that I can try to reproduce this on my end.

It's a 32-bit integer. I then tried casting it to a string, but that didn't work either.

BTW, it started failing after I updated from 2.6.0 to 2.6.1.

phanikumv commented 1 year ago

Yes @Stormhand, I was able to reproduce this issue. I am getting this error on 2.6.1 with the DAG below.

cc @jedcunningham @potiuk

from datetime import datetime

from airflow import DAG
from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator

with DAG(
    dag_id="example_databricks_sql",
    schedule_interval=None,
    start_date=datetime(2022, 1, 1),
    tags=["example", "databricks"],
    catchup=False,
) as submit_dag:
    get_max_id_task = DatabricksSqlOperator(
        databricks_conn_id="databricks_default",
        sql_endpoint_name='astro-sdk-test',
        task_id='get_max_id',
        sql="SELECT cast(max(col2) as STRING) FROM test_table2",
        do_xcom_push=True
    )
*** Found local files:
***   * /usr/local/airflow/logs/dag_id=example_databricks_sql/run_id=manual__2023-06-07T13:08:10.011124+00:00/task_id=get_max_id/attempt=1.log
[2023-06-07, 13:08:10 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: example_databricks_sql.get_max_id manual__2023-06-07T13:08:10.011124+00:00 [queued]>
[2023-06-07, 13:08:10 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: example_databricks_sql.get_max_id manual__2023-06-07T13:08:10.011124+00:00 [queued]>
[2023-06-07, 13:08:10 UTC] {taskinstance.py:1308} INFO - Starting attempt 1 of 1
[2023-06-07, 13:08:10 UTC] {taskinstance.py:1327} INFO - Executing <Task(DatabricksSqlOperator): get_max_id> on 2023-06-07 13:08:10.011124+00:00
[2023-06-07, 13:08:10 UTC] {standard_task_runner.py:57} INFO - Started process 852 to run task
[2023-06-07, 13:08:10 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'example_databricks_sql', 'get_max_id', 'manual__2023-06-07T13:08:10.011124+00:00', '--job-id', '7', '--raw', '--subdir', 'DAGS_FOLDER/example_databricks_sql.py', '--cfg-path', '/tmp/tmpiivo9oy_']
[2023-06-07, 13:08:10 UTC] {standard_task_runner.py:85} INFO - Job 7: Subtask get_max_id
[2023-06-07, 13:08:10 UTC] {task_command.py:410} INFO - Running <TaskInstance: example_databricks_sql.get_max_id manual__2023-06-07T13:08:10.011124+00:00 [running]> on host 7bc0a25b3ba8
[2023-06-07, 13:08:10 UTC] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='example_databricks_sql' AIRFLOW_CTX_TASK_ID='get_max_id' AIRFLOW_CTX_EXECUTION_DATE='2023-06-07T13:08:10.011124+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-06-07T13:08:10.011124+00:00'
[2023-06-07, 13:08:10 UTC] {sql.py:265} INFO - Executing: SELECT cast(max(col2) as STRING) FROM test_table2
[2023-06-07, 13:08:10 UTC] {base.py:73} INFO - Using connection ID 'databricks_default' for task execution.
[2023-06-07, 13:08:11 UTC] {databricks_base.py:424} INFO - Using token auth. For security reasons, please set token in Password field instead of extra
[2023-06-07, 13:08:11 UTC] {databricks_base.py:424} INFO - Using token auth. For security reasons, please set token in Password field instead of extra
[2023-06-07, 13:08:13 UTC] {client.py:193} INFO - Successfully opened session b'\x01\xee\x054[\xb4\x19i\x9f:m\xcd];[W'
[2023-06-07, 13:08:13 UTC] {sql.py:375} INFO - Running statement: SELECT cast(max(col2) as STRING) FROM test_table2, parameters: None
[2023-06-07, 13:08:15 UTC] {taskinstance.py:1824} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/airflow/utils/session.py", line 73, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 2354, in xcom_push
    XCom.set(
  File "/usr/local/lib/python3.10/site-packages/airflow/utils/session.py", line 73, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/models/xcom.py", line 237, in set
    value = cls.serialize_value(
  File "/usr/local/lib/python3.10/site-packages/airflow/models/xcom.py", line 632, in serialize_value
    return json.dumps(value, cls=XComEncoder).encode("UTF-8")
  File "/usr/local/lib/python3.10/json/__init__.py", line 238, in dumps
    **kw).encode(obj)
  File "/usr/local/lib/python3.10/site-packages/airflow/utils/json.py", line 102, in encode
    o = self.default(o)
  File "/usr/local/lib/python3.10/site-packages/airflow/utils/json.py", line 91, in default
    return serialize(o)
  File "/usr/local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 144, in serialize
    return encode(classname, version, serialize(data, depth + 1))
  File "/usr/local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 123, in serialize
    return [serialize(d, depth + 1) for d in o]
  File "/usr/local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 123, in <listcomp>
    return [serialize(d, depth + 1) for d in o]
  File "/usr/local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 123, in serialize
    return [serialize(d, depth + 1) for d in o]
  File "/usr/local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 123, in <listcomp>
    return [serialize(d, depth + 1) for d in o]
  File "/usr/local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 132, in serialize
    qn = qualname(o)
  File "/usr/local/lib/python3.10/site-packages/airflow/utils/module_loading.py", line 47, in qualname
    return f"{o.__module__}.{o.__name__}"
  File "/usr/local/lib/python3.10/site-packages/databricks/sql/types.py", line 161, in __getattr__
    raise AttributeError(item)
AttributeError: __name__. Did you mean: '__ne__'?
[2023-06-07, 13:08:15 UTC] {taskinstance.py:1345} INFO - Marking task as FAILED. dag_id=example_databricks_sql, task_id=get_max_id, execution_date=20230607T130810, start_date=20230607T130810, end_date=20230607T130815
[2023-06-07, 13:08:15 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 7 for task get_max_id (__name__; 852)
potiuk commented 1 year ago

@phanikumv, it looks like Databricks returns a description which is both callable and has no __name__ defined, and our serde does not handle this well. Since you have the environment ready, maybe you can inspect and print the actual type of the object returned there? That would help with determining a fix.

phanikumv commented 1 year ago

You are correct @potiuk, it is coming in as a Row object. It contains __module__ but not __name__.

Below is the output when I ran dir(o):

[2023-06-07, 17:35:48 UTC] {logging_mixin.py:152} INFO - ^^^^^^^^^^^^^^^^^
[2023-06-07, 17:35:48 UTC] {logging_mixin.py:152} INFO - Row(CAST(max(col3) AS STRING)='102.23')
[2023-06-07, 17:35:48 UTC] {logging_mixin.py:152} INFO - ['__add__', '__call__', '__class__', '__contains__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__fields__', '__format__', '__ge__', '__getattr__', '__getattribute__', '__getitem__', '__getnewargs__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__iter__', '__le__', '__len__', '__lt__', '__module__', '__mul__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__rmul__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'asDict', 'count', 'index']
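
For illustration, a minimal sketch (not the actual Databricks class) of how a catch-all __getattr__ like Row's makes the f-string in qualname() fail:

# Toy class mimicking databricks.sql.types.Row's catch-all __getattr__ (assumption, not the real class)
class RowLike:
    def __getattr__(self, item):
        # Anything not found through normal attribute lookup lands here, including __name__.
        raise AttributeError(item)

o = RowLike()
print(o.__module__)                    # "__main__": found on the class, so lookup succeeds
print(f"{o.__module__}.{o.__name__}")  # raises AttributeError: __name__, as in the traceback above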
hussein-awala commented 1 year ago

I think we should close it as a duplicate of #31499, @potiuk wdyt?

@Stormhand can you try this solution:

Can you try extending the operator class and overriding the run method to convert the output from a list of Row objects to a list of tuples or dicts?

Stormhand commented 1 year ago

I think we should close it as a duplicate of #31499, @potiuk wdyt?

@Stormhand can you try this solution:

Can you try extending the operator class and overriding the run method to convert the output from a list of Row objects to a list of tuples or dicts?

What's the point? Using the hook as in the original post is way shorter and easier.

Edit: or do you mean just for the test?

hussein-awala commented 1 year ago

What's the point? Using the hook as in the original post is way shorter and easier. Edit: or do you mean just for the test?

Here is the full comment:

I believe it's a specific issue with the Databricks provider, where the Row class does not allow access to private attributes and methods. You can see the relevant code here. However, the new serializer introduced in version 2.6.0 (refactored in https://github.com/apache/airflow/pull/28067) requires access to the object's name in order to serialize it. Can you try extending the operator class and overriding the run method to convert the output from a list of Row objects to a list of tuples or dicts?

If we cannot serialize the objects returned from the Databricks cursor because of a limitation in the databricks lib, we need to find a workaround in the Databricks provider rather than change our serialization strategy; that is why I suggested this solution before opening a PR. So it is not just for testing: if you test it and find that it fixes the issue, you can open a PR to fix it in the Airflow Databricks provider.
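
A minimal sketch of that suggestion, assuming it hooks into _process_output (shown earlier in the thread) rather than run; the class name is hypothetical and the Row import path is taken from the traceback above:

from typing import Any, Sequence

from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator
from databricks.sql.types import Row


class PlainResultDatabricksSqlOperator(DatabricksSqlOperator):
    """Hypothetical subclass that flattens Row objects into tuples before they reach XCom."""

    def _process_output(
        self, results: list[Any], descriptions: list[Sequence[Sequence] | None]
    ) -> list[Any]:
        def to_plain(value: Any) -> Any:
            # Recursively replace Row objects with plain tuples; Row is iterable per the dir() output above.
            if isinstance(value, Row):
                return tuple(value)
            if isinstance(value, list):
                return [to_plain(v) for v in value]
            return value

        if not self._output_path:
            return list(zip(descriptions, to_plain(results)))
        # Otherwise defer to the parent implementation (writing to the output path).
        return super()._process_output(results, descriptions)

With the XCom value reduced to tuples and primitives, the JSON XCom serializer no longer needs to call qualname() on a Row.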

potiuk commented 1 year ago

Yes, we should not change our strategy, and yes, we should make a change in the provider. However, I think we can also improve the error message in this case (PR here, just worked on it: https://github.com/apache/airflow/pull/31778).

potiuk commented 1 year ago

I think https://github.com/apache/airflow/pull/31780 should fix it @Stormhand @phanikumv - is it possible for you to test it on your system by patching the code? That would help validate whether this is OK (it works in unit tests).

phanikumv commented 1 year ago

Yes, https://github.com/apache/airflow/pull/31780 fixes the issue. I have mentioned the test results here: https://github.com/apache/airflow/pull/31780#pullrequestreview-1469148576