projectnessie / nessie

Nessie: Transactional Catalog for Data Lakes with Git-like semantics
https://projectnessie.org
Apache License 2.0

`ValidationException: Cannot set main to unknown snapshot` occurs when inserting with high concurrency? #8281

Open sxh-lsc opened 7 months ago

sxh-lsc commented 7 months ago

Issue description

16:39:33.385 ERROR o.a.s.s.e.d.v2.AppendDataExec : Data source write support IcebergBatchWrite(table=xxxxxxxx, format=PARQUET) aborted.
org.apache.spark.SparkException: Writing job aborted
    at org.apache.spark.sql.errors.QueryExecutionErrors$.writingJobAbortedError(QueryExecutionErrors.scala:767)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:409)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:353)
    at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:244)
    at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:332)
    at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:331)
    at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:244)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
    at org.apache.spark.sql.DataFrameWriterV2.runCommand(DataFrameWriterV2.scala:195)
    at org.apache.spark.sql.DataFrameWriterV2.append(DataFrameWriterV2.scala:149)
    at ai.weride.datalake_service.write_center.Insert$.$anonfun$writeToIceberg$3(Insert.scala:49)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:890)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:1024)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:967)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:1024)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:1024)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:967)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:890)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:1024)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:890)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:890)
    at zio.internal.FiberRuntime.evaluateEffect(FiberRuntime.scala:381)
    at zio.internal.FiberRuntime.evaluateMessageWhileSuspended(FiberRuntime.scala:504)
    at zio.internal.FiberRuntime.drainQueueOnCurrentThread(FiberRuntime.scala:220)
    at zio.internal.FiberRuntime.run(FiberRuntime.scala:139)
    at zio.internal.ZScheduler$$anon$4.run(ZScheduler.scala:478)
Caused by: org.apache.iceberg.exceptions.ValidationException: Cannot set main to unknown snapshot: 5891649337483373908
    at org.apache.iceberg.exceptions.ValidationException.check(ValidationException.java:49)
    at org.apache.iceberg.TableMetadata$Builder.setBranchSnapshot(TableMetadata.java:1165)
    at org.apache.iceberg.nessie.NessieTableOperations.loadTableMetadata(NessieTableOperations.java:104)
    at org.apache.iceberg.nessie.NessieTableOperations.lambda$doRefresh$1(NessieTableOperations.java:149)
    at org.apache.iceberg.BaseMetastoreTableOperations.lambda$refreshFromMetadataLocation$1(BaseMetastoreTableOperations.java:208)
    at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
    at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
    at org.apache.iceberg.BaseMetastoreTableOperations.refreshFromMetadataLocation(BaseMetastoreTableOperations.java:208)
    at org.apache.iceberg.nessie.NessieTableOperations.doRefresh(NessieTableOperations.java:149)
    at org.apache.iceberg.BaseMetastoreTableOperations.refresh(BaseMetastoreTableOperations.java:97)
    at org.apache.iceberg.SnapshotProducer.refresh(SnapshotProducer.java:345)
    at org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:210)
    at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:366)
    at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
    at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
    at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:364)
    at org.apache.iceberg.spark.source.SparkWrite.commitOperation(SparkWrite.java:216)
    at org.apache.iceberg.spark.source.SparkWrite.access$1300(SparkWrite.java:83)
    at org.apache.iceberg.spark.source.SparkWrite$BatchAppend.commit(SparkWrite.java:279)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:392)
    ... 47 more

This occurs in some high-concurrency insert scenarios. Generally, the write is retried until it succeeds via Iceberg's commit retry, but in some cases the retries still fail, especially under high concurrency.
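For reference, Iceberg's commit retry is governed by the table properties `commit.retry.num-retries`, `commit.retry.min-wait-ms`, `commit.retry.max-wait-ms` and `commit.retry.total-timeout-ms`. The sketch below only renders a Spark SQL `ALTER TABLE ... SET TBLPROPERTIES` statement for them. The class name, the table name `nessie.db.my_table` and all values are assumptions chosen for illustration, not recommendations or settings taken from this report.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper: renders Iceberg commit-retry table properties as Spark SQL. */
final class CommitRetryProperties {

    /** Illustrative values only; tune them for the actual workload. */
    static Map<String, String> illustrativeSettings() {
        Map<String, String> props = new LinkedHashMap<>(); // keeps insertion order
        props.put("commit.retry.num-retries", "20");
        props.put("commit.retry.min-wait-ms", "200");
        props.put("commit.retry.max-wait-ms", "30000");
        props.put("commit.retry.total-timeout-ms", "600000");
        return props;
    }

    /** Builds an ALTER TABLE statement that sets the given table properties. */
    static String toAlterTableSql(String tableName, Map<String, String> props) {
        String body = props.entrySet().stream()
            .map(e -> "'" + e.getKey() + "' = '" + e.getValue() + "'")
            .collect(Collectors.joining(", "));
        return "ALTER TABLE " + tableName + " SET TBLPROPERTIES (" + body + ")";
    }
}
```

The resulting string could be passed to `spark.sql(...)`, for example `CommitRetryProperties.toAlterTableSql("nessie.db.my_table", CommitRetryProperties.illustrativeSettings())`.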

dimas-b commented 7 months ago

@sxh-lsc : Please provide Spark, Iceberg and Nessie versions.

sxh-lsc commented 7 months ago

@dimas-b It's been a while since we upgraded, so the versions may be a bit outdated:

Iceberg 1.3.1, Nessie 0.67.0, Spark 3.3.2 (Scala 2.13)

dimas-b commented 7 months ago

@sxh-lsc : These are indeed old versions and Nessie and NessieCatalog in Iceberg did receive multiple fixes since then... Is upgrading Iceberg an option for you?

sxh-lsc commented 7 months ago

OK, I will close this and give it a try.

sxh-lsc commented 7 months ago

> @sxh-lsc : These are indeed old versions and Nessie and NessieCatalog in Iceberg did receive multiple fixes since then... Is upgrading Iceberg an option for you?

I updated Iceberg and Nessie to the newest versions, but it is still not working:

WARN  org.apache.iceberg.util.Tasks : Retrying task after failure: Cannot commit: Reference hash is out of date. Update the reference 'main' and try again
org.apache.iceberg.exceptions.CommitFailedException: Cannot commit: Reference hash is out of date. Update the reference 'main' and try again
    at org.apache.iceberg.nessie.NessieTableOperations.doCommit(NessieTableOperations.java:167)
    at org.apache.iceberg.BaseMetastoreTableOperations.commit(BaseMetastoreTableOperations.java:135)
    at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:390)
    at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
    at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
    at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:364)
    at org.apache.iceberg.spark.source.SparkWrite.commitOperation(SparkWrite.java:216)
    at org.apache.iceberg.spark.source.SparkWrite.access$1300(SparkWrite.java:83)
    at org.apache.iceberg.spark.source.SparkWrite$BatchAppend.commit(SparkWrite.java:279)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:392)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:353)
    at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:244)
    at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:332)
    at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:331)
    at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:244)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
    at org.apache.spark.sql.DataFrameWriterV2.runCommand(DataFrameWriterV2.scala:195)
    at org.apache.spark.sql.DataFrameWriterV2.append(DataFrameWriterV2.scala:149)
    at ai.weride.datalake_service.write_center.Insert$.$anonfun$writeToIceberg$4(Insert.scala:52)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
    at zio.ZIOCompanionVersionSpecific.$anonfun$attempt$1(ZIOCompanionVersionSpecific.scala:100)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:904)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:890)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:890)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:1024)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:967)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:1024)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:1024)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:967)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:890)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:1024)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:967)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:890)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:890)
    at zio.internal.FiberRuntime.evaluateEffect(FiberRuntime.scala:381)
    at zio.internal.FiberRuntime.evaluateMessageWhileSuspended(FiberRuntime.scala:504)
    at zio.internal.FiberRuntime.drainQueueOnCurrentThread(FiberRuntime.scala:220)
    at zio.internal.FiberRuntime.run(FiberRuntime.scala:139)
    at zio.internal.ZScheduler$$anon$4.run(ZScheduler.scala:478)
Caused by: org.projectnessie.error.NessieReferenceConflictException: Key 'xxx_my_table' has conflicting put-operation from commit '1098474702689a1e0bbb8ae2a1d7fe17cd7e40309b41e670dcc52fae06ef8d11'

dimas-b commented 7 months ago

@sxh-lsc : The error seems to be different now.

NessieReferenceConflictException: Key 'xxx_my_table' has conflicting put-operation from commit '1098474702689a1e0bbb8ae2a1d7fe17cd7e40309b41e670dcc52fae06ef8d11'

This means another process (or thread) changed the table while the current transaction was in progress, that is, between the moment the current transaction loaded the table metadata and the moment it started committing its changes.

At this time, Nessie does not have automatic conflict resolution.

Clients are expected to reload the updated table metadata and retry the change at their level. NessieReferenceConflictException provides details about the conflict that can be examined programmatically.
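
A minimal sketch of such a client-level retry, assuming the whole "load table, then append" unit is re-run on each attempt so that it starts from fresh metadata. `RetryOnConflict`, its methods, and `ConflictException` are hypothetical names introduced for illustration; `ConflictException` stands in for whichever conflict error the real job sees. The cause chain is walked because, as in the traces above, Spark wraps the underlying error in a `SparkException`.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical application-level retry helper; not part of Iceberg or Nessie. */
final class RetryOnConflict {

    /** Stand-in for a commit-conflict error raised by the catalog client. */
    static final class ConflictException extends RuntimeException {
        ConflictException(String message) { super(message); }
    }

    /** Returns true if the error or any of its causes is an instance of the given type. */
    static boolean causedBy(Throwable error, Class<? extends Throwable> type) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (type.isInstance(t)) return true;
        }
        return false;
    }

    /**
     * Runs the attempt, retrying with jittered exponential backoff while
     * isRetriable accepts the failure. The attempt must reload table state itself.
     */
    static <T> T run(Supplier<T> attempt, Predicate<Throwable> isRetriable,
                     int maxAttempts, long baseDelayMs, long maxDelayMs) {
        if (maxAttempts < 1) throw new IllegalArgumentException("maxAttempts must be >= 1");
        long delayMs = Math.min(baseDelayMs, maxDelayMs);
        for (int n = 1; ; n++) {
            try {
                return attempt.get();
            } catch (RuntimeException e) {
                // Give up on the last attempt or on errors that are not conflicts.
                if (n >= maxAttempts || !isRetriable.test(e)) throw e;
            }
            sleepWithJitter(delayMs);
            delayMs = Math.min(maxDelayMs, delayMs * 2); // double the wait, capped
        }
    }

    /** Sleeps for a random duration in [0, delayMs] to spread out concurrent writers. */
    private static void sleepWithJitter(long delayMs) {
        if (delayMs <= 0) return;
        try {
            Thread.sleep(ThreadLocalRandom.current().nextLong(delayMs + 1));
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting to retry", ie);
        }
    }
}
```

In a real job the predicate would presumably test for the exceptions shown in the traces above, for example `e -> RetryOnConflict.causedBy(e, CommitFailedException.class)`, and the supplier would wrap the complete write call rather than only the commit step.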