apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

DatabricksSQLOperator returns string instead of a tuple #39891

Closed aru-trackunit closed 5 months ago

aru-trackunit commented 5 months ago

Apache Airflow Provider(s)

databricks

Versions of Apache Airflow Providers

Providers

Package Name Version Description
apache-airflow-providers-amazon 8.21.0 Amazon integration (including Amazon Web Services (AWS)).
apache-airflow-providers-cncf-kubernetes 7.13.0 Kubernetes
apache-airflow-providers-common-io 1.3.0 Common IO Provider
apache-airflow-providers-common-sql 1.11.1 Common SQL Provider
apache-airflow-providers-databricks 6.4.0 Databricks
apache-airflow-providers-ftp 3.7.0 File Transfer Protocol (FTP)
apache-airflow-providers-github 2.6.0 GitHub
apache-airflow-providers-google 10.17.0 Google services including: - Google Ads - Google Cloud (GCP) - Google Firebase - Google LevelDB - Google Marketing Platform - Google Workspace (formerly Google Suite)
apache-airflow-providers-hashicorp 3.6.4 Hashicorp including Hashicorp Vault
apache-airflow-providers-http 4.10.0 Hypertext Transfer Protocol (HTTP)
apache-airflow-providers-imap 3.5.0 Internet Message Access Protocol (IMAP)
apache-airflow-providers-mysql 5.5.4 MySQL
apache-airflow-providers-postgres 5.10.2 PostgreSQL
apache-airflow-providers-sftp 4.9.1 SSH File Transfer Protocol (SFTP)
apache-airflow-providers-slack 8.7.0 Slack services integration including: - Slack API - Slack Incoming Webhook
apache-airflow-providers-smtp 1.6.1 Simple Mail Transfer Protocol (SMTP)
apache-airflow-providers-snowflake 5.4.0 Snowflake
apache-airflow-providers-sqlite 3.7.1 SQLite
apache-airflow-providers-ssh 3.11.0 Secure Shell (SSH)


Apache Airflow version

2.8.4

Operating System

Debian GNU/Linux 11 (bullseye)

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

What happened

After the DatabricksSqlOperator finishes execution, it returns a hard-to-parse object in XCom:

(
    [
        '(catalog_name,string,None,None,None,None,None)',
        '(schema_name,string,None,None,None,None,None)'
    ],
    [
        '(prod,schema_1)', 
        '(prod,schema_2)'
    ]
)

Why is it hard? When I try to load it into pandas, instead of getting two columns, the entire row ends up in one column:

import pandas as pd

test_data = (['(catalog_name,string,None,None,None,None,None)','(schema_name,string,None,None,None,None,None)'],['(prod,schema_1)', '(prod,schema_2)'])

test_df = pd.DataFrame(data=test_data[1])
Output test_df:

                 0
0  (prod,schema_1)
1  (prod,schema_2)

The same issue applies to the column descriptions in test_data[0].
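As a stopgap, the stringified rows can be parsed back into tuples before handing them to pandas. This is only a workaround sketch for the format shown above (it assumes no value contains a comma), not part of the provider:

import pandas as pd

# Workaround sketch: turn the stringified XCom rows back into tuples.
raw = (
    ['(catalog_name,string,None,None,None,None,None)',
     '(schema_name,string,None,None,None,None,None)'],
    ['(prod,schema_1)', '(prod,schema_2)'],
)

def parse_row(row: str) -> tuple:
    # Strip the surrounding parentheses and split on commas.
    # Assumes no value itself contains a comma or nested parentheses.
    return tuple(row.strip('()').split(','))

columns = [parse_row(desc)[0] for desc in raw[0]]  # first field is the column name
rows = [parse_row(r) for r in raw[1]]

df = pd.DataFrame(rows, columns=columns)
print(df)
#   catalog_name schema_name
# 0         prod    schema_1
# 1         prod    schema_2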

What you think should happen instead

The first question is whether this is a bug or a feature.

IMO the quotes wrapping each row should be removed and applied to the individual string values instead, as below:

(
    [
        ('catalog_name','string',None,None,None,None,None),
        ('schema_name','string',None,None,None,None,None)
    ],
    [
        ('prod','schema_1'), 
        ('prod','schema_2')
    ]
)

Executing the same code as before, we get a properly structured DataFrame.

import pandas as pd

test_data = ([('catalog_name','string',None,None,None,None,None),('schema_name','string',None,None,None,None,None)],[('prod','schema_1'),('prod','schema_2')])

test_df = pd.DataFrame(data=test_data[1])
Output test_df:

      0         1
0  prod  schema_1
1  prod  schema_2
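With tuples, the description part of the XCom could also feed the column names directly. A minimal sketch, assuming the proposed tuple-based format above:

import pandas as pd

# Sketch assuming the proposed tuple-based XCom format.
test_data = (
    [('catalog_name', 'string', None, None, None, None, None),
     ('schema_name', 'string', None, None, None, None, None)],
    [('prod', 'schema_1'), ('prod', 'schema_2')],
)

# The first field of each description tuple is the column name.
columns = [desc[0] for desc in test_data[0]]
test_df = pd.DataFrame(data=test_data[1], columns=columns)
print(test_df)
#   catalog_name schema_name
# 0         prod    schema_1
# 1         prod    schema_2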

I haven't spotted the exact place in the code where the error occurs. Does it happen only in DatabricksSqlOperator, or is it wider SQL operator behaviour?

How to reproduce

from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator

validate_prod_schema_privileges = DatabricksSqlOperator(
    task_id="validate_prod_schema_privileges",
    databricks_conn_id="conn-id",
    sql_endpoint_name="endpoint_name",
    sql="SELECT DISTINCT table_catalog AS catalog_name, table_schema AS schema_name FROM prod.information_schema.tables",
)
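To see what actually lands in XCom, a hypothetical downstream task (not part of the original setup) could pull and print the operator's return value, assuming both tasks are defined in the same DAG:

from airflow.decorators import task

# Hypothetical inspection task; it pulls from the task_id used above and
# relies on Airflow injecting `ti` from the task context.
@task
def inspect_result(ti=None):
    result = ti.xcom_pull(task_ids="validate_prod_schema_privileges")
    print(type(result))
    print(result)

# Inside the same DAG definition:
# validate_prod_schema_privileges >> inspect_result()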

Anything else

No response

Are you willing to submit PR?

Code of Conduct

aru-trackunit commented 5 months ago

My mistake - it is all working well