delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, and with APIs for Scala, Java, Rust, Ruby, and Python
https://delta.io
Apache License 2.0

[BUG][SPARK] Changed records not inserted using SCD2 pattern #3768

Closed · lymedo closed this 1 month ago

lymedo commented 1 month ago

Bug

Describe the problem

I've followed the SCD2 pattern in the docs, but the changed records are not being inserted as part of the merge: only the existing record in the target table is being updated. New records are being inserted as expected.

Am I misunderstanding the behaviour?

Steps to reproduce


from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import current_timestamp, lit
from delta.tables import DeltaTable

schema = StructType([
    StructField("reference_code", StringType(), True),
    StructField("reference_value", StringType(), True),
    StructField("category", StringType(), True),
])

# initial data insert to table
data = [("A", "A-VALUE", "A-CAT")]
df = spark.createDataFrame(data=data, schema=schema)
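# (assumed, not shown in the report: df also gains row_hash,
# row_is_current and the row_effective_* columns here, and is written
# out as the default.reference target table before the merge below)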

# new data for SCD2 merge
data = [("A","A-VALUE","A-CAT1"),
        ("B","B-VALUE","B-CAT")
        ]
df_new_data = spark.createDataFrame(data=data,schema=schema)

# ...
# create the insert/update DataFrames by comparing against the target, then union into df_merge
# ...

# merge
dt = DeltaTable.forName(spark, 'default.reference')

(dt.alias('target')
 .merge(
     df_merge.alias('source'),
     "source.reference_code = target.reference_code"
 )
 .whenMatchedUpdate(
     condition="source.row_hash <> target.row_hash AND target.row_is_current = 1",
     set={
         "row_effective_to_datetime": current_timestamp(),
         "row_is_current": lit(0)
     }
 )
 .whenNotMatchedInsert(values={
     "reference_code": "source.reference_code",
     "reference_value": "source.reference_value",
     "category": "source.category",
     "row_hash": "source.row_hash",
     "row_effective_from_datetime": current_timestamp(),
     "row_is_current": lit(1)
 })
).execute()

Observed results

The effective-to date on record A is updated as expected, and record B is inserted, but the updated version of record A is not inserted.

It only works if I add a second merge:

(dt.alias('target')
 .merge(
     df_merge.alias('source'),
     "source.reference_code = target.reference_code AND source.row_hash = target.row_hash"
 )
 .whenNotMatchedInsert(values={
     "reference_code": "source.reference_code",
     "reference_value": "source.reference_value",
     "category": "source.category",
     "row_hash": "source.row_hash",
     "row_effective_from_datetime": current_timestamp(),
     "row_is_current": lit(1)
 })
).execute()

Expected results

I was expecting to see the original record A closed out and new records A and B inserted.

Further details

Environment information

Databricks Runtime 15.4 LTS

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

felipepessoto commented 1 month ago

In your first code sample, are you expecting whenNotMatchedInsert to be executed for rows where the condition above it is not met?

 .whenMatchedUpdate(
     condition="source.row_hash <> target.row_hash AND target.row_is_current = 1",

In my understanding, whenNotMatchedInsert is only executed when the merge condition itself is not met:

 .merge(
     df_merge.alias('source'),
     "source.reference_code = target.reference_code"
 )

That is why your second code sample works. I'm assuming "source.reference_code = target.reference_code" always matches.
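Reading the first merge that way, each source row is routed to exactly one side (an illustrative trace, assuming row_hash is derived from the category value):

# ("A", "A-VALUE", "A-CAT1"): reference_code matches target row A
#   -> MATCHED: whenMatchedUpdate fires (hashes differ) and closes the
#      current row; a matched source row never reaches an insert clause
# ("B", "B-VALUE", "B-CAT"): no target row with reference_code "B"
#   -> NOT MATCHED: whenNotMatchedInsert fires and record B is inserted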

lymedo commented 1 month ago

I've realised what I need to do to make this work as expected: the source DataFrame requires two rows for each changed record, one with a merge key equal to the reference_code (so the current row is matched and closed out) and one with a NULL merge key (which never matches, so the new version falls through to whenNotMatchedInsert).
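
A minimal sketch of that staging step, following the shape of the SCD2 example in the Delta docs; it assumes df_new_data and the target already carry a row_hash column, and merge_key is an illustrative name:

from pyspark.sql.functions import col

current = dt.toDF().where("row_is_current = 1")

# new versions of changed records: same key, different hash
changed = (df_new_data.alias("s")
    .join(current.alias("t"),
          col("s.reference_code") == col("t.reference_code"))
    .where(col("s.row_hash") != col("t.row_hash"))
    .select("s.*"))

df_merge = (
    # every incoming row, keyed: a match closes out a changed current row,
    # and brand-new reference_codes fall through to the insert clause
    df_new_data.selectExpr("reference_code AS merge_key", "*")
    # changed rows again, with a NULL key that can never match, so
    # whenNotMatchedInsert writes the new version of the record
    .unionByName(changed.selectExpr("CAST(NULL AS STRING) AS merge_key", "*"))
)

The merge condition then becomes "target.reference_code = source.merge_key"; the whenMatchedUpdate and whenNotMatchedInsert clauses stay as in the first sample.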