Qbeast-io / qbeast-spark

Qbeast-spark: DataSource enabling multi-dimensional indexing and efficient data sampling. Big Data, free from the unnecessary!
https://qbeast.io/qbeast-our-tech/
Apache License 2.0

Handle ConcurrentAppendException in qbeast-spark #441

Closed · JosepSampe closed this 4 weeks ago

JosepSampe commented 1 month ago

Since PR #402 ("Issue #405: DataWriter refactory"), a ConcurrentAppendException started appearing when a write() and an optimize() run concurrently (a minimal reproduction sketch follows the stack trace below):

io.delta.exceptions.ConcurrentAppendException: Files were added to the root of the table by a concurrent update. Please try the operation again.
Conflicting commit: {"timestamp":1728639379364,"operation":"WRITE","operationParameters":{"mode":"Append"},"readVersion":4,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"3","numOutputRows":"160724","numOutputBytes":"120799894"},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.1.0","txnId":"27fd66c8-5027-4e2e-b5e7-5611887b2f73"}
Refer to https://docs.delta.io/latest/concurrency-control.html for more details.
  at org.apache.spark.sql.delta.DeltaErrorsBase.concurrentAppendException(DeltaErrors.scala:2300)
  at org.apache.spark.sql.delta.DeltaErrorsBase.concurrentAppendException$(DeltaErrors.scala:2291)
  at org.apache.spark.sql.delta.DeltaErrors$.concurrentAppendException(DeltaErrors.scala:3203)
  at org.apache.spark.sql.delta.ConflictChecker.$anonfun$checkForAddedFilesThatShouldHaveBeenReadByCurrentTxn$1(ConflictChecker.scala:305)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.sql.delta.ConflictChecker.recordTime(ConflictChecker.scala:499)
  at org.apache.spark.sql.delta.ConflictChecker.checkForAddedFilesThatShouldHaveBeenReadByCurrentTxn(ConflictChecker.scala:276)
  at org.apache.spark.sql.delta.ConflictChecker.checkConflicts(ConflictChecker.scala:142)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.checkForConflictsAgainstVersion(OptimisticTransaction.scala:1882)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.checkForConflictsAgainstVersion$(OptimisticTransaction.scala:1872)
  at org.apache.spark.sql.delta.OptimisticTransaction.checkForConflictsAgainstVersion(OptimisticTransaction.scala:142)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$checkForConflicts$10(OptimisticTransaction.scala:1860)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$checkForConflicts$10$adapted(OptimisticTransaction.scala:1856)
  at scala.collection.Iterator.foreach(Iterator.scala:943)
  at scala.collection.Iterator.foreach$(Iterator.scala:943)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
  at scala.collection.IterableLike.foreach(IterableLike.scala:74)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$checkForConflicts$2(OptimisticTransaction.scala:1856)
  at org.apache.spark.sql.delta.catalog.DeltaTableV2$.withEnrichedUnsupportedTableException(DeltaTableV2.scala:364)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$checkForConflicts$1(OptimisticTransaction.scala:1825)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
  at org.apache.spark.sql.delta.OptimisticTransaction.recordFrameProfile(OptimisticTransaction.scala:142)
  at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:133)
  at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:128)
  at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:117)
  at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:142)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:132)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:122)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:112)
  at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:142)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.checkForConflicts(OptimisticTransaction.scala:1825)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.checkForConflicts$(OptimisticTransaction.scala:1815)
  at org.apache.spark.sql.delta.OptimisticTransaction.checkForConflicts(OptimisticTransaction.scala:142)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$doCommitRetryIteratively$4(OptimisticTransaction.scala:1654)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
  at org.apache.spark.sql.delta.OptimisticTransaction.recordFrameProfile(OptimisticTransaction.scala:142)
  at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:133)
  at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:128)
  at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:117)
  at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:142)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:132)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:122)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:112)
  at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:142)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$doCommitRetryIteratively$3(OptimisticTransaction.scala:1652)
  at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$doCommitRetryIteratively$2(OptimisticTransaction.scala:1648)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
  at org.apache.spark.sql.delta.OptimisticTransaction.recordFrameProfile(OptimisticTransaction.scala:142)
  at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:133)
  at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:128)
  at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:117)
  at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:142)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:132)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:122)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:112)
  at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:142)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$doCommitRetryIteratively$1(OptimisticTransaction.scala:1648)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.lockCommitIfEnabled(OptimisticTransaction.scala:1626)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommitRetryIteratively(OptimisticTransaction.scala:1642)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommitRetryIteratively$(OptimisticTransaction.scala:1638)
  at org.apache.spark.sql.delta.OptimisticTransaction.doCommitRetryIteratively(OptimisticTransaction.scala:142)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.liftedTree1$1(OptimisticTransaction.scala:1128)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$commitImpl$1(OptimisticTransaction.scala:1056)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
  at org.apache.spark.sql.delta.OptimisticTransaction.recordFrameProfile(OptimisticTransaction.scala:142)
  at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:133)
  at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:128)
  at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:117)
  at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:142)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:132)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:122)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:112)
  at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:142)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.commitImpl(OptimisticTransaction.scala:1053)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.commitImpl$(OptimisticTransaction.scala:1048)
  at org.apache.spark.sql.delta.OptimisticTransaction.commitImpl(OptimisticTransaction.scala:142)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.commit(OptimisticTransaction.scala:1042)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.commit$(OptimisticTransaction.scala:1038)
  at org.apache.spark.sql.delta.OptimisticTransaction.commit(OptimisticTransaction.scala:142)
  at io.qbeast.spark.delta.DeltaMetadataWriter.$anonfun$writeWithTransaction$5(DeltaMetadataWriter.scala:198)
  at io.qbeast.spark.delta.DeltaMetadataWriter.$anonfun$writeWithTransaction$5$adapted(DeltaMetadataWriter.scala:172)
  at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:223)
  at io.qbeast.spark.delta.DeltaMetadataWriter.writeWithTransaction(DeltaMetadataWriter.scala:172)
  at io.qbeast.spark.delta.DeltaMetadataManager$.updateWithTransaction(DeltaMetadataManager.scala:43)
  at io.qbeast.table.IndexedTableImpl.$anonfun$optimize$2(IndexedTable.scala:511)
  at io.qbeast.table.IndexedTableImpl.$anonfun$optimize$2$adapted(IndexedTable.scala:501)
  at scala.collection.Iterator.foreach(Iterator.scala:943)
  at scala.collection.Iterator.foreach$(Iterator.scala:943)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
  at scala.collection.IterableLike.foreach(IterableLike.scala:74)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
  at io.qbeast.table.IndexedTableImpl.optimize(IndexedTable.scala:501)
  at io.qbeast.table.QbeastTable.optimize(QbeastTable.scala:100)
  at io.qbeast.use.managed.index.LeveledCompaction.$anonfun$doOptimize$3(LeveledCompaction.scala:98)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at io.qbeast.use.utils.TimeUtils.time(TimeUtils.scala:10)
  at io.qbeast.use.utils.TimeUtils.time$(TimeUtils.scala:8)
  at io.qbeast.use.managed.index.LeveledCompaction.time(LeveledCompaction.scala:31)
  at io.qbeast.use.managed.index.LeveledCompaction.doOptimize(LeveledCompaction.scala:98)
  at io.qbeast.use.managed.index.ContinuousExecution.optimize(ContinuousExecution.scala:29)
  at io.qbeast.use.managed.index.ContinuousExecution.optimize$(ContinuousExecution.scala:26)
  at io.qbeast.use.managed.index.LeveledCompaction.optimize(LeveledCompaction.scala:31)
  at io.qbeast.use.catalog.commands.LeveledCompactionTableCommand.run(LeveledCompactionTableCommand.scala:82)
  at io.qbeast.use.streaming.optimization.OptimizationRunnable.runOnce(OptimizationRunnable.scala:64)
  at io.qbeast.use.streaming.optimization.OptimizationRunnable.run(OptimizationRunnable.scala:95)
  at io.qbeast.use.streaming.optimization.OptimizationThread.run(OptimizationThread.scala:23)
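
For context, here is a minimal sketch of the kind of workload that triggers this. It assumes an existing spark session, a DataFrame df, and an already-indexed Qbeast table; the table path and the use of futures are made up for illustration:

```scala
import scala.concurrent.{ExecutionContext, Future}
import ExecutionContext.Implicits.global
import io.qbeast.table.QbeastTable

val tablePath = "/tmp/qbeast/demo" // hypothetical path

// One task keeps appending new data to the table...
val writer = Future {
  df.write.format("qbeast").mode("append").save(tablePath)
}

// ...while another optimizes it, as the streaming OptimizationThread
// in the trace above does. Both commit through Delta's
// OptimisticTransaction, and the loser of the race hits the exception.
val optimizer = Future {
  QbeastTable.forPath(spark, tablePath).optimize()
}
```
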
JosepSampe commented 1 month ago

I've found where the ConcurrentAppendException comes from. Delta's AddFile action carries a flag called dataChange, which can be true or false.
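
To make the flag concrete, here is a minimal sketch of the action as Delta models it. The field values are made up; only AddFile and its dataChange field come from org.apache.spark.sql.delta.actions:

```scala
import org.apache.spark.sql.delta.actions.AddFile

// A regular append introduces new data, so dataChange = true.
val appended = AddFile(
  path = "part-00000.parquet", // hypothetical file
  partitionValues = Map.empty,
  size = 1024L,
  modificationTime = System.currentTimeMillis(),
  dataChange = true
)

// An optimize only rearranges data that is already in the table,
// so the same file should be committed with dataChange = false.
val rearranged = appended.copy(dataChange = false)
```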

In the new version, both Table.write() and Table.optimize() call RollupDataWriter.write(), which returns AddFile(dataChange = true). In the old version, Table.write() called RollupDataWriter.write(), which returned AddFile(dataChange = true), while Table.optimize() called RollupDataWriter.optimize(), which returned AddFile(dataChange = false). In the current code the flag is therefore always true, so Delta treats every optimize commit as one that adds new data, and its conflict checker raises the exception against the concurrent write. I have run several tests keeping the flags as before, true for write and false for optimize, and I haven't encountered the exception again. A sketch of the tested change is below.
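
This is roughly the change I tested; markActions is a hypothetical helper standing in for the real write and optimize paths in IndexedTableImpl and RollupDataWriter:

```scala
import org.apache.spark.sql.delta.actions.AddFile

// Both paths can share the same writer, but the optimize path must
// downgrade its actions to dataChange = false before committing, so
// Delta does not treat the rearrangement as a concurrent append.
def markActions(addFiles: Seq[AddFile], isOptimize: Boolean): Seq[AddFile] =
  if (isOptimize) addFiles.map(_.copy(dataChange = false)) // rearrangement only
  else addFiles                                            // a real append

// write path:    markActions(writtenFiles, isOptimize = false)
// optimize path: markActions(writtenFiles, isOptimize = true)
```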