aws-samples / dbt-glue

This repository contains the dbt-glue adapter
Apache License 2.0

Adding minimal write option when creating table with Delta format #416

Closed Jeremynadal33 closed 2 months ago

Jeremynadal33 commented 3 months ago

resolves part of #415

Description

Add a write option for creating a table with the Delta file format when using an incremental strategy. For example, it allows `--full-refresh` when changing the schema, by adding the config `delta_create_table_write_options = {"mergeSchema": "true"}` to your model.
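As a rough sketch of the idea: the adapter builds a Spark code string that ends in a `.write.format("delta")...save(...)` chain (visible in the error log further down), so a write-options config could be rendered as chained `.option(...)` calls injected into that string. The helpers below are purely illustrative, not part of dbt-glue:

```python
# Hypothetical sketch: turning a write-options dict into chained
# DataFrameWriter .option(...) calls inside the generated Spark code.
# These helpers are illustrative only, not the adapter's implementation.

def render_write_options(write_options: dict) -> str:
    """Render a dict of Delta write options as chained .option() calls."""
    return "".join(
        f'.option("{key}", "{value}")' for key, value in write_options.items()
    )

def render_delta_write(target_path: str, write_options: dict) -> str:
    """Render the tail of a generated Spark write statement for a Delta table."""
    options = render_write_options(write_options)
    return (
        '.write.format("delta")'
        f"{options}"
        '.mode("overwrite")'
        f'.save("{target_path}")'
    )

# With {"mergeSchema": "true"}, the generated chain gains
# .option("mergeSchema", "true") before .mode("overwrite").
print(render_delta_write("s3://bucket/test_change_schema", {"mergeSchema": "true"}))
```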

Checklist

Credits: Nicolas Fourmann initially found this solution!

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

FYI: I am not sure whether this change needs testing; if it does, I would be interested in understanding how to write the tests!

Tests that were run locally

A simple model with a minimal incremental merge strategy:

```sql
{{
    config(
        materialized='incremental',
        incremental_strategy='merge',
        unique_key=['id'],
        file_format='delta',
    )
}}

with incoming_data as (
    select 1 as id
    union all
    select 2 as id
)

select * from incoming_data
```

No problem at the first run, but when adding a column and running:

```shell
dbt run -s test_change_schema --full-refresh
```

Got an error from the Glue adapter:

```
08:23:55  Glue adapter: Glue returned `error` for statement None for code

spark.sql("""

with incoming_data as (
    select 1 as id, 'a' as new_col
    union all
    select 2 as id, 'a' as new_col
)

select * from incoming_data
""").write.format("delta").mode("overwrite").save("s3://xxx/xxx/test_change_schema")
SqlWrapper2.execute("""select 1""")
, AnalysisException: A schema mismatch detected when writing to the Delta table (Table ID: xxx).
To enable schema migration using DataFrameWriter or DataStreamWriter, please set:
'.option("mergeSchema", "true")'.
For other operations, set the session configuration
spark.databricks.delta.schema.autoMerge.enabled to "true". See the documentation
specific to the operation for details.

Table schema:
root
 |-- id: integer (nullable = true)

Data schema:
root
 |-- id: integer (nullable = true)
 |-- new_col: string (nullable = true)

To overwrite your schema or change partitioning, please set:
'.option("overwriteSchema", "true")'.

Note that the schema can't be overwritten when using
'replaceWhere'.
```
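The mismatch above boils down to a column (`new_col`) present in the data schema but absent from the table schema; Delta refuses the overwrite unless `mergeSchema` (or `overwriteSchema`) is set. A small illustrative helper that detects such added columns (this is not Delta's actual check, just a sketch of the comparison):

```python
# Illustrative sketch of the schema comparison behind the error above.
# Delta rejects a write when the data schema has columns the table schema
# lacks, unless mergeSchema/overwriteSchema is enabled. Not Delta code.

def added_columns(table_schema: dict, data_schema: dict) -> dict:
    """Return columns present in data_schema but absent from table_schema."""
    return {
        name: dtype
        for name, dtype in data_schema.items()
        if name not in table_schema
    }

table_schema = {"id": "integer"}
data_schema = {"id": "integer", "new_col": "string"}
# mergeSchema would add these columns to the table schema.
print(added_columns(table_schema, data_schema))
```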

Now, adding the config, the model looks like:

```sql
{{
    config(
        materialized='incremental',
        incremental_strategy='merge',
        unique_key=['id'],
        write_options = {"mergeSchema": "true"},
        file_format='delta',
    )
}}

with incoming_data as (
    select 1 as id, 'a' as new_col
    union all
    select 2 as id, 'a' as new_col
)

select * from incoming_data
```

And the same command passes gracefully, and my table has its schema updated.

moomindani commented 2 months ago

Thank you for your contribution.