databricks / dbt-databricks

A dbt adapter for Databricks.
https://databricks.com
Apache License 2.0
195 stars 104 forks source link

QUALIFY clause does not work with incremental models #84

Closed binhnefits closed 2 years ago

binhnefits commented 2 years ago

Describe the bug

When using QUALIFY in an incremental model, the view dbt creates does not exist. This prevents dbt from doing the merge statement.

Steps To Reproduce

test.sql

{{ config(
    materialized = 'incremental',
    incremental_strategy = 'merge',
    unique_key = 'id',
) }}

with test as (
  select 1 as id
)
select *
from test
qualify row_number() over (order BY id) = 1

log results:

    create temporary view test__dbt_tmp as

with test as (
  select 1 as id
)
select *
from test
qualify row_number() over (order BY id) = 1

17:50:20.156423 [debug] [Thread-1  ]: Opening a new connection, currently in state closed
17:50:21.114806 [debug] [Thread-1  ]: SQL status: OK in 0.96 seconds
17:50:22.030437 [debug] [Thread-1  ]: Writing runtime SQL for node "model.lakehouse.test"
17:50:22.031221 [debug] [Thread-1  ]: Spark adapter: NotImplemented: add_begin_query
17:50:22.031552 [debug] [Thread-1  ]: Using databricks connection "model.lakehouse.test"
17:50:22.031835 [debug] [Thread-1  ]: On model.lakehouse.test: /* {"app": "dbt", "dbt_version": "1.0.4", "profile_name": "limebi", "target_name": "stg", "node_id": "model.lakehouse.test"} */

    merge into silver_limeade_platform.test as DBT_INTERNAL_DEST
      using test__dbt_tmp as DBT_INTERNAL_SOURCE

        on 
                DBT_INTERNAL_SOURCE.id = DBT_INTERNAL_DEST.id

      when matched then update set *
      when not matched then insert *

17:50:22.578607 [debug] [Thread-1  ]: Databricks adapter: Error while running:
/* {"app": "dbt", "dbt_version": "1.0.4", "profile_name": "limebi", "target_name": "stg", "node_id": "model.lakehouse.test"} */

    merge into silver_limeade_platform.test as DBT_INTERNAL_DEST
      using test__dbt_tmp as DBT_INTERNAL_SOURCE

        on 
                DBT_INTERNAL_SOURCE.id = DBT_INTERNAL_DEST.id

      when matched then update set *
      when not matched then insert *

17:50:22.579077 [debug] [Thread-1  ]: Databricks adapter: Query execution failed. State: ERROR_STATE; Error code: 0; SQLSTATE: None; Error message: org.apache.hive.service.cli.HiveSQLException: Error running query: org.apache.spark.sql.AnalysisException: Table or view not found: test__dbt_tmp; line 9 pos 12
    at org.apache.spark.sql.hive.thriftserver.HiveThriftServerErrors$.runningQueryError(HiveThriftServerErrors.scala:47)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:418)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.$anonfun$run$2(SparkExecuteStatementOperation.scala:245)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.hive.thriftserver.ThriftLocalProperties.withLocalProperties(ThriftLocalProperties.scala:123)
    at org.apache.spark.sql.hive.thriftserver.ThriftLocalProperties.withLocalProperties$(ThriftLocalProperties.scala:48)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.withLocalProperties(SparkExecuteStatementOperation.scala:52)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:223)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:208)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2.run(SparkExecuteStatementOperation.scala:257)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.sql.AnalysisException: Table or view not found: test__dbt_tmp; line 9 pos 12
    at org.apache.spark.sql.AnalysisException.copy(AnalysisException.scala:71)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:410)
    ... 16 more

Expected behavior

Expect view to be created and queryable to execute merge statement.

Screenshots and log output

If applicable, add screenshots or log output to help explain your problem.

System information

The output of dbt --version: 1.04 The operating system you're using: WSL The output of python --version: Python 3.9.11

Additional context

Add any other context about the problem here.

ueshin commented 2 years ago

Hi @binhnefits, thanks for the report!

I tried the query in my local, but I couldn't reproduce it.

Could you share more details about your environment?

Thanks.

binhnefits commented 2 years ago

Hi @ueshin, thank you for looking into this issue.

I'm using dbt-databricks 1.0.2

Here is a screenshot of the SQL endpoint I'm using image

Here is manifest.json configuration

    "model.lakehouse.test": {
      "raw_sql": "{{ config(\n    materialized = 'incremental',\n    incremental_strategy = 'merge',\n    unique_key = 'id',\n) }}\n\nwith test as (\n  select 1 as id\n)\nselect *\nfrom test\nqualify row_number() over (order BY id) = 1",
      "compiled": true,
      "resource_type": "model",
      "depends_on": {
        "macros": [
          "macro.dbt_spark.dbt_spark_validate_get_file_format",
          "macro.dbt_spark.dbt_spark_validate_get_incremental_strategy",
          "macro.dbt.should_full_refresh",
          "macro.dbt.incremental_validate_on_schema_change",
          "macro.dbt.load_relation",
          "macro.dbt.make_temp_relation",
          "macro.dbt.run_hooks",
          "macro.dbt.create_table_as",
          "macro.dbt.run_query",
          "macro.dbt.process_schema_changes",
          "macro.dbt_databricks.dbt_databricks_get_incremental_sql",
          "macro.dbt.statement"
        ],
        "nodes": []
      },
      "config": {
        "enabled": true,
        "alias": null,
        "schema": "silver_limeade_platform",
        "database": null,
        "tags": ["silver", "limeade_platform"],
        "meta": {},
        "materialized": "incremental",
        "persist_docs": { "relation": true, "columns": true },
        "quoting": {},
        "column_types": {},
        "full_refresh": null,
        "on_schema_change": "ignore",
        "file_format": "delta",
        "location_root": "/mnt/datalake/silver/limeade_platform",
        "incremental_strategy": "merge",
        "unique_key": "id",
        "post-hook": [
          { "sql": "VACUUM {{ this }}", "transaction": true, "index": null }
        ],
        "pre-hook": []
      },
      "database": null,
      "schema": "silver_limeade_platform",
      "fqn": ["lakehouse", "silver", "limeade_platform", "test"],
      "unique_id": "model.lakehouse.test",
      "package_name": "lakehouse",
      "root_path": "/home/binhpham/work/BI/lakehouse-dbt",
      "path": "silver/limeade_platform/test.sql",
      "original_file_path": "models/silver/limeade_platform/test.sql",
      "name": "test",
      "alias": "test",
      "checksum": {
        "name": "sha256",
        "checksum": "065d1ecb756a3c4b3fda53e0a514482de13e62610d0bc489c4b843712845fdc0"
      },
      "tags": ["silver", "limeade_platform"],
      "refs": [],
      "sources": [],
      "description": "",
      "columns": {},
      "meta": {},
      "docs": { "show": true },
      "patch_path": null,
      "compiled_path": "target/compiled/lakehouse/models/silver/limeade_platform/test.sql",
      "build_path": "target/run/lakehouse/models/silver/limeade_platform/test.sql",
      "deferred": false,
      "unrendered_config": {
        "persist_docs": { "relation": true, "columns": true },
        "file_format": "delta",
        "post-hook": "VACUUM {{ this }}",
        "tags": ["limeade_platform"],
        "location_root": "/mnt/datalake/silver/limeade_platform",
        "schema": "silver_limeade_platform",
        "materialized": "incremental",
        "incremental_strategy": "merge",
        "unique_key": "id"
      },
      "created_at": 1650476991.1954176,
      "compiled_sql": "\n\nwith test as (\n  select 1 as id\n)\nselect *\nfrom test\nqualify row_number() over (order BY id) = 1",
      "extra_ctes_injected": true,
      "extra_ctes": [],
      "relation_name": "silver_limeade_platform.test"
    }
binhnefits commented 2 years ago

Also its worth mentioning that this works on clusters but not sql endpoints in my databricks environment.

kmarq commented 2 years ago

I've been noticing this issue as well. I had seen that it results in the temp view being created but then on the next step it isn't found to select from. We noticed it was happening for some models but not others, but hadn't identified that the qualify statement seems to be the culprit. The models that fail are using it while the ones that work are not. I also observed the failed models will run on a cluster but not a SQL endpoint. Can't get any logs right now but if you need some additional input I can get them.

allisonwang-db commented 2 years ago

Hi @binhnefits, does it work if you remove the qualify clause?

Also its worth mentioning that this works on clusters but not sql endpoints in my databricks environment.

It would be helpful if you can click the name of the SQL endpoint you created and provide a screenshot of the SQL endpoint's configuration (the "Overview" tab).

binhnefits commented 2 years ago

@allisonwang-db

Yes this works without qualify, and using row_number with cte.

Here is SQL endpoint overview image

allisonwang-db commented 2 years ago

@binhnefits Thanks for providing the info. This is a known issue with SQL endpoints and the team is actively working on fixing it (likely by the end of the month). Will give an update once the fix is rolled out to production.

binhnefits commented 2 years ago

Hi @allisonwang-db I was wondering if you had any updates. I noticed Datbricks released runtime 10.5 so wondering if that had the fix in it. Thanks!

allisonwang-db commented 2 years ago

@binhnefits the issue should be fixed! Please try it again and let me know if it is still not working.

superdupershant commented 2 years ago

hi @binhnefits any luck testing this out again? Does it work now?

binhnefits commented 2 years ago

Hi @allisonwang-db @superdupershant I have confirmed that it is working now. Thank you!