apache/iceberg (Apache Iceberg)
https://iceberg.apache.org/
Apache License 2.0

Data Deletion Issue with MERGE INTO ... WHEN MATCHED THEN DELETE statement in Iceberg 1.3 with Spark 3.4.1 #8126

Open alesk opened 1 year ago

alesk commented 1 year ago

Apache Iceberg version

1.3.0 (latest release)

Query engine

Spark

Environment:

Iceberg version: 1.3
Spark version: 3.4.1

Description: When using a MERGE INTO ... WHEN MATCHED THEN DELETE statement to delete rows from an Iceberg table, the write fails with a NullPointerException if any of the remaining rows has a null value in any column. The issue does not occur with Iceberg 1.3 on Spark 3.3.2. The table must be sorted to trigger the bug.

Steps to Reproduce:

Steps to reproduce with code and example are described in https://github.com/alesk/spark-bug-iceberg-delete
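
For orientation, here is a minimal sketch of the kind of statement that fails. The catalog, table, and column names are illustrative assumptions; the linked repository remains the authoritative reproduction.

    // Sketch only, assuming a Spark 3.4.1 session with the Iceberg 1.3
    // runtime and SQL extensions, and an Iceberg catalog named `demo`.
    spark.sql("CREATE TABLE demo.db.target (id BIGINT, note STRING) USING iceberg")
    // the report notes the table must be sorted for the bug to surface
    spark.sql("ALTER TABLE demo.db.target WRITE ORDERED BY id")
    spark.sql("INSERT INTO demo.db.target VALUES (1, NULL), (2, 'keep'), (3, 'drop')")

    // Deleting the matched row rewrites the data file that still holds row 1
    // (note = NULL); on Spark 3.4.1 this fails with a NullPointerException.
    spark.sql(
      """MERGE INTO demo.db.target t
        |USING (SELECT 3 AS id) s
        |ON t.id = s.id
        |WHEN MATCHED THEN DELETE""".stripMargin)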

nastra commented 1 year ago

@alesk could you please attach the stack trace so we can better understand where the error is coming from? In https://github.com/alesk/spark-bug-iceberg-delete I only see the error message, but the full stack trace would be quite helpful.

alesk commented 1 year ago

I attached the stack trace to the repo, and I'm also attaching it here:

        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
        at org.apache.spark.scheduler.Task.run(Task.scala:139)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:408)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:382)
        at org.apache.spark.sql.execution.datasources.v2.ReplaceDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:294)
        at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:360)
        at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:359)
        at org.apache.spark.sql.execution.datasources.v2.ReplaceDataExec.run(WriteToDataSourceV2Exec.scala:294)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:219)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
        at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:640)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:630)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:662)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:76)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
        at java.base/java.lang.reflect.Method.invoke(Method.java:577)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.NullPointerException
        at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.write(UnsafeWriter.java:110)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
        at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:464)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1563)
        at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:509)
        at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:448)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:514)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:411)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
        at org.apache.spark.scheduler.Task.run(Task.scala:139)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        ... 1 more

chengxianglibra commented 1 year ago

@aokolnychyi It seems this bug was introduced by #7646. RewriteRowLevelIcebergCommand::buildMergingOutput transforms an attribute's nullable property from true to false when the attribute is not among the action's outputs, and in RewriteMergeIntoTable::buildReplaceDataPlan, #7646 changed

    val matchedOutputs = matchedActions.map(actionOutput(_, metadataAttrs)) :+ readAttrs

to

    val matchedOutputs = matchedActions.map(matchedActionOutput(_, metadataAttrs))

When the outputs are empty and a row remaining after the deletion has a null value (as in the test case above), every attribute's nullable property is set to false. Spark's generated code does not null-check fields declared non-nullable, which ultimately leads to this NPE.
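
To make the failure mode concrete, here is a small standalone sketch (my own illustration, not Iceberg code; the schema and field names are made up): when an attribute is declared non-nullable, Spark's generated unsafe projections skip the null check, so a null slipping through can blow up in UnsafeWriter.write, as in the trace above.

    // Illustrative sketch, assuming a plain Spark 3.4.x session (`spark`).
    // The schema wrongly declares `note` non-nullable while the data holds
    // a null, mimicking what the mis-set nullable flag does in this bug.
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

    val schema = StructType(Seq(
      StructField("id", LongType, nullable = false),
      StructField("note", StringType, nullable = false)  // actually contains null
    ))
    val df = spark.createDataFrame(
      spark.sparkContext.parallelize(Seq(Row(1L, null))), schema)

    // A shuffle forces the rows through generated UnsafeRow writers; with
    // nullable = false no null check is emitted, so this can fail with the
    // same NullPointerException seen in the stack trace above.
    df.repartition(2).write.format("noop").mode("overwrite").save()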

a0x commented 12 months ago

@chengxianglibra I ran into this bug as well and tried to debug it by patching the source code, but I have to admit I am not familiar enough with Scala to get past the type error shown in the attached screenshot.

[screenshot of a Scala type error]

So, is there any way to fix this issue?
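
Until a fix lands, two possible mitigations follow from the thread: pin Spark to 3.3.2, where the original report says the bug does not occur, or, assuming the equivalent plain DELETE avoids the MERGE rewrite path changed in #7646 (untested against this bug; table names reuse the illustrative repro above, and `demo.db.updates` is a hypothetical source table), rewrite the statement:

    // Hypothetical workaround, not verified: express the matched delete as
    // a plain DELETE with a subquery instead of
    // MERGE INTO ... WHEN MATCHED THEN DELETE.
    spark.sql(
      """DELETE FROM demo.db.target t
        |WHERE t.id IN (SELECT id FROM demo.db.updates)""".stripMargin)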