aws-samples / dbt-glue

This repository contains the dbt-glue adapter
Apache License 2.0

Missing catalog name in CTAS #356

Open · dev-goyal opened this issue 6 months ago

dev-goyal commented 6 months ago

Describe the bug

We have a simple dbt model, defined as follows:

# model.yaml

version: 2

models:
  - name: matches
    description: RS
    config:
      materialized: 'table'
      incremental_strategy: 'insert_overwrite'
      file_format: iceberg
      glue_version: '4.0'
      glue_session_reuse: True
      iceberg_expire_snapshots: True
      custom_location: '{{target.location}}/matches'
    columns:
      - name: id
        type: varchar
      - name: rating
        type: integer

And the very simple matches.sql model:

SELECT * FROM {{ source('matches_source', 'sourcetest') }}

However, it fails with the following error:

23:55:06  Completed with 1 error and 0 warnings:
23:55:06
23:55:06    Database Error in model matches (dbt_models/matches/matches.sql)
  Glue cursor returned `error` for statement None for code SqlWrapper2.execute('''/* {"app": "dbt", "dbt_version": "1.7.9", "profile_name": "ml_iceberg", "target_name": "dev", "node_id": "model.ml_dating_outcomes.matches"} */

      create table dating_outcomes.matches

      using iceberg

      location 's3://<<path>>/iceberg-tables/dating_outcomes/matches'

        as
        SELECT * FROM ml_features.sourcetest
    ''', use_arrow=False, location='s3://<<path>>/iceberg-tables/dating_outcomes'), Py4JJavaError: An error occurred while calling o81.sql.
  : org.apache.spark.SparkException: Table implementation does not support writes: dating_outcomes.matches
        at org.apache.spark.sql.errors.QueryExecutionErrors$.unsupportedTableWritesError(QueryExecutionErrors.scala:761)
        at org.apache.spark.sql.execution.datasources.v2.TableWriteExecHelper.$anonfun$writeToTable$1(WriteToDataSourceV2Exec.scala:515)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1550)
        at org.apache.spark.sql.execution.datasources.v2.TableWriteExecHelper.writeToTable(WriteToDataSourceV2Exec.scala:491)
        at org.apache.spark.sql.execution.datasources.v2.TableWriteExecHelper.writeToTable$(WriteToDataSourceV2Exec.scala:486)
        at org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec.writeToTable(WriteToDataSourceV2Exec.scala:68)
        at org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec.run(WriteToDataSourceV2Exec.scala:92)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
        at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83)
...

I believe this is because the generated CTAS statement is missing the catalog name (instead of dating_outcomes.matches, it should be glue_catalog.dating_outcomes.matches). I have confirmed that the statement works once the catalog is added, which I did by overriding the macro.
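For reference, here is a minimal sketch of such an override, dropped into the project's macros/ directory. The macro name glue__create_table_as and its signature follow dbt's standard adapter-dispatch convention, so treat the exact name, the signature, and the hard-coded glue_catalog prefix as assumptions rather than the adapter's actual implementation:

{# macros/create_table_as.sql -- sketch of an override that catalog-qualifies the CTAS #}
{% macro glue__create_table_as(temporary, relation, sql) -%}
  {#- prefix the relation with the catalog so Spark resolves it through
      the Iceberg SparkCatalog configured in the profile (assumption) -#}
  create table glue_catalog.{{ relation }}
  using {{ config.get('file_format', default='parquet') }}
  {%- if config.get('custom_location') %}
  location '{{ config.get('custom_location') }}'
  {%- endif %}
  as
  {{ sql }}
{%- endmacro %}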

Steps To Reproduce

Run dbt run against the matches model above, using the profile shown under System information below.

Expected behavior

The table should have been created.
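That is, the generated CTAS should be catalog-qualified, along these lines (reconstructed from the log above):

create table glue_catalog.dating_outcomes.matches
using iceberg
location 's3://<<path>>/iceberg-tables/dating_outcomes/matches'
as
SELECT * FROM ml_features.sourcetest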


System information

dbt profile:

config:
  partial_parse: False
  send_anonymous_usage_stats: False
  use_colors: False

ml_iceberg:
  target: dev # default target
  outputs:
    dev:
      type: glue
      query-comment: Glue on the dev account
      role_arn: <<role_arn>>
      region: us-east-1
      workers: 2
      worker_type: G.1X
      idle_timeout: 10
      schema: <<schema>>
      database: <<schema>>
      session_provisioning_timeout_in_seconds: 120
      location: s3://<<bucket>>/iceberg-tables/<<schema>>
      glue_version: "4.0"
      glue_session_id: dbt
      datalake_formats: iceberg
      conf: |
        spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog
        --conf spark.sql.catalog.glue_catalog.warehouse=s3://<<location>>
        --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
        --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
        --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

Glue version: 4.0
dbt version: 1.7.9



vitoravancini commented 6 months ago

Is dbt-glue with Iceberg not working at the moment? I was trying to configure it with Iceberg and ran into the same problem. Is there a workaround?

hsirah commented 6 months ago

@vitoravancini I have worked around this by adding the catalog to the query, like so: SELECT * FROM glue_catalog.{{ source('matches_source', 'sourcetest') }}
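Applied to the model from the original report, the workaround looks like this; note that the prefix would presumably need to be added to every source() and ref() reference in the project:

-- matches.sql, with the catalog prefixed explicitly (workaround)
SELECT * FROM glue_catalog.{{ source('matches_source', 'sourcetest') }}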

eshetben commented 4 months ago

Adding --conf spark.sql.defaultCatalog=glue_catalog to the session configuration should help.

See issue #359: https://github.com/aws-samples/dbt-glue/issues/359
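For reference, this is how the setting would slot into the conf block from the profile above (a sketch; spark.sql.defaultCatalog is a standard Spark SQL setting that makes unqualified table names resolve against the named catalog):

conf: |
  spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog
  --conf spark.sql.catalog.glue_catalog.warehouse=s3://<<location>>
  --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
  --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
  --conf spark.sql.defaultCatalog=glue_catalog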