delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive and APIs
https://delta.io
Apache License 2.0

[BUG] MergeIntoCommand not visible in QueryExecutionListener when using Python/Scala API #1521

Closed by sh0ck-wave 3 months ago

sh0ck-wave commented 1 year ago

Bug

MergeIntoCommand is not visible in QueryExecutionListener when using the Python/Scala API to execute a merge operation.

Describe the problem

When a SQL MERGE statement is executed via spark.sql, a LogicalPlan of type org.apache.spark.sql.delta.commands.MergeIntoCommand is visible to any registered Spark QueryExecutionListener. This is useful for capturing statistics and data lineage. When the Python or Scala API is used to execute the merge operation, no such LogicalPlan is visible to registered QueryExecutionListeners, which makes it difficult to track merge-related statistics and data lineage.

Steps to reproduce

Execute the following Scala Spark application:

package com.foo.bar

/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
import org.apache.spark.sql.util.QueryExecutionListener
import org.apache.spark.sql.execution.QueryExecution
import io.delta.tables.DeltaTable

class MyListener extends QueryExecutionListener {

  override def onSuccess(
      funcName: String,
      qe: QueryExecution,
      durationNs: Long
  ): Unit = {
    println(s"Received Event from ${qe.analyzed.getClass}")
  }

  override def onFailure(
      funcName: String,
      qe: QueryExecution,
      exception: Exception
  ): Unit = {
    println(s"Received Failure Event from ${qe.analyzed.getClass}")
  }
}

case class Player(id: Integer, name: String)

object SimpleApp {
  def main(args: Array[String]) = {
    println(s"starting")

    val spark_conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("SimpleApp")
      .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .set(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog"
      )
    val spark = SparkSession.builder.config(spark_conf).getOrCreate()
    spark.sparkContext.setLogLevel("OFF")

    val df = spark.createDataFrame[Player](Seq(Player(1, "Quark")))
    val df1 = spark.createDataFrame[Player](Seq(Player(1, "Boson")))
    (
      df.write
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .option("path", "/path/to/table1")
        .format("delta")
        .saveAsTable("base")
    )

    (
      df1.write
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .option("path", "/path/to/table2")
        .format("delta")
        .saveAsTable("update")
    )

    spark.listenerManager.register(new MyListener())

    println("Captured plans for SQL MERGE:")

    spark.sql(
      """
      |MERGE INTO base
      |USING update
      |ON base.Id = update.Id
      |WHEN MATCHED THEN
      |    UPDATE SET *
      |WHEN NOT MATCHED THEN
      |    INSERT *
      """.stripMargin
    )
    Thread.sleep(5000)
    spark.listenerManager.clear()

    val base_table = DeltaTable.forPath(spark, "/path/to/table1")
    val update_table = DeltaTable.forPath(spark, "/path/to/table2")
    val merge = (
        base_table.alias("a")
        .merge(update_table.toDF.alias("b"), "a.Id == b.Id")
        .whenMatched().updateAll()
        .whenNotMatched().insertAll()
    )

    spark.listenerManager.register(new MyListener())
    println("")
    println("Captured plans for Delta API:")
    merge.execute()

    Thread.sleep(5000)
    spark.stop()
  }
}

Observed results

Captured plans for SQL MERGE:
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Project
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Project
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Filter
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Aggregate
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Project
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Filter
Received Event from class org.apache.spark.sql.catalyst.plans.logical.SerializeFromObject
Received Event from class org.apache.spark.sql.catalyst.plans.logical.GlobalLimit
Received Event from class org.apache.spark.sql.delta.commands.MergeIntoCommand

Captured plans for Delta API:
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Project
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Project
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Filter
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Aggregate
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Project
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Filter
Received Event from class org.apache.spark.sql.catalyst.plans.logical.SerializeFromObject
Received Event from class org.apache.spark.sql.catalyst.plans.logical.GlobalLimit

As can be seen, in the Delta API case no org.apache.spark.sql.delta.commands.MergeIntoCommand is captured by the QueryExecutionListener.
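Lineage tooling usually wants a yes/no answer rather than a printed log. The following is a minimal sketch of such a check, assuming the same Spark 3.x `QueryExecutionListener` signature used in the reproduction above. The names `MergePlans`, `isMergePlan`, and `MergeDetectingListener` are hypothetical and introduced only for illustration.

```scala
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

object MergePlans {
  // Fully qualified class name taken from the listener output in this issue.
  val MergeCommandClass = "org.apache.spark.sql.delta.commands.MergeIntoCommand"

  // Hypothetical helper: compares by name, so there is no compile-time
  // dependency on Delta internals.
  def isMergePlan(planClassName: String): Boolean =
    planClassName == MergeCommandClass
}

// Hypothetical listener that counts successful executions whose analyzed
// plan is a Delta merge command.
class MergeDetectingListener extends QueryExecutionListener {
  val mergesSeen = new AtomicInteger(0)

  override def onSuccess(
      funcName: String,
      qe: QueryExecution,
      durationNs: Long
  ): Unit = {
    if (MergePlans.isMergePlan(qe.analyzed.getClass.getName)) {
      mergesSeen.incrementAndGet()
    }
  }

  // Failures are ignored in this sketch.
  override def onFailure(
      funcName: String,
      qe: QueryExecution,
      exception: Exception
  ): Unit = ()
}
```

Registered in place of `MyListener`, such a counter would be expected to reach 1 after the SQL MERGE and to stay at 0 after `merge.execute()`, matching the output above. Listener events arrive asynchronously, so the counter should only be read after a wait like the `Thread.sleep` in the reproduction.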

Expected results

org.apache.spark.sql.delta.commands.MergeIntoCommand should be captured by the QueryExecutionListener for the Delta API, as it is for the SQL MERGE command.
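Because the SQL path does surface MergeIntoCommand, one possible stopgap on affected versions is to route the merge through spark.sql. This is only a sketch under assumptions: the object `SqlMergeWorkaround`, its methods, and the temporary view name are hypothetical, the target is addressed by path using Delta's delta.`<path>` SQL syntax, and the inputs are trusted (the statement is built by plain string interpolation).

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object SqlMergeWorkaround {
  // Builds the MERGE statement. Pure string construction, so it can be
  // inspected or logged before execution. Inputs are NOT escaped.
  def mergeSql(targetPath: String, sourceView: String, keyColumn: String): String =
    s"""MERGE INTO delta.`$targetPath` AS a
       |USING $sourceView AS b
       |ON a.$keyColumn = b.$keyColumn
       |WHEN MATCHED THEN UPDATE SET *
       |WHEN NOT MATCHED THEN INSERT *""".stripMargin

  // Exposes the source DataFrame as a temporary view and merges it into the
  // Delta table at targetPath through the SQL path.
  def mergeViaSql(
      spark: SparkSession,
      targetPath: String,
      source: DataFrame,
      keyColumn: String
  ): Unit = {
    val view = "merge_source_tmp" // hypothetical view name
    source.createOrReplaceTempView(view)
    spark.sql(mergeSql(targetPath, view, keyColumn))
    spark.catalog.dropTempView(view)
  }
}
```

With the tables from the reproduction, a call might look like `SqlMergeWorkaround.mergeViaSql(spark, "/path/to/table1", update_table.toDF, "id")`.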

Environment information

zsxwing commented 1 year ago

The issue is we don't call Dataset.ofRows for Merge: https://github.com/delta-io/delta/blob/c2b0ce6d4f35df2fe6c2281299ae38513f50305d/core/src/main/scala/io/delta/tables/DeltaMergeBuilder.scala#L264

There were some Spark issues preventing us from doing this. Would you be willing to help us try this out and see whether these Spark issues have been fixed?

sh0ck-wave commented 1 year ago

> There were some Spark issues preventing us from doing this. Would you be willing to help us try this out and see whether these Spark issues have been fixed?

Yes. Can you provide some guidance on how to reproduce these issues?

zsxwing commented 1 year ago

> Yes. Can you provide some guidance on how to reproduce these issues?

You can call toDataset on the merge command (as is done in https://github.com/delta-io/delta/blob/edaeb86304211513c8028d056a7d90e98ec2839c/core/src/main/scala/io/delta/tables/execution/DeltaTableOperations.scala#L80) and see whether any tests fail. If Spark hasn't fixed the issue, some tests will fail.
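A toy model can illustrate why the code path matters. This is not Spark or Delta code: `Plan`, `Session`, `runDirectly`, `runTracked`, and `ToyModel` are hypothetical names. The model assumes only what this thread suggests, namely that listeners hear about a plan when it is executed through the session's tracked path (the role played by `Dataset.ofRows` / `toDataset`) and not when a command is run on its own.

```scala
import scala.collection.mutable.ListBuffer

// Toy stand-ins; none of these types exist in Spark or Delta.
final case class Plan(name: String)

class Session {
  private val listeners = ListBuffer.empty[Plan => Unit]

  def register(listener: Plan => Unit): Unit = {
    listeners += listener
  }

  // Models running a command directly: the work happens, nobody is told.
  def runDirectly(plan: Plan): Unit = ()

  // Models the tracked path: the same work, followed by listener callbacks.
  def runTracked(plan: Plan): Unit = {
    runDirectly(plan)
    listeners.foreach(listener => listener(plan))
  }
}

object ToyModel {
  // Returns the plan names a registered listener gets to see.
  def observedPlans(tracked: Boolean): List[String] = {
    val session = new Session
    val seen = ListBuffer.empty[String]
    session.register { plan => seen += plan.name }
    val merge = Plan("MergeIntoCommand")
    if (tracked) session.runTracked(merge) else session.runDirectly(merge)
    seen.toList
  }
}
```

In this model, `ToyModel.observedPlans(tracked = false)` is empty while `ToyModel.observedPlans(tracked = true)` contains the merge plan, which mirrors the difference between the Delta API and SQL MERGE outputs reported above.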

sherlockbeard commented 1 year ago

@zsxwing Spark has fixed the issue (Spark issue). Previously, I made these changes in a pull request and the tests passed. Should I create a pull request for this change?

zsxwing commented 1 year ago

> Should I create a pull request for this change?

Yep. Feel free to open a pull request.

sh0ck-wave commented 1 year ago

@sherlockbeard Thanks for working on this item. @zsxwing I can confirm that the PR solves this bug. Testing with the PR, I got the following output from the same sample app as above:

Captured plans for SQL MERGE:
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Project
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Project
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Filter
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Aggregate
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Project
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Filter
Received Event from class org.apache.spark.sql.catalyst.plans.logical.SerializeFromObject
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Filter
Received Event from class org.apache.spark.sql.catalyst.plans.logical.SerializeFromObject
Received Event from class org.apache.spark.sql.catalyst.plans.logical.GlobalLimit
Received Event from class org.apache.spark.sql.delta.commands.MergeIntoCommand

Captured plans for Delta API:
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Project
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Project
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Filter
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Aggregate
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Project
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Filter
Received Event from class org.apache.spark.sql.catalyst.plans.logical.SerializeFromObject
Received Event from class org.apache.spark.sql.catalyst.plans.logical.Filter
Received Event from class org.apache.spark.sql.catalyst.plans.logical.SerializeFromObject
Received Event from class org.apache.spark.sql.catalyst.plans.logical.GlobalLimit
Received Event from class org.apache.spark.sql.delta.commands.MergeIntoCommand

fluxquantum commented 3 months ago

@sh0ck-wave @sherlockbeard Hi, I appreciate the effort you put into researching and resolving this. Is there a timeline for when this fix can be merged? Or is there some check that's blocking its release/approval?

johanl-db commented 3 months ago

I picked up the change from @sherlockbeard, ran some more tests and merged it: https://github.com/delta-io/delta/pull/3456 cc @fluxquantum

fluxquantum commented 3 months ago

Hi johanl, really appreciate the update. Awesome on the quick turnaround. I know I posed this question in an earlier thread. Is there a version of the library I can pull as a patch for now or will I need to wait for a major release?