delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, and APIs for Scala, Java, and Python
https://delta.io
Apache License 2.0

MERGE INTO WHEN NOT MATCHED THEN INSERT * fails on schema evolution (new column at source) when run via SQL #558

Open · Walderman opened this issue 3 years ago

Walderman commented 3 years ago

Problem
When trying to run a MERGE INTO command in Spark SQL with the clause WHEN NOT MATCHED THEN INSERT *, the following error is raised:

    org.apache.spark.sql.AnalysisException: Unable to find the column 'col2' of the target table from the INSERT columns: id. INSERT clause must specify value for all the columns of the target table.

This only happens for the insert part, and only when the merge is executed via Spark SQL (as per the steps to reproduce below).

Steps to reproduce
Version: Spark 3.0.0 with delta-core 0.7.0


    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.lit

    val conf = new SparkConf()
      .set("spark.master", "local")
      .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .set("spark.databricks.delta.schema.autoMerge.enabled", "true")

    val spark = SparkSession
      .builder()
      .appName("delta lake test")
      .config(conf)
      .getOrCreate()

    // target table: a single column `id`
    val data = spark.range(0)
    println(data.columns.mkString(","))
    data.write.format("delta").mode("overwrite").save("local_path")

    // source table: `id` plus a new column `col2` not present in the target
    var data2 = spark.range(2, 20).toDF()
    data2 = data2.withColumn("col2", lit("1").cast("string"))
    data2.show(100, false)
    data2.createOrReplaceTempView("table2")

    val dt2 = spark.read.format("delta").load("local_path")
    dt2.createOrReplaceTempView("test2")

    // ERROR HAPPENING ON THIS STATEMENT
    spark.sql("merge into test2 as t using table2 as s on t.id = s.id WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *")

Removing the WHEN NOT MATCHED THEN INSERT * clause makes the statement work (I get the new schema with col2 added):

    spark.sql("merge into test2 as t using table2 as s on t.id = s.id WHEN MATCHED THEN UPDATE SET *")

Running the same update/insert via the Scala API also works:

    import io.delta.tables.DeltaTable

    val kk = DeltaTable.forPath("local_path")
    kk.alias("t")
      .merge(data2.alias("s"), "t.id = s.id")
      .whenMatched().updateAll()
      .whenNotMatched().insertAll()
      .execute()

Expected Result
Both the Spark SQL and API merge work in the same way and allow for schema evolution when the option spark.databricks.delta.schema.autoMerge.enabled is set to true.

In this case, as per https://docs.delta.io/latest/delta-update.html#merge-examples&language-scala, I would expect the new column to be added, which is what happens with the API version.
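
For reference, a quick way to check whether evolution actually happened (a minimal sketch, assuming the target table was written to local_path as in the steps above):

    // Sketch: after a merge with autoMerge enabled, the evolved
    // target schema should include the new source column.
    spark.read.format("delta").load("local_path").printSchema()
    // expected:
    // root
    //  |-- id: long (nullable = true)
    //  |-- col2: string (nullable = true)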

alonisser commented 3 years ago

I've hit this too, with the Python API on Databricks 7.4 and 7.3.

Walderman commented 3 years ago

I had another look at this on delta-core 0.8.0. The error is gone, but schema evolution still does not work: the MERGE statement completes successfully, but any source columns not present in the target table are ignored. From looking into the code, it seems schema evolution on a MERGE issued directly as a SQL statement is not supported yet? It would be great to add this.
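
To make this concrete, a minimal sketch (assuming the same setup as in the reproduction above):

    // On delta-core 0.8.0 the SQL merge no longer throws...
    spark.sql("merge into test2 as t using table2 as s on t.id = s.id " +
      "WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *")
    // ...but col2 is silently dropped: the target schema still only has `id`
    spark.read.format("delta").load("local_path").printSchema()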

Antauri commented 3 years ago

We're literally in production (for 1 year) with Delta and this doesn't work, causing pain and trouble. It used to work in Delta 0.6, but rolling back now is troublesome for us (it basically means going back to EMR 5 on Spark 2.x).

The environment currently experiencing this bug is Delta 0.8.0/Spark 3 on EMR 6.2. We're indeed also using Spark SQL. The work-around, as Walderman suggested, is to use the API version. It somehow also got past our QA, as production has long-running tables (since Delta 0.6) while the QA environment usually gets reset.

alonisser commented 3 years ago

Lately I found that this specific error also happens, with this very misleading message, when there is a type mismatch in the merge condition:

    on t.id = s.id

For example, if t.id and s.id aren't the same type.
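
A minimal sketch of a work-around for that case (assuming t.id is a BIGINT and s.id has a different type): cast the source column explicitly in the condition so both sides match.

    // Sketch: align the types in the merge condition explicitly
    // instead of relying on implicit coercion.
    spark.sql("merge into test2 as t using table2 as s " +
      "on t.id = cast(s.id as bigint) " +
      "WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *")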