delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive and APIs
https://delta.io
Apache License 2.0

[BUG] Third delta write failing for partition column position #3738

Open nizamudeeen opened 1 month ago

nizamudeeen commented 1 month ago

Bug

Which Delta project/connector is this regarding?

Spark (delta-spark)

Describe the problem

The third write to a Delta table fails with an exception. After debugging, it appears that the partition column is expected to be the last field in the DataFrame schema. The same example works with Spark 3.3.1 and delta-core_2.12 2.3.0.

Steps to reproduce

1. Build a Scala project with Spark 3.5.2 and delta-spark_2.12 3.2.0.
2. Add the unit test below:
   test("third overwrite failing snapshot") {
    import org.apache.spark.sql.types.{StructField, StructType, LongType, StringType}
    val tableName = "default.target_table";
    val schema = new StructType()
      .add(StructField("col1", LongType))
      .add(StructField("col2", StringType))
      .add(StructField("col3", StringType))
      .add(StructField("col4", StringType))
      .add(StructField("col5", StringType))
      .add(StructField("col6", StringType))

    val data = Seq(
      Row(444L, "String", null, "String_partition", "num1 value", "num2 value for null "),
      Row(444L, "String", "String", "String_partition", "num1 value", "num2 value for string")

    )
    val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
    df.write.format("delta").partitionBy("col4").option("mergeSchema", "true").mode("append").saveAsTable(tableName)
    df.write.format("delta").partitionBy("col4").option("mergeSchema", "true").mode("overwrite").option("replaceWhere", "col4 = 'String_partition'").saveAsTable(tableName)
    df.write.format("delta").partitionBy("col4").option("mergeSchema", "true").mode("overwrite").option("replaceWhere", "col4 = 'String_partition'").saveAsTable(tableName)
  }
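
As a possible workaround (an untested sketch, based only on the debugging observation above that the partition column seems to be expected as the last field), one could reorder the DataFrame so that col4 comes last before each write:

    // Hypothetical workaround: move the partition column (col4) to the end of
    // the DataFrame so the catalog's partition-columns-last assumption holds.
    import org.apache.spark.sql.functions.col
    val reordered = df.select(
      (df.columns.filterNot(_ == "col4") :+ "col4").map(col): _*
    )
    reordered.write.format("delta").partitionBy("col4")
      .option("mergeSchema", "true")
      .mode("overwrite").option("replaceWhere", "col4 = 'String_partition'")
      .saveAsTable(tableName)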

Observed results

The third write aborts with the following exception:

24/09/30 14:26:43 ERROR Utils: Aborting task

java.lang.AssertionError: assertion failed
    at scala.Predef$.assert(Predef.scala:208)
    at org.apache.spark.sql.catalyst.catalog.CatalogTable.partitionSchema(interface.scala:264)
    at org.apache.spark.sql.catalyst.catalog.InMemoryCatalog.alterTableDataSchema(InMemoryCatalog.scala:330)
    at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.alterTableDataSchema(ExternalCatalogWithListener.scala:124)
    at org.apache.spark.sql.delta.hooks.UpdateCatalog$.org$apache$spark$sql$delta$hooks$UpdateCatalog$$replaceTable(UpdateCatalog.scala:334)
    at org.apache.spark.sql.delta.hooks.UpdateCatalog.updateSchema(UpdateCatalog.scala:236)
    at org.apache.spark.sql.delta.commands.CreateDeltaTableCommand.updateCatalog(CreateDeltaTableCommand.scala:579)
    at org.apache.spark.sql.delta.commands.CreateDeltaTableCommand.runPostCommitUpdates(CreateDeltaTableCommand.scala:194)
    at org.apache.spark.sql.delta.commands.CreateDeltaTableCommand.handleCommit(CreateDeltaTableCommand.scala:174)
    at org.apache.spark.sql.delta.commands.CreateDeltaTableCommand.$anonfun$run$2(CreateDeltaTableCommand.scala:110)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:168)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:166)
    at org.apache.spark.sql.delta.commands.CreateDeltaTableCommand.recordFrameProfile(CreateDeltaTableCommand.scala:57)
    at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:136)
    at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:128)
    at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:117)
    at org.apache.spark.sql.delta.commands.CreateDeltaTableCommand.recordOperation(CreateDeltaTableCommand.scala:57)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:135)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:125)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:115)
    at org.apache.spark.sql.delta.commands.CreateDeltaTableCommand.recordDeltaOperation(CreateDeltaTableCommand.scala:57)
    at org.apache.spark.sql.delta.commands.CreateDeltaTableCommand.run(CreateDeltaTableCommand.scala:110)
    at org.apache.spark.sql.delta.catalog.DeltaCatalog.$anonfun$createDeltaTable$1(DeltaCatalog.scala:184)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:168)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:166)
    at org.apache.spark.sql.delta.catalog.DeltaCatalog.recordFrameProfile(DeltaCatalog.scala:65)
    at org.apache.spark.sql.delta.catalog.DeltaCatalog.org$apache$spark$sql$delta$catalog$DeltaCatalog$$createDeltaTable(DeltaCatalog.scala:95)
    at org.apache.spark.sql.delta.catalog.DeltaCatalog$StagedDeltaTableV2.$anonfun$commitStagedChanges$1(DeltaCatalog.scala:545)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:168)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:166)
    at org.apache.spark.sql.delta.catalog.DeltaCatalog.recordFrameProfile(DeltaCatalog.scala:65)
    at org.apache.spark.sql.delta.catalog.DeltaCatalog$StagedDeltaTableV2.commitStagedChanges(DeltaCatalog.scala:507)
    at org.apache.spark.sql.execution.datasources.v2.V2CreateTableAsSelectBaseExec.$anonfun$writeToTable$1(WriteToDataSourceV2Exec.scala:580)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
    at org.apache.spark.sql.execution.datasources.v2.V2CreateTableAsSelectBaseExec.writeToTable(WriteToDataSourceV2Exec.scala:573)
    at org.apache.spark.sql.execution.datasources.v2.V2CreateTableAsSelectBaseExec.writeToTable$(WriteToDataSourceV2Exec.scala:567)
    at org.apache.spark.sql.execution.datasources.v2.AtomicReplaceTableAsSelectExec.writeToTable(WriteToDataSourceV2Exec.scala:183)
    at org.apache.spark.sql.execution.datasources.v2.AtomicReplaceTableAsSelectExec.run(WriteToDataSourceV2Exec.scala:216)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
    at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:634)
    at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:568) 
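
For context, the assertion that fails is in Spark's CatalogTable.partitionSchema (interface.scala:264 in the trace above), which assumes the partition columns are the trailing fields of the table schema; the logic is approximately:

    // From org.apache.spark.sql.catalyst.catalog.CatalogTable (paraphrased):
    def partitionSchema: StructType = {
      // Take the last N fields of the schema, where N is the number of
      // declared partition columns...
      val partitionFields = schema.takeRight(partitionColumnNames.length)
      // ...and assert that their names match the declared partition columns.
      // A table whose partition column is not last in the schema fails here.
      assert(partitionFields.map(_.name) == partitionColumnNames)
      StructType(partitionFields)
    }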

Expected results

The third write should succeed, as it does with Spark 3.3.1 and delta-core_2.12 2.3.0.

Environment information

- Delta Lake version: 3.2.0 (delta-spark_2.12)
- Spark version: 3.5.2
- Scala version: 2.12

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

nizamudeeen commented 1 month ago

@dhruvarya-db @vkorukanti

Is this issue a side effect of https://github.com/delta-io/delta/commit/96f77ab3b0692e75c015aeb64716fce50043e77f?