aws-samples / dbt-glue

This repository contains the dbt-glue adapter
Apache License 2.0

Iceberg datalake_format can't handle non-iceberg sources #405

Open jordanpolun opened 4 months ago

jordanpolun commented 4 months ago

Describe the bug

When trying to use {{ source() }} to read from a non-Iceberg table, a dbt-glue project set up for the iceberg datalake format refuses to read the source because it is not an Iceberg table. The table is available in the Glue Data Catalog, but the underlying files are JSON. The full logs are below; the main error is

org.apache.iceberg.exceptions.ValidationException: Input Glue table is not an iceberg table: glue_catalog.ezmaf_osh.org_smc_maf_events_mafconsentevent (type=null)

Everything worked smoothly before I tried to add Iceberg into the equation: I was able to read the source data from JSON and write it to Parquet, just not to Iceberg.

Steps To Reproduce

profiles.yaml

dev:
  target: dev
  outputs:
    dev:
      type: glue
      aws_profile_name: dev
      query-comment: from dbt
      role_arn: "arn:aws:iam::XXXXXXXXXXXX:role/My-Glue-ServiceRole"
      region: us-east-1
      glue_version: "4.0"
      workers: 2
      worker_type: G.1X
      schema: dbt_glue
      idle_timeout: 2
      session_provisioning_timeout_in_seconds: 120
      location: "s3://my-bucket/dbt-glue"
      datalake_formats: iceberg
      connections: Iceberg_Glue_4_Connector
      conf: >
        spark.sql.legacy.timeParserPolicy=LEGACY
        --conf spark.sql.legacy.allowNonEmptyLocationInCTAS=true
        --conf spark.serializer=org.apache.spark.serializer.JavaSerializer
        --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
        --conf spark.sql.defaultCatalog=glue_catalog
        --conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog
        --conf spark.sql.catalog.glue_catalog.warehouse=s3://my-bucket/dbt-glue
        --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
        --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
      default_arguments: >
        --enable-metrics=true,
        --enable-continuous-cloudwatch-log=true,
        --enable-continuous-log-filter=true,
        --enable-spark-ui=true,
        --spark-event-logs-path=s3://aws-glue-assets-XXXXXXXXXXXX-us-east-1/sparkUiLogs/dbt/,
        --enable-auto-scaling=true
      threads: 50
      glue_session_reuse: true

sources.yaml

version: 2
sources:
  - name: maf
    database: glue_catalog
    schema: "ezmaf_osh"
    tables:
      - name: "org_smc_maf_events_mafconsentevent"

dbt_project.yaml

# For more info about what belongs in this file, check the docs https://docs.getdbt.com/reference/dbt_project.yml
name: 'analytics'
version: '2.0.0-pre'
config-version: 2

# This setting configures which "profile" dbt uses for this project.
profile: 'dev'

model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

clean-targets:         # directories to be removed by `dbt clean`
  - "target"
  - "dbt_packages"

models:
  analytics:
    +file_format: iceberg
    +incremental_strategy: merge
    +on_schema_change: sync_all_columns
    +persist_docs:
      relation: true
      columns: true

data_tests:
  +severity: warn

models/load/maf/load_maf_consent.sql

{{
    config(
        materialized="table",
        partitioned_by=["day(consentdatetime)"],
        tags=["load", "maf"],
    )
}}
select
    eventname,
    cast(consentdatetime as timestamp) consentdatetime,
    sessionid,
    useridentifier,
    username
from
    {{
        source(
            "maf",
            "org_smc_maf_events_mafconsentevent",
        )
    }}

Expected behavior

dbt-glue should be able to read from any table accessible via the Glue Data Catalog and write back to S3 in Iceberg format.

Screenshots and log output

07:35:17  Running with dbt=1.8.1
07:35:17  Registered adapter: glue=1.8.1
07:35:17  Unable to do partial parsing because of a version mismatch
07:35:19  Found 58 models, 6 seeds, 415 data tests, 32 sources, 2 exposures, 620 macros
07:35:19  
07:35:20  Concurrency: 50 threads (target='dev')
07:35:20  
07:35:20  1 of 1 START sql table model dbt_glue.load_maf_consent ......................... [RUN]
07:37:55  Glue adapter: Glue returned `error` for statement None for code SqlWrapper2.execute('''/* {"app": "dbt", "dbt_version": "1.8.1", "profile_name": "dev", "target_name": "dev", "node_id": "model.analytics.load_maf_consent"} */

    create table dbt_glue.load_maf_consent

    using iceberg

      LOCATION 's3://my-bucket/dbt-glue/dbt_glue/load_maf_consent'
        comment 'Event emitted when the parent initially consents to using the ezMAF app. At the moment, this should only happen once per `useridentifier` but there are future plans to make this happen once per `uniqueformid`'

        as

select
    eventname,
    cast(consentdatetime as timestamp) consentdatetime,
    sessionid,
    useridentifier,
    username
from
    ezmaf_osh.org_smc_maf_events_mafconsentevent
  ''', use_arrow=False, location='s3://my-bucket/dbt-glue'), Py4JJavaError: An error occurred while calling o105.sql.
: org.apache.iceberg.exceptions.ValidationException: Input Glue table is not an iceberg table: glue_catalog.ezmaf_osh.org_smc_maf_events_mafconsentevent (type=null)
        at org.apache.iceberg.exceptions.ValidationException.check(ValidationException.java:49)
        at org.apache.iceberg.aws.glue.GlueToIcebergConverter.validateTable(GlueToIcebergConverter.java:48)
        at org.apache.iceberg.aws.glue.GlueTableOperations.doRefresh(GlueTableOperations.java:116)
        at org.apache.iceberg.BaseMetastoreTableOperations.refresh(BaseMetastoreTableOperations.java:95)
        at org.apache.iceberg.BaseMetastoreTableOperations.current(BaseMetastoreTableOperations.java:78)
        at org.apache.iceberg.BaseMetastoreCatalog.loadTable(BaseMetastoreCatalog.java:43)
        at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2406)
        at java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1853)
        at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2404)
        at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2387)
        at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
        at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62)
        at org.apache.iceberg.CachingCatalog.loadTable(CachingCatalog.java:166)
        at org.apache.iceberg.spark.SparkCatalog.load(SparkCatalog.java:587)
        at org.apache.iceberg.spark.SparkCatalog.loadTable(SparkCatalog.java:142)
        at org.apache.iceberg.spark.SparkCatalog.loadTable(SparkCatalog.java:99)
        at org.apache.spark.sql.connector.catalog.CatalogV2Util$.loadTable(CatalogV2Util.scala:311)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.$anonfun$lookupRelation$3(Analyzer.scala:1206)
        at scala.Option.orElse(Option.scala:447)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.$anonfun$lookupRelation$1(Analyzer.scala:1205)
        at scala.Option.orElse(Option.scala:447)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$lookupRelation(Analyzer.scala:1197)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$13.applyOrElse(Analyzer.scala:1068)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$13.applyOrElse(Analyzer.scala:1032)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:138)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:138)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$2(AnalysisHelper.scala:135)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1249)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1248)
        at org.apache.spark.sql.catalyst.plans.logical.OrderPreservingUnaryNode.mapChildren(LogicalPlan.scala:226)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:135)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$2(AnalysisHelper.scala:135)
        at org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren(TreeNode.scala:1277)
        at org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren$(TreeNode.scala:1274)
        at org.apache.spark.sql.catalyst.plans.logical.CreateTableAsSelect.mapChildren(v2Commands.scala:298)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:135)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:1032)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:991)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:215)
        at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
        at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
        at scala.collection.immutable.List.foldLeft(List.scala:91)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeBatch$1(RuleExecutor.scala:212)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$6(RuleExecutor.scala:284)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$RuleExecutionContext$.withContext(RuleExecutor.scala:327)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$5(RuleExecutor.scala:284)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$5$adapted(RuleExecutor.scala:274)
        at scala.collection.immutable.List.foreach(List.scala:431)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:274)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:188)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:227)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:223)
        at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:172)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:223)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:187)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:208)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:207)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:78)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:213)
        at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:552)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:213)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:212)
        at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:78)
        at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:76)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:68)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:101)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:99)
        at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:750)

07:37:55  1 of 1 ERROR creating sql table model dbt_glue.load_maf_consent ................ [ERROR in 155.13s]
07:37:55  
07:37:55  Finished running 1 table model in 0 hours 2 minutes and 36.69 seconds (156.69s).
07:37:55  
07:37:55  Completed with 1 error and 0 warnings:
07:37:55  
07:37:55    Runtime Error in model load_maf_consent (models/load/maf/load_maf_consent.sql)
  module 'dbt.exceptions' has no attribute 'DbtDatabaseError'
07:37:55  
07:37:55  Done. PASS=0 WARN=0 ERROR=1 SKIP=0 TOTAL=1

System information

The output of dbt --version:

Core:
  - installed: 1.8.1
  - latest:    1.8.3 - Update available!

  Your version of dbt-core is out of date!
  You can find instructions for upgrading here:
  https://docs.getdbt.com/docs/installation

Plugins:
  - glue:   1.8.1 - Up to date!
  - spark:  1.8.0 - Up to date!
  - athena: 1.8.2 - Ahead of latest version!

The operating system you're using: Mac OS

The output of python --version: Python 3.12.4

Additional context

Nothing at the moment

aiss93 commented 1 month ago

Hi @jordanpolun

If you want to work with both Iceberg and non-Iceberg tables, you need to use SparkSessionCatalog.

org.apache.iceberg.spark.SparkSessionCatalog adds support for Iceberg tables to Spark's built-in catalog and delegates to the built-in catalog for non-Iceberg tables. For more details, see [SparkSessionCatalog configuration](https://iceberg.apache.org/docs/latest/spark-configuration/#catalogs).

You'll need to specify the following configurations for the Glue session:

--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog
--conf spark.sql.catalog.spark_catalog.warehouse=s3://al-gdo-dev-ww-dl-0139-transfo/data
--conf spark.sql.catalog.spark_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
--conf spark.sql.catalog.spark_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

Keep in mind that using the SparkSessionCatalog makes CTAS and RTAS non-atomic operations.
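
As an illustration of the suggestion above, a minimal sketch of how the conf from the original profile could look with SparkSessionCatalog applied (the bucket path is carried over from the report; this is an untested sketch, not a verified configuration):

      conf: >
        spark.sql.legacy.timeParserPolicy=LEGACY
        --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
        --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog
        --conf spark.sql.catalog.spark_catalog.warehouse=s3://my-bucket/dbt-glue
        --conf spark.sql.catalog.spark_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
        --conf spark.sql.catalog.spark_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
        --conf spark.sql.legacy.allowNonEmptyLocationInCTAS=true
        --conf spark.serializer=org.apache.spark.serializer.JavaSerializer

In this sketch the spark.sql.defaultCatalog=glue_catalog setting and the separate glue_catalog entries from the original conf are dropped, so unqualified table names resolve through Spark's built-in session catalog.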

dveloz2 commented 1 month ago

@aiss93 I used the following conf

      conf: >
        spark.sql.legacy.timeParserPolicy=LEGACY
        --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog 
        --conf spark.sql.catalog.spark_catalog.warehouse=s3://some-bucket/metadata 
        --conf spark.sql.catalog.spark_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog 
        --conf spark.sql.catalog.spark_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO 
        --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
        --conf spark.sql.legacy.allowNonEmptyLocationInCTAS=true
        --conf spark.serializer=org.apache.spark.serializer.JavaSerializer

And I got the error: org.apache.iceberg.exceptions.ValidationException: Input Glue table is not an iceberg table: spark_catalog.iceberg_db.my_table (type=null)

Are there any missing configurations that you could suggest?