delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, and with APIs for Scala, Java, Rust, Ruby, and Python.
https://delta.io
Apache License 2.0

Can't apply data changes generated from an external database into a Delta table - Merge not performing update and delete #574

Closed: titowoche30 closed this issue 2 years ago

titowoche30 commented 3 years ago

The problem

Hello,

I'm working on a Change Data Capture pipeline, and my goal is to replicate data from a Parquet file into a Delta table by applying the required inserts, updates, and deletes. I followed the tutorial at https://docs.delta.io/latest/delta-update.html#write-change-data-into-a-delta-table, but for some reason the Delta table ends up not applying any updates or deletes.
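For reference, the pattern from that tutorial looks roughly like this (a sketch, not my actual code; `changes` stands for the incoming change-data DataFrame, and the path and column names are illustrative):

import io.delta.tables.DeltaTable

// Rough shape of the documented pattern: a single merge that deletes on a
// tombstone flag, updates any other match, and inserts new non-deleted keys.
val target = DeltaTable.forPath(spark, "s3a://some-target-path")
target.as("t")
  .merge(changes.as("s"), "s.key = t.key")
  .whenMatched("s.deleted = true").delete()
  .whenMatched().updateExpr(Map("key" -> "s.key", "value" -> "s.value"))
  .whenNotMatched("s.deleted = false").insertExpr(Map("key" -> "s.key", "value" -> "s.value"))
  .execute()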

The code

I'm using Delta Lake 0.7.0 with the following Spark configs.

 .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
 .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
 .config("spark.delta.logStore.class","org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")
 .config("spark.databricks.delta.schema.autoMerge.enabled","true")

Reading a Parquet file from S3 and adding a deleted column. The Op column contains U, D, or I, meaning update, delete, or insert respectively.

import org.apache.spark.sql.functions._

val data = spark.read.parquet(s"s3a://my-parquet-path")
  .withColumn("deleted", col("Op") === "D")

Creating the Delta table from the initial-load rows (those with a null Op) and saving it to S3.

val writeDeltaTable = data.filter(col("Op").isNull)
  .select(
    col("Op"), col("deleted"),
    struct(col("ID")).as("key"),
    struct(data.drop("Op").columns.head, data.drop("Op").columns.tail: _*).as("value"))

writeDeltaTable.write.format("delta").save(s"s3a://my-delta-table-path")

Creating two auxiliary DataFrames: one with all change rows, and one with only the latest update per key.

val operationsDf = data.filter(col("Op").isNotNull)
  .select(
    col("Op"),
    struct(col("ID")).as("key"),
    struct(data.drop("Op").columns.head, data.drop("Op").columns.tail: _*).as("value"))

// Keep only the latest update row per key
val updatesDf = operationsDf.filter(col("Op") === "U")
  .groupBy("key", "Op")
  .agg(max("value").as("value"))
  .selectExpr("Op", "key", "value")

Creating unionDf, which contains the I and D rows plus the last U per key.

val unionDf = operationsDf.filter(col("Op").isin("D", "I"))
  .union(updatesDf)
  .orderBy("value.cdcTime")

unionDf.selectExpr("Op","key", "value.*").show()

This DataFrame contains the following test records:

Op  AKEY     ORGANIZATIONID
I   testing  10022
U   testing  10021
D   testing  10021

It also contains other columns and records that aren't relevant here.

Reading the Delta table that I just saved to S3.

import io.delta.tables.DeltaTable

val deltaTable = DeltaTable.forPath(spark, s"s3a://my-delta-table-path")

Executing the Merge

deltaTable.as("delta")
  .merge(
    unionDf.as("dataframe"),
    "dataframe.key = delta.key")
  .whenMatched("dataframe.value.deleted = true")
  .delete()
  .whenMatched("dataframe.Op = 'U'")
  .updateExpr(Map("key" -> "dataframe.key", "value" -> "dataframe.value"))
  .whenNotMatched("!(dataframe.Op = 'U') AND dataframe.value.deleted = false")
  .insertExpr(Map("key" -> "dataframe.key", "value" -> "dataframe.value", "Op" -> "dataframe.Op"))
  .execute()

I tried to use dataframe.Op = 'D' instead of dataframe.value.deleted = true, but the results were the same.
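Spelled out, that variant differs from the merge above only in the delete clause's condition:

deltaTable.as("delta")
  .merge(unionDf.as("dataframe"), "dataframe.key = delta.key")
  .whenMatched("dataframe.Op = 'D'")   // instead of dataframe.value.deleted = true
  .delete()
  .whenMatched("dataframe.Op = 'U'")
  .updateExpr(Map("key" -> "dataframe.key", "value" -> "dataframe.value"))
  .whenNotMatched("!(dataframe.Op = 'U') AND dataframe.value.deleted = false")
  .insertExpr(Map("key" -> "dataframe.key", "value" -> "dataframe.value", "Op" -> "dataframe.Op"))
  .execute()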

Reading back and printing the Delta table I just merged into.

val df = spark.read.format("delta").load(s"s3a://my-delta-table-path")

df.orderBy(desc("value.cdcTime"))
  .selectExpr("Op", "key", "value.*")
  .show()

My test records in this Delta table end up like this:

Op  AKEY     ORGANIZATIONID
I   testing  10022

And the following count by Op

df.groupBy("Op").count().show()

gives this

Op    count
null  6478
I     2

Meaning that the merge only inserts records; it neither updates nor deletes them.
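One way to see what the merge actually did is the table history; a minimal sketch, assuming operation metrics are recorded by this Delta version:

// The operationMetrics column of the history reports how many rows the
// last MERGE inserted, updated, and deleted.
deltaTable.history(1)
  .select("version", "operation", "operationMetrics")
  .show(truncate = false)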

amineds commented 3 years ago

@titowoche30 are you still facing this issue? Have you tried with a newer version of Delta?

titowoche30 commented 2 years ago

@amineds I am not