aws-samples / dbt-glue

This repository contains the dbt-glue adapter
Apache License 2.0

Error when performing append incremental load with iceberg table #226

Open BrentonAD opened 1 year ago

BrentonAD commented 1 year ago

Describe the bug

I have a simple model (for the sake of argument called my_table) which loads data from an existing iceberg table (not created with dbt-glue) and writes to a new iceberg table:

{{
    config(
        partition_by=["timestamp_day","natural_key"],        
        incremental_strategy="append",
        file_format="iceberg"
    )
}}
WITH dataset AS (
  SELECT 
  *
  FROM glue_catalog.{{ source('staging','raw') }}
)
SELECT 
  *,
  date_trunc('day', timestamp) AS timestamp_day
  FROM dataset
  {% if is_incremental() %}
    WHERE _insert_datekey > (SELECT MAX(_insert_datekey) FROM {{ this }})
  {% endif %}

On the first run of the model, the table loads successfully and I can query it from AWS Athena as expected; however, on subsequent runs of the model I receive the following error indicating that the table cannot be found in the glue_catalog.

spark.sql("""
                        INSERT INTO glue_catalog.my_schema.my_table
                        SELECT * FROM tmp_my_table
                        ORDER BY (timestamp_day,natural_key)
                    """) 
spark.sql("""REFRESH TABLE glue_catalog.my_schema.my_table""")
SqlWrapper2.execute("""SELECT * FROM glue_catalog.my_schema.my_table LIMIT 1""")
, AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch table my_table. StorageDescriptor#InputFormat cannot be null for table: my_table (Service: null; Status Code: 0; Error Code: null; Request ID: null; Proxy: null)

My configuration in my profiles.yml is as follows:

   dev_glue:
      type: glue
      role_arn: arn:aws:iam::000000000:role/GlueRoleName
      schema: "my_schema"
      location: "s3://my-s3-bucket/prefix/my_schema"
      iceberg_glue_commit_lock_table: DbtGlueLockTable
      region: <region-redacted>
      workers: 8
      worker_type: G.1X
      glue_version: "3.0"
      idle_timeout: 15
      session_provisioning_timeout_in_seconds: 300
      connections: iceberg_connector_name
      conf: "--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
             --conf spark.serializer=org.apache.spark.serializer.JavaSerializer
             --conf spark.sql.catalog.glue_catalog.warehouse=s3://s3://my-s3-bucket/prefix/my_schema
             --conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkSessionCatalog
             --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
             --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
             --conf spark.sql.catalog.glue_catalog.lock-impl=org.apache.iceberg.aws.glue.DynamoLockManager
             --conf spark.sql.catalog.glue_catalog.lock.table=DbtGlueLockTable
             --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
             --conf spark.sql.iceberg.handle-timestamp-without-timezone=true"
      default_arguments: "--enable-metrics=true, --enable-continuous-cloudwatch-log=true, --enable-continuous-log-filter=true, --enable-spark-ui=true, --spark-event-logs-path=s3://my-s3-bucket/prefix/sparklogs/"

Steps To Reproduce

  1. Create a sample Iceberg table with AWS Athena (a minimal DDL sketch follows this list)
  2. Create a file named my_table.sql with the configuration mentioned above
  3. Run dbt run --model my_table and wait for completion
  4. Repeat step 3. The error described above now appears
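
For step 1, a minimal sketch of the kind of Athena DDL I mean (table name, columns, and location are illustrative, not the actual source table):

CREATE TABLE my_schema.raw (
  natural_key       string,
  `timestamp`       timestamp,
  `_insert_datekey` bigint
)
PARTITIONED BY (day(`timestamp`))
LOCATION 's3://my-s3-bucket/prefix/my_schema/raw/'
TBLPROPERTIES ('table_type' = 'ICEBERG');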

Expected behavior

Subsequent runs of the dbt model should perform an incremental load per the append strategy, rather than raise an error.


System information

The output of dbt --version:

Core:
  - installed: 1.5.2
  - latest:    1.6.1 - Update available!

  Your version of dbt-core is out of date!
  You can find instructions for upgrading here:
  https://docs.getdbt.com/docs/installation

Plugins:
  - glue:   1.5.0 - Update available!
  - spark:  1.5.2 - Update available!
  - athena: 1.5.1 - Ahead of latest version!

  At least one plugin is out of date or incompatible with dbt-core.
  You can find instructions for upgrading here:
  https://docs.getdbt.com/docs/installation

The operating system you're using: VS Code Dev container running Ubuntu 22.04.3 LTS

The output of python --version: Python 3.11.4

Additional context

I have tried many different Spark configurations, including using different names for the Iceberg catalog alias to match the Glue catalog name, and matching that name in the prefix of the SELECT * FROM <prefix>.{{ source('staging','raw') }} statement.
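
To make that concrete, the idea was to keep the catalog alias defined in the Spark conf in sync with the prefix used in the model, roughly like this (the alias name here is illustrative):

-- conf defines the alias, e.g. --conf spark.sql.catalog.my_alias=org.apache.iceberg.spark.SparkSessionCatalog
-- and the model then references the source through the same alias
SELECT *
FROM my_alias.{{ source('staging', 'raw') }}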

rickiesmooth commented 11 months ago

I have the same issue with insert_overwrite. Were you able to fix this?

AmineIzanami commented 11 months ago

@BrentonAD your conf seems wrong, especially spark.sql.catalog.glue_catalog.warehouse: you have a double s3://.

@rickiesmooth what's your conf? Can you validate that you have the key spark.sql.catalog.glue_catalog.warehouse?

BrentonAD commented 11 months ago

@AmineIzanami Thank you for your reply. That mistake was simply me mistyping while redacting company-specific bucket names etc. I can confirm the double s3:// is not in the config when I am facing this issue; sorry about the confusion.

I want to confirm again that the load runs correctly the first time, just not on subsequent incremental runs, so I am somewhat confident my config is correct.

rickiesmooth commented 11 months ago

@AmineIzanami thank you for looking into this! My conf looks like this:

glue_test:
  target: dev
  outputs:
    dev:
      type: glue
      query-comment: glue_test
      role_arn: "arn:aws:iam::XXXXXXXXX:role/ci_cron_role"
      region: us-east-1
      glue_version: "4.0"
      workers: 2
      worker_type: G.1X
      schema: "glue_test"
      session_provisioning_timeout_in_seconds: 60
      location: "s3://glue-test-dev-pipeline-data-lake-output/"
      datalake_formats: iceberg
      conf: --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
            --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
            --conf spark.sql.warehouse=s3://glue-test-prod-pipeline-data-lake-output/glue_test
            --conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog
            --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
            --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
            --conf spark.sql.catalog.glue_catalog.lock-impl=org.apache.iceberg.aws.dynamodb.DynamoDbLockManager
            --conf spark.sql.catalog.glue_catalog.lock.table=myGlueLockTable
            --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

I've had a similar issue when trying the delta datalake format.

rickiesmooth commented 11 months ago

Maybe good to note that with Iceberg, runs for my "base" table (which doesn't use any refs) all succeed; it's the tables that reference this "base" table that error.
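
A minimal sketch of that layout, assuming two trivial Iceberg models (names are illustrative, not my actual project):

-- models/base_events.sql: no refs, builds fine on every run
{{ config(materialized='incremental', incremental_strategy='append', file_format='iceberg') }}
select * from {{ source('staging', 'raw') }}

-- models/daily_events.sql: references the base model; this is the run that errors
{{ config(materialized='incremental', incremental_strategy='append', file_format='iceberg') }}
select * from {{ ref('base_events') }}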

rickiesmooth commented 11 months ago

@mehdimld pointed me to a workaround in Slack:

For now, if you want to ref an Iceberg model built with dbt-glue, you need to prefix the ref('other_model') with glue_catalog. Please see below an example where merge_customers is an Iceberg table built with dbt-glue:


{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    unique_key=["customer_id"],
    file_format='iceberg',
    partition_by=['dt'],
    table_properties={'write.target-file-size-bytes': '268435456'}
) }}

select * from glue_catalog.{{ ref('merge_customers') }}


which fixes the issue!
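
Applied to the model from the original report, the same workaround would presumably look like this in a downstream model that selects from it (an untested sketch, assuming my_table was built as an Iceberg table by dbt-glue):

{{ config(
    materialized='incremental',
    incremental_strategy='append',
    file_format='iceberg'
) }}

-- prefix the ref with the Spark catalog alias so the Iceberg table resolves
select * from glue_catalog.{{ ref('my_table') }}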