delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive and APIs
https://delta.io
Apache License 2.0

[Feature Request] [MERGE] Avoid copying rows when the match clause is false. Potential for huge perf impact. #1812

Open felipepessoto opened 1 year ago

felipepessoto commented 1 year ago

Feature request


Overview

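The findTouchedFiles method in MergeIntoCommand filters files only by the merge condition (the ON clause), not by the WHEN MATCHED clause condition. As a result, every file containing a row that satisfies the ON clause is rewritten, even when the match clause condition is false for all of those rows. In the worst case, the entire table is rewritten.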
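To make the overview concrete, here is a minimal in-memory sketch, assuming a toy model in which a table is a map from file name to rows and the predicates are those of the repro below. It contrasts the current rule (a file is touched if any of its rows satisfies the ON clause) with the rule this request proposes (a file is touched only if some row also satisfies the WHEN MATCHED condition). TargetRow, SourceRow and TouchedFilesModel are hypothetical names; this is not Delta's findTouchedFiles, and it ignores WHEN NOT MATCHED inserts (which only add new files) and WHEN NOT MATCHED BY SOURCE clauses.

```scala
// Hypothetical in-memory model of MERGE file pruning; not Delta's implementation.
final case class TargetRow(id: Long, name: String)
final case class SourceRow(id2: Long, name2: String)

object TouchedFilesModel {
  type Files = Map[String, Seq[TargetRow]] // file name -> rows stored in that file

  // ON s.id2 = t.id
  def on(s: SourceRow, t: TargetRow): Boolean = s.id2 == t.id

  // WHEN MATCHED AND NOT (s.name2 <=> t.name); names are assumed non-null,
  // so the null-safe comparison reduces to plain inequality
  def matchedCond(s: SourceRow, t: TargetRow): Boolean = s.name2 != t.name

  // True if some source row matches t and the WHEN MATCHED condition holds
  private def isUpdated(t: TargetRow, source: Seq[SourceRow]): Boolean =
    source.exists(s => on(s, t) && matchedCond(s, t))

  // Current rule: any file with a row satisfying the ON clause is rewritten
  def touchedByOnOnly(files: Files, source: Seq[SourceRow]): Set[String] =
    files.filter { case (_, rows) => rows.exists(t => source.exists(s => on(s, t))) }.keySet

  // Proposed rule: only files with a row that would actually be updated
  def touchedByOnAndMatched(files: Files, source: Seq[SourceRow]): Set[String] =
    files.filter { case (_, rows) => rows.exists(t => isUpdated(t, source)) }.keySet

  // Rows rewritten unchanged (numTargetRowsCopied): rows in touched files that no clause updates
  def rowsCopied(touched: Set[String], files: Files, source: Seq[SourceRow]): Int =
    touched.toSeq.flatMap(f => files(f)).count(t => !isUpdated(t, source))
}
```

With the repro's data (1000 single-row files, identical names in source and target), touchedByOnOnly returns all 1000 files and rowsCopied reports 1000 copied rows, matching the observed metrics, while touchedByOnAndMatched returns no files, matching the expected ones.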

Motivation

This is a big problem when merging two large tables where the ON clause matches most of the target table but the match clause condition is false for most of those rows, as in the example below.

Further details

import org.apache.spark.sql.functions.lit

// Prints the operationMetrics of the most recent commit on the table.
def showMetrics(targetDataPathParam: String): Unit = {
  val deltaTable = io.delta.tables.DeltaTable.forPath(spark, targetDataPathParam)
  val operationMetrics = deltaTable.history(1).select("operationMetrics").head.toString()
  println(s"Metrics for $targetDataPathParam:\n$operationMetrics\n")
}

// Write one record per file, so file counts in the metrics equal row counts.
spark.conf.set("spark.sql.files.maxRecordsPerFile", 1)

// Hypothetical location for the target table; any path without an existing table works.
val tablePath = "/tmp/delta/merge-1812"

val rows = 1000

val data = spark.range(rows).withColumn("name", lit("John"))

val df = data.toDF("id", "name")

df.write.format("delta").save(tablePath)

// The source holds the same rows as the target, so the WHEN MATCHED condition is never true.
val source = data.toDF("id2", "name2")

source.createOrReplaceTempView("test_data")

spark.sql(s"""MERGE INTO delta.`$tablePath` t
  USING test_data s
  ON s.id2 = t.id
  WHEN MATCHED AND NOT (s.name2 <=> t.name) THEN UPDATE SET t.name = s.name2
  WHEN NOT MATCHED THEN INSERT (id, name) VALUES (s.id2, s.name2)""")

showMetrics(tablePath)

Observed results

numTargetRowsCopied -> 1000
numOutputRows -> 1000
numTargetFilesRemoved -> 1000
numTargetFilesAdded -> 1000

Expected results

numTargetRowsCopied -> 0
numOutputRows -> 0
numTargetFilesRemoved -> 0
numTargetFilesAdded -> 0


keen85 commented 7 months ago

We just ran into this bug using Delta 2.2.0 and 2.4.0. I really think it should be fixed.

Our workaround is to include the update condition in the merge condition as well.
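Moving the update condition into the merge condition, for example ON s.id2 = t.id AND NOT (s.name2 <=> t.name), lets the ON clause itself exclude unchanged rows. It is only equivalent to the original statement when the merge has no WHEN NOT MATCHED clause: a source row whose target row is unchanged no longer matches, so an INSERT clause would add it again. The sketch below is a hypothetical in-memory model (Rec, MergeModel and its parameters are illustrative names, not Delta APIs) that compares both forms on a small table.

```scala
// Hypothetical in-memory model of MERGE semantics; not Delta's implementation.
final case class Rec(id: Long, name: String)

object MergeModel {
  type Pred = (Rec, Rec) => Boolean // arguments are (source, target)

  // Models:
  //   MERGE ... ON on
  //   WHEN MATCHED AND matchedCond THEN UPDATE SET name = s.name
  //   [WHEN NOT MATCHED THEN INSERT *]   (only if insertNotMatched)
  // Assumes each target row matches at most one source row.
  def merge(target: Seq[Rec], source: Seq[Rec], on: Pred, matchedCond: Pred,
            insertNotMatched: Boolean): Seq[Rec] = {
    val updated = target.map { t =>
      source.find(s => on(s, t)) match {
        case Some(s) if matchedCond(s, t) => t.copy(name = s.name)
        case _                            => t // unmatched, or matched with condition false
      }
    }
    // A source row is "not matched" when no target row satisfies the ON clause
    val inserted =
      if (insertNotMatched) source.filterNot(s => target.exists(t => on(s, t)))
      else Seq.empty[Rec]
    updated ++ inserted
  }

  // Original form: ON s.id = t.id WHEN MATCHED AND NOT (s.name <=> t.name)
  val originalOn: Pred   = (s, t) => s.id == t.id
  val originalCond: Pred = (s, t) => s.name != t.name

  // Workaround: the update condition is moved into the ON clause
  val workaroundOn: Pred = (s, t) => s.id == t.id && s.name != t.name
  val alwaysTrue: Pred   = (_, _) => true
}
```

Without an INSERT clause both forms produce the same table. With one, the unchanged row is inserted a second time under the workaround; in the repro above, which has a WHEN NOT MATCHED INSERT clause, all 1000 unchanged source rows would be treated as not matched.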

For us, this reduced execution time from 55 minutes to 10 minutes.

keen85 commented 7 months ago

@johanl-db, can you tell whether this issue is covered by #1827?

johanl-db commented 7 months ago

@keen85 If your merge only contains whenMatched update/delete clauses (and no whenNotMatched or whenNotMatchedBySource clauses) then it will benefit from https://github.com/delta-io/delta/pull/1851 (part of a series of improvements in https://github.com/delta-io/delta/issues/1827). You will need to upgrade to Delta 3.0 or above to benefit from that change.
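The eligibility rule in the comment above can be restated as a predicate over the clause kinds in a MERGE statement. This is a hypothetical sketch: the MergeClause hierarchy and isMatchedOnly are illustrative names, not Delta APIs, and the code only encodes the rule as stated; it does not model the optimization itself.

```scala
// Hypothetical model of MERGE clause kinds; names are illustrative only.
sealed trait MergeClause
case object MatchedUpdate            extends MergeClause
case object MatchedDelete            extends MergeClause
case object NotMatchedInsert         extends MergeClause
case object NotMatchedBySourceUpdate extends MergeClause
case object NotMatchedBySourceDelete extends MergeClause

object MatchedOnly {
  // True when every clause is a WHEN MATCHED update/delete, i.e. there is no
  // WHEN NOT MATCHED or WHEN NOT MATCHED BY SOURCE clause.
  // A MERGE needs at least one clause, so an empty list is rejected.
  def isMatchedOnly(clauses: Seq[MergeClause]): Boolean =
    clauses.nonEmpty && clauses.forall {
      case MatchedUpdate | MatchedDelete => true
      case _                             => false
    }
}
```

By this rule, the repro in the original issue, which includes a WHEN NOT MATCHED THEN INSERT clause, would not qualify.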

keen85 commented 7 months ago

Thanks a lot, @johanl-db.

Should this issue then be marked as resolved?