apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Metadata Column Stats Index failing with Merge on Read table. #7125

Open shubhambg95 opened 1 year ago

shubhambg95 commented 1 year ago

Describe the problem you faced

We currently have a Spark streaming job that writes data from a Kafka topic to a Hudi MOR table. We want to enable column stats indexing to leverage data skipping while reading from this table.

To do this we set hoodie.metadata.index.column.stats.enable = true. With this enabled, we are seeing an issue with the Hudi cleaner: clean operations never finish and remain at the clean.inflight stage.

This also causes the metadata table (MDT) tasks to fail with java.lang.StringIndexOutOfBoundsException at getColumnStatsRecords(HoodieTableMetadataUtil.java:1158).

When we run without setting hoodie.metadata.index.column.stats.enable = true, we do not see any issues.
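
For context, a minimal sketch of the read path we want data skipping for (run from spark-shell; the base path and filter column are placeholders):

```scala
// Sketch: read through the metadata table with data skipping enabled, so the
// column stats index can prune files for the predicate. The base path and
// someIndexedColumn are placeholders for our actual table.
val df = spark.read.format("hudi")
  .option("hoodie.metadata.enable", "true")      // read via the metadata table
  .option("hoodie.enable.data.skipping", "true") // prune files using column stats
  .load("s3://bucket/path/to/table")             // placeholder base path
  .filter("someIndexedColumn > 100")             // predicate eligible for pruning
```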

Hudi Configurations

hoodie.datasource.write.table.type: "MERGE_ON_READ"
hoodie.datasource.write.precombine.field: preCombineField
hoodie.datasource.write.recordkey.field: recordKeyField
hoodie.datasource.write.partitionpath.field: partitionPathField
hoodie.table.name: tableName
hoodie.index.type: "SIMPLE"
hoodie.datasource.write.operation: "INSERT"
hoodie.datasource.write.hive_style_partitioning: true
hoodie.insert.shuffle.parallelism: 54
hoodie.finalize.write.parallelism: 54
hoodie.cleaner.commits.retained: 300
hoodie.keep.min.commits: 325
hoodie.keep.max.commits: 350
hoodie.parquet.compression.codec: "snappy"
hoodie.metadata.enable: true
hoodie.metadata.index.column.stats.enable: true
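
For reference, a minimal sketch of how these options are applied in our foreachBatch sink (kafkaDf, the field names, and the base path are placeholders):

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

// Sketch of the streaming sink: each micro-batch from Kafka is written to the
// MOR table with the options listed above.
val hudiOptions = Map(
  "hoodie.datasource.write.table.type" -> "MERGE_ON_READ",
  "hoodie.datasource.write.operation" -> "insert",
  "hoodie.datasource.write.recordkey.field" -> "recordKeyField",
  "hoodie.datasource.write.precombine.field" -> "preCombineField",
  "hoodie.datasource.write.partitionpath.field" -> "partitionPathField",
  "hoodie.table.name" -> "tableName",
  "hoodie.metadata.enable" -> "true",
  "hoodie.metadata.index.column.stats.enable" -> "true")

kafkaDf.writeStream
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    batch.write.format("hudi")
      .options(hudiOptions)
      .mode(SaveMode.Append)
      .save("s3://bucket/path/to/table") // placeholder base path
  }
  .start()
```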

Stacktrace


22/11/02 01:17:27 WARN TaskSetManager: Lost task 3.0 in stage 6283.0 (TID 89475) (241.16.4.223 executor 54): java.lang.StringIndexOutOfBoundsException: String index out of range: -10
    at java.lang.String.substring(String.java:1931)
    at org.apache.hudi.common.fs.FSUtils.getFileName(FSUtils.java:636)
    at org.apache.hudi.metadata.HoodieTableMetadataUtil.getColumnStatsRecords(HoodieTableMetadataUtil.java:1158)
    at org.apache.hudi.metadata.HoodieTableMetadataUtil.lambda$convertMetadataToColumnStatsRecords$b912a0b7$1(HoodieTableMetadataUtil.java:572)
    at org.apache.hudi.data.HoodieJavaRDD.lambda$flatMap$a6598fcb$1(HoodieJavaRDD.java:122)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

22/11/02 01:17:27 ERROR TaskSetManager: Task 3 in stage 6283.0 failed 4 times; aborting job
22/11/02 01:17:27 WARN CleanActionExecutor: Failed to perform previous clean operation, instant: [==>20221102011713963__clean__REQUESTED]
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 6283.0 failed 4 times, most recent failure: Lost task 3.3 in stage 6283.0 (TID 89491) (241.16.6.253 executor 53): java.lang.StringIndexOutOfBoundsException: String index out of range: -10
    at java.lang.String.substring(String.java:1931)
    at org.apache.hudi.common.fs.FSUtils.getFileName(FSUtils.java:636)
    at org.apache.hudi.metadata.HoodieTableMetadataUtil.getColumnStatsRecords(HoodieTableMetadataUtil.java:1158)
    at org.apache.hudi.metadata.HoodieTableMetadataUtil.lambda$convertMetadataToColumnStatsRecords$b912a0b7$1(HoodieTableMetadataUtil.java:572)
    at org.apache.hudi.data.HoodieJavaRDD.lambda$flatMap$a6598fcb$1(HoodieJavaRDD.java:122)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2440)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2382)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2371)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2223)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2242)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2267)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
    at org.apache.spark.rdd.PairRDDFunctions.$anonfun$countByKey$1(PairRDDFunctions.scala:366)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:366)
    at org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:314)
    at org.apache.hudi.data.HoodieJavaPairRDD.countByKey(HoodieJavaPairRDD.java:105)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.buildProfile(BaseSparkCommitActionExecutor.java:187)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:156)
    at org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor.execute(SparkUpsertPreppedDeltaCommitActionExecutor.java:45)
    at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsertPrepped(HoodieSparkMergeOnReadTable.java:112)
    at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsertPrepped(HoodieSparkMergeOnReadTable.java:76)
    at org.apache.hudi.client.SparkRDDWriteClient.upsertPreppedRecords(SparkRDDWriteClient.java:172)
    at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.commit(SparkHoodieBackedTableMetadataWriter.java:166)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.processAndCommit(HoodieBackedTableMetadataWriter.java:813)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.update(HoodieBackedTableMetadataWriter.java:892)
    at org.apache.hudi.table.action.BaseActionExecutor.lambda$writeTableMetadata$1(BaseActionExecutor.java:69)
    at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
    at org.apache.hudi.table.action.BaseActionExecutor.writeTableMetadata(BaseActionExecutor.java:69)
    at org.apache.hudi.table.action.clean.CleanActionExecutor.runClean(CleanActionExecutor.java:224)
    at org.apache.hudi.table.action.clean.CleanActionExecutor.runPendingClean(CleanActionExecutor.java:189)
    at org.apache.hudi.table.action.clean.CleanActionExecutor.lambda$execute$8(CleanActionExecutor.java:259)
    at java.util.ArrayList.forEach(ArrayList.java:1259)
    at org.apache.hudi.table.action.clean.CleanActionExecutor.execute(CleanActionExecutor.java:253)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.clean(HoodieSparkCopyOnWriteTable.java:274)
    at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:871)
    at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:839)
    at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:893)
    at org.apache.hudi.client.BaseHoodieWriteClient.autoCleanOnCommit(BaseHoodieWriteClient.java:614)
    at org.apache.hudi.client.BaseHoodieWriteClient.postCommit(BaseHoodieWriteClient.java:533)
    at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:237)
    at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:125)
    at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:714)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:340)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:144)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:586)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:584)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:584)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:226)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:333)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)

Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: -10
    at java.lang.String.substring(String.java:1931)
    at org.apache.hudi.common.fs.FSUtils.getFileName(FSUtils.java:636)
    at org.apache.hudi.metadata.HoodieTableMetadataUtil.getColumnStatsRecords(HoodieTableMetadataUtil.java:1158)
    at org.apache.hudi.metadata.HoodieTableMetadataUtil.lambda$convertMetadataToColumnStatsRecords$b912a0b7$1(HoodieTableMetadataUtil.java:572)
    at org.apache.hudi.data.HoodieJavaRDD.lambda$flatMap$a6598fcb$1(HoodieJavaRDD.java:122)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
22/11/02 01:18:07 WARN HoodieHeartbeatClient: Heartbeat expired, currentTime = 1667351887919, last heartbeat = Heartbeat{instantTime='20221102011713963', isHeartbeatStarted=false, isHeartbeatStopped=false, lastHeartbeatTime=0, numHeartbeats=0, timer=java.util.Timer@2ba6f040}, heartbeat interval = 60000
22/11/02 01:18:10 ERROR HeartbeatUtils: Failed to delete heartbeat for instant 20221102011713963
22/11/02 01:18:25 WARN TaskSetManager: Lost task 8.0 in stage 6314.0 (TID 90507) (241.16.4.210 executor 20): java.lang.StringIndexOutOfBoundsException: String index out of range: -13
    at java.lang.String.substring(String.java:1931)
    at org.apache.hudi.common.fs.FSUtils.getFileName(FSUtils.java:636)
    at org.apache.hudi.metadata.HoodieTableMetadataUtil.getColumnStatsRecords(HoodieTableMetadataUtil.java:1158)
    at org.apache.hudi.metadata.HoodieTableMetadataUtil.lambda$convertMetadataToColumnStatsRecords$b912a0b7$1(HoodieTableMetadataUtil.java:572)
    at org.apache.hudi.data.HoodieJavaRDD.lambda$flatMap$a6598fcb$1(HoodieJavaRDD.java:122)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

22/11/02 01:18:25 WARN TaskSetManager: Lost task 3.0 in stage 6314.0 (TID 90502) (241.16.6.250 executor 47): java.lang.StringIndexOutOfBoundsException: String index out of range: -10
    at java.lang.String.substring(String.java:1931)
    at org.apache.hudi.common.fs.FSUtils.getFileName(FSUtils.java:636)
    at org.apache.hudi.metadata.HoodieTableMetadataUtil.getColumnStatsRecords(HoodieTableMetadataUtil.java:1158)
    at org.apache.hudi.metadata.HoodieTableMetadataUtil.lambda$convertMetadataToColumnStatsRecords$b912a0b7$1(HoodieTableMetadataUtil.java:572)
    at org.apache.hudi.data.HoodieJavaRDD.lambda$flatMap$a6598fcb$1(HoodieJavaRDD.java:122)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

22/11/02 01:18:25 WARN TaskSetManager: Lost task 7.0 in stage 6314.0 (TID 90506) (241.16.4.220 executor 51): java.lang.StringIndexOutOfBoundsException: String index out of range: -5
    at java.lang.String.substring(String.java:1931)
    at org.apache.hudi.common.fs.FSUtils.getFileName(FSUtils.java:636)
    at org.apache.hudi.metadata.HoodieTableMetadataUtil.getColumnStatsRecords(HoodieTableMetadataUtil.java:1158)
    at org.apache.hudi.metadata.HoodieTableMetadataUtil.lambda$convertMetadataToColumnStatsRecords$b912a0b7$1(HoodieTableMetadataUtil.java:572)
    at org.apache.hudi.data.HoodieJavaRDD.lambda$flatMap$a6598fcb$1(HoodieJavaRDD.java:122)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

22/11/02 01:18:25 ERROR TaskSetManager: Task 3 in stage 6314.0 failed 4 times; aborting job
22/11/02 01:18:25 WARN TaskSetManager: Lost task 4.3 in stage 6314.0 (TID 90519) (241.16.4.221 executor 50): TaskKilled (Stage cancelled)
22/11/02 01:18:25 WARN CleanActionExecutor: Failed to perform previous clean operation, instant: [==>20221102011713963__clean__INFLIGHT]
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 6314.0 failed 4 times, most recent failure: Lost task 3.3 in stage 6314.0 (TID 90517) (241.16.4.224 executor 55): java.lang.StringIndexOutOfBoundsException: String index out of range: -10
    at java.lang.String.substring(String.java:1931)
    at org.apache.hudi.common.fs.FSUtils.getFileName(FSUtils.java:636)
    at org.apache.hudi.metadata.HoodieTableMetadataUtil.getColumnStatsRecords(HoodieTableMetadataUtil.java:1158)
    at org.apache.hudi.metadata.HoodieTableMetadataUtil.lambda$convertMetadataToColumnStatsRecords$b912a0b7$1(HoodieTableMetadataUtil.java:572)
    at org.apache.hudi.data.HoodieJavaRDD.lambda$flatMap$a6598fcb$1(HoodieJavaRDD.java:122)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2440)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2382)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2371)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2223)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2242)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2267)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
    at org.apache.spark.rdd.PairRDDFunctions.$anonfun$countByKey$1(PairRDDFunctions.scala:366)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:366)
    at org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:314)
    at org.apache.hudi.data.HoodieJavaPairRDD.countByKey(HoodieJavaPairRDD.java:105)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.buildProfile(BaseSparkCommitActionExecutor.java:187)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:156)
    at org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor.execute(SparkUpsertPreppedDeltaCommitActionExecutor.java:45)
    at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsertPrepped(HoodieSparkMergeOnReadTable.java:112)
    at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsertPrepped(HoodieSparkMergeOnReadTable.java:76)
    at org.apache.hudi.client.SparkRDDWriteClient.upsertPreppedRecords(SparkRDDWriteClient.java:172)
    at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.commit(SparkHoodieBackedTableMetadataWriter.java:166)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.processAndCommit(HoodieBackedTableMetadataWriter.java:813)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.update(HoodieBackedTableMetadataWriter.java:892)
    at org.apache.hudi.table.action.BaseActionExecutor.lambda$writeTableMetadata$1(BaseActionExecutor.java:69)
    at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
    at org.apache.hudi.table.action.BaseActionExecutor.writeTableMetadata(BaseActionExecutor.java:69)
    at org.apache.hudi.table.action.clean.CleanActionExecutor.runClean(CleanActionExecutor.java:224)
    at org.apache.hudi.table.action.clean.CleanActionExecutor.runPendingClean(CleanActionExecutor.java:189)
    at org.apache.hudi.table.action.clean.CleanActionExecutor.lambda$execute$8(CleanActionExecutor.java:259)
    at java.util.ArrayList.forEach(ArrayList.java:1259)
    at org.apache.hudi.table.action.clean.CleanActionExecutor.execute(CleanActionExecutor.java:253)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.clean(HoodieSparkCopyOnWriteTable.java:274)
    at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:871)
    at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:839)
    at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:893)
    at org.apache.hudi.client.BaseHoodieWriteClient.autoCleanOnCommit(BaseHoodieWriteClient.java:614)
    at org.apache.hudi.client.BaseHoodieWriteClient.postCommit(BaseHoodieWriteClient.java:533)
    at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:237)
    at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:125)
    at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:714)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:340)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:144)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
shubhambg95 commented 1 year ago

Attaching the table and metadata table timelines here: metadata_timeline.txt, hoodie_timeline.txt

codope commented 1 year ago

I think this issue is not directly related to the metadata table. From the timeline and stacktrace, here's what we know:

  1. After about 302 deltacommits, a clean was triggered => 20221102011713963.clean.requested
  2. This clean never completed because the metadata table column stats update failed for the corresponding clean metadata. In the metadata timeline, we only see 20221102011713963.deltacommit.requested.
  3. The failure in the metadata table is due to StringIndexOutOfBoundsException: String index out of range: -10 from this code. This can happen when the file path string is shorter than the partition name itself, which is very unlikely (see the sketch below).
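
To illustrate item 3, a simplified sketch of the pattern (not the exact Hudi code): FSUtils.getFileName effectively strips the partition-path prefix from a stored file path with substring, so if the partition path is not actually contained in (and is longer than) the stored path, the computed offset overshoots the end of the string:

```scala
// Simplified sketch (not the exact Hudi code) of stripping the partition-path
// prefix from a stored file path.
def getFileName(fullPath: String, partitionPath: String): String =
  fullPath.substring(fullPath.indexOf(partitionPath) + partitionPath.length + 1)

// Works when the stored path really contains the partition path:
getFileName("year=2022/month=11/f1_0-1_20221102.parquet", "year=2022/month=11")
// => "f1_0-1_20221102.parquet"

// If it does not, indexOf returns -1 and the offset overshoots the string:
// substring(33) on a 23-character string throws "String index out of
// range: -10" on Java 8, matching the error above.
getFileName("f1_0-1_20221102.parquet", "year=2022/month=11/day=02/hour=01")
```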

@shubhambg95 For further investigation, please provide the contents of 20221102011713963.clean.requested file in the Hudi timeline.

shubhambg95 commented 1 year ago

Hi @codope, I cannot share the file here directly. Is there anything in particular you are looking for in the 20221102011713963.clean.requested file that I can check and get back to you on?

codope commented 1 year ago

Actually, a zip of the clean.requested and deltacommit files around that time would help us understand this better, but we can start by looking at the filePathsToBeDeletedPerPartition and earliestInstantToRetain fields in 20221102011713963.clean.requested. Also, would it help if I provided a patch with more logs?
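
If it helps, something along these lines should dump those two fields (a sketch from spark-shell, assuming the 0.12.x CleanerUtils API; the base path is a placeholder):

```scala
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.util.CleanerUtils

// Sketch: deserialize the requested cleaner plan and print the two fields of
// interest. Adjust the base path to your table.
val metaClient = HoodieTableMetaClient.builder()
  .setConf(spark.sparkContext.hadoopConfiguration)
  .setBasePath("s3://bucket/path/to/table")
  .build()

val cleanInstant = new HoodieInstant(
  HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, "20221102011713963")

val plan = CleanerUtils.getCleanerPlan(metaClient, cleanInstant)
println(plan.getEarliestInstantToRetain)
plan.getFilePathsToBeDeletedPerPartition.forEach { (partition, files) =>
  println(s"$partition -> ${files.size()} file paths")
}
```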

shubhambg95 commented 1 year ago

I pinged you on Slack.

codope commented 1 year ago

Synced up on a call and looked at the clean commit metadata. The issue does not seem to be related to the metadata table. One thing to note is that the table is multi-level partitioned, i.e. there are multiple partition fields, and filePathsToBeDeletedPerPartition was empty for quite a few partitions. Suggested a workaround (sketched after the follow-ups below): disable and then re-enable the metadata table, after which the commits should go through. Follow-ups:

  1. Requested the non-empty filePathsToBeDeletedPerPartition entries to be shared. OP will get back after checking with the team.
  2. We need to reproduce the issue with similar configs but multiple partition fields.
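
For reference, the workaround is a config flip on the writer: one commit with the metadata table disabled, which makes Hudi delete the existing (inconsistent) MDT, then re-enable it so it is rebuilt on the next commit. A minimal sketch, assuming the same writer options as the original job (the DataFrame variables and base path are placeholders):

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

// Sketch of the workaround: the first write deletes the metadata table, the
// second rebuilds it along with the column stats index.
def writeBatch(df: DataFrame, metadataEnabled: Boolean): Unit =
  df.write.format("hudi")
    .option("hoodie.table.name", "tableName") // plus the job's other options
    .option("hoodie.metadata.enable", metadataEnabled.toString)
    .option("hoodie.metadata.index.column.stats.enable", metadataEnabled.toString)
    .mode(SaveMode.Append)
    .save("s3://bucket/path/to/table") // placeholder base path

writeBatch(nextBatch, metadataEnabled = false) // 1) one commit with the MDT disabled
writeBatch(laterBatch, metadataEnabled = true) // 2) re-enable; the MDT is rebuilt
```
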
CoryCovinoMark43 commented 10 months ago

I am also encountering this issue. Are there any new updates to report?

duntonr commented 3 months ago

I am also seeing this issue with 0.15 on Spark 3.5.1, Scala 2.12, and Java 17.

24/07/08 00:22:06 ERROR CleanActionExecutor: Failed to perform previous clean operation, instant: [==>20240707213206316__clean__INFLIGHT__20240707213226000]
org.apache.hudi.exception.HoodieException: Failed to apply clean commit to metadata
    at org.apache.hudi.table.action.BaseActionExecutor.writeTableMetadata(BaseActionExecutor.java:93)
    at org.apache.hudi.table.action.clean.CleanActionExecutor.runClean(CleanActionExecutor.java:235)
    at org.apache.hudi.table.action.clean.CleanActionExecutor.runPendingClean(CleanActionExecutor.java:200)
    at org.apache.hudi.table.action.clean.CleanActionExecutor.execute(CleanActionExecutor.java:271)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.clean(HoodieSparkCopyOnWriteTable.java:258)
    at org.apache.hudi.client.BaseHoodieTableServiceClient.clean(BaseHoodieTableServiceClient.java:757)
    at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:843)
    at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:816)
    at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:847)
    at org.apache.hudi.client.BaseHoodieWriteClient.autoCleanOnCommit(BaseHoodieWriteClient.java:581)
    at org.apache.hudi.client.BaseHoodieWriteClient.mayBeCleanAndArchive(BaseHoodieWriteClient.java:560)
    at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:251)
    at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:108)
    at org.apache.hudi.utilities.streamer.StreamSync.writeToSinkAndDoMetaSync(StreamSync.java:894)
    at org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:493)
    at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.lambda$startService$1(HoodieStreamer.java:793)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 71.0 failed 4 times, most recent failure: Lost task 5.3 in stage 71.0 (TID 2702) (spark-worker-0.service.rack01.consul.internal.therack.io executor 3): java.lang.StringIndexOutOfBoundsException: begin 144, end 73, length 73
    at java.base/java.lang.String.checkBoundsBeginEnd(String.java:4606)
    at java.base/java.lang.String.substring(String.java:2709)
    at java.base/java.lang.String.substring(String.java:2682)
    at org.apache.hudi.common.fs.FSUtils.getFileName(FSUtils.java:601)
    at org.apache.hudi.metadata.HoodieTableMetadataUtil.getColumnStatsRecords(HoodieTableMetadataUtil.java:1154)
    at org.apache.hudi.metadata.HoodieTableMetadataUtil.lambda$convertMetadataToColumnStatsRecords$cc8fcd81$1(HoodieTableMetadataUtil.java:702)
    at org.apache.hudi.data.HoodieJavaRDD.lambda$flatMap$a6598fcb$1(HoodieJavaRDD.java:137)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1614)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1524)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1588)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1389)
    at org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1343)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:379)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
    at org.apache.spark.rdd.PairRDDFunctions.$anonfun$countByKey$1(PairRDDFunctions.scala:367)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
    at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:367)
    at org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:314)
    at org.apache.hudi.data.HoodieJavaPairRDD.countByKey(HoodieJavaPairRDD.java:105)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.buildProfile(BaseSparkCommitActionExecutor.java:197)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:168)
    at org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor.execute(SparkUpsertPreppedDeltaCommitActionExecutor.java:44)
    at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsertPrepped(HoodieSparkMergeOnReadTable.java:126)
    at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsertPrepped(HoodieSparkMergeOnReadTable.java:88)
    at org.apache.hudi.client.SparkRDDWriteClient.upsertPreppedRecords(SparkRDDWriteClient.java:171)
    at org.apache.hudi.client.SparkRDDWriteClient.upsertPreppedRecords(SparkRDDWriteClient.java:67)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.commitInternal(HoodieBackedTableMetadataWriter.java:1161)
    at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.commit(SparkHoodieBackedTableMetadataWriter.java:120)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.processAndCommit(HoodieBackedTableMetadataWriter.java:863)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.update(HoodieBackedTableMetadataWriter.java:960)
    at org.apache.hudi.table.action.BaseActionExecutor.writeTableMetadata(BaseActionExecutor.java:88)
    ... 19 more
Caused by: java.lang.StringIndexOutOfBoundsException: begin 144, end 73, length 73
    at java.base/java.lang.String.checkBoundsBeginEnd(String.java:4606)
    at java.base/java.lang.String.substring(String.java:2709)
    at java.base/java.lang.String.substring(String.java:2682)
    at org.apache.hudi.common.fs.FSUtils.getFileName(FSUtils.java:601)
    at org.apache.hudi.metadata.HoodieTableMetadataUtil.getColumnStatsRecords(HoodieTableMetadataUtil.java:1154)
    at org.apache.hudi.metadata.HoodieTableMetadataUtil.lambda$convertMetadataToColumnStatsRecords$cc8fcd81$1(HoodieTableMetadataUtil.java:702)
    at org.apache.hudi.data.HoodieJavaRDD.lambda$flatMap$a6598fcb$1(HoodieJavaRDD.java:137)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1614)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1524)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1588)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1389)
    at org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1343)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:379)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    ... 3 more
24/07/08 00:22:06 ERROR HoodieStreamer: Shutting down delta-sync due to exception
org.apache.hudi.exception.HoodieException: Failed to apply clean commit to metadata
    at org.apache.hudi.table.action.BaseActionExecutor.writeTableMetadata(BaseActionExecutor.java:93)
    at org.apache.hudi.table.action.clean.CleanActionExecutor.runClean(CleanActionExecutor.java:235)
    at org.apache.hudi.table.action.clean.CleanActionExecutor.runPendingClean(CleanActionExecutor.java:200)
    at org.apache.hudi.table.action.clean.CleanActionExecutor.execute(CleanActionExecutor.java:271)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.clean(HoodieSparkCopyOnWriteTable.java:258)
    at org.apache.hudi.client.BaseHoodieTableServiceClient.clean(BaseHoodieTableServiceClient.java:757)
    at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:843)
    at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:816)
    at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:847)
    at org.apache.hudi.client.BaseHoodieWriteClient.autoCleanOnCommit(BaseHoodieWriteClient.java:581)
    at org.apache.hudi.client.BaseHoodieWriteClient.mayBeCleanAndArchive(BaseHoodieWriteClient.java:560)
    at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:251)
    at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:108)
    at org.apache.hudi.utilities.streamer.StreamSync.writeToSinkAndDoMetaSync(StreamSync.java:894)
    at org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:493)
    at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.lambda$startService$1(HoodieStreamer.java:793)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 71.0 failed 4 times, most recent failure: Lost task 5.3 in stage 71.0 (TID 2702) (spark-worker-0.service.rack01.consul.internal.therack.io executor 3): java.lang.StringIndexOutOfBoundsException: begin 144, end 73, length 73
    at java.base/java.lang.String.checkBoundsBeginEnd(String.java:4606)
    at java.base/java.lang.String.substring(String.java:2709)
    at java.base/java.lang.String.substring(String.java:2682)
    at org.apache.hudi.common.fs.FSUtils.getFileName(FSUtils.java:601)
    at org.apache.hudi.metadata.HoodieTableMetadataUtil.getColumnStatsRecords(HoodieTableMetadataUtil.java:1154)
    at org.apache.hudi.metadata.HoodieTableMetadataUtil.lambda$convertMetadataToColumnStatsRecords$cc8fcd81$1(HoodieTableMetadataUtil.java:702)
    at org.apache.hudi.data.HoodieJavaRDD.lambda$flatMap$a6598fcb$1(HoodieJavaRDD.java:137)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1614)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1524)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1588)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1389)
    at org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1343)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:379)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
    at org.apache.spark.rdd.PairRDDFunctions.$anonfun$countByKey$1(PairRDDFunctions.scala:367)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
    at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:367)
    at org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:314)
    at org.apache.hudi.data.HoodieJavaPairRDD.countByKey(HoodieJavaPairRDD.java:105)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.buildProfile(BaseSparkCommitActionExecutor.java:197)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:168)
    at org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor.execute(SparkUpsertPreppedDeltaCommitActionExecutor.java:44)
    at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsertPrepped(HoodieSparkMergeOnReadTable.java:126)
    at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsertPrepped(HoodieSparkMergeOnReadTable.java:88)
    at org.apache.hudi.client.SparkRDDWriteClient.upsertPreppedRecords(SparkRDDWriteClient.java:171)
    at org.apache.hudi.client.SparkRDDWriteClient.upsertPreppedRecords(SparkRDDWriteClient.java:67)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.commitInternal(HoodieBackedTableMetadataWriter.java:1161)
    at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.commit(SparkHoodieBackedTableMetadataWriter.java:120)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.processAndCommit(HoodieBackedTableMetadataWriter.java:863)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.update(HoodieBackedTableMetadataWriter.java:960)
    at org.apache.hudi.table.action.BaseActionExecutor.writeTableMetadata(BaseActionExecutor.java:88)
    ... 19 more
Caused by: java.lang.StringIndexOutOfBoundsException: begin 144, end 73, length 73
    at java.base/java.lang.String.checkBoundsBeginEnd(String.java:4606)
    at java.base/java.lang.String.substring(String.java:2709)
    at java.base/java.lang.String.substring(String.java:2682)
    at org.apache.hudi.common.fs.FSUtils.getFileName(FSUtils.java:601)
    at org.apache.hudi.metadata.HoodieTableMetadataUtil.getColumnStatsRecords(HoodieTableMetadataUtil.java:1154)
    at org.apache.hudi.metadata.HoodieTableMetadataUtil.lambda$convertMetadataToColumnStatsRecords$cc8fcd81$1(HoodieTableMetadataUtil.java:702)
    at org.apache.hudi.data.HoodieJavaRDD.lambda$flatMap$a6598fcb$1(HoodieJavaRDD.java:137)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1614)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1524)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1588)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1389)
    at org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1343)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:379)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    ... 3 more
24/07/08 00:22:06 INFO HoodieStreamer: Delta Sync shutdown. Error ?true
24/07/08 00:22:06 WARN HoodieStreamer: Gracefully shutting down compactor
24/07/08 00:22:15 INFO AsyncCompactService: Compactor shutting down properly!!
24/07/08 00:22:15 WARN HoodieStreamer: Gracefully shutting down clustering service
24/07/08 00:22:15 INFO AsyncClusteringService: Clustering executor shutting down properly
24/07/08 00:22:15 INFO HoodieStreamer: Ingestion completed. Has error: true
24/07/08 00:22:15 INFO TransactionManager: Transaction manager closed
24/07/08 00:22:15 INFO TransactionManager: Transaction manager closed
24/07/08 00:22:15 INFO StreamSync: Shutting down embedded timeline server
24/07/08 00:22:15 INFO EmbeddedTimelineService: Closing Timeline server
24/07/08 00:22:15 INFO TimelineService: Closing Timeline Service
24/07/08 00:22:15 ERROR HoodieAsyncService: Service shutdown with error
java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: Failed to apply clean commit to metadata
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
    at org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103)
    at org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:65)
    at org.apache.hudi.common.util.Option.ifPresent(Option.java:101)
    at org.apache.hudi.utilities.streamer.HoodieStreamer.sync(HoodieStreamer.java:214)
    at org.apache.hudi.utilities.streamer.HoodieStreamer.main(HoodieStreamer.java:606)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:63)
    at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: org.apache.hudi.exception.HoodieException: Failed to apply clean commit to metadata
    at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.lambda$startService$1(HoodieStreamer.java:832)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.hudi.exception.HoodieException: Failed to apply clean commit to metadata
    at org.apache.hudi.table.action.BaseActionExecutor.writeTableMetadata(BaseActionExecutor.java:93)
    at org.apache.hudi.table.action.clean.CleanActionExecutor.runClean(CleanActionExecutor.java:235)
    at org.apache.hudi.table.action.clean.CleanActionExecutor.runPendingClean(CleanActionExecutor.java:200)
    at org.apache.hudi.table.action.clean.CleanActionExecutor.execute(CleanActionExecutor.java:271)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.clean(HoodieSparkCopyOnWriteTable.java:258)
    at org.apache.hudi.client.BaseHoodieTableServiceClient.clean(BaseHoodieTableServiceClient.java:757)
    at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:843)
    at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:816)
    at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:847)
    at org.apache.hudi.client.BaseHoodieWriteClient.autoCleanOnCommit(BaseHoodieWriteClient.java:581)
    at org.apache.hudi.client.BaseHoodieWriteClient.mayBeCleanAndArchive(BaseHoodieWriteClient.java:560)
    at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:251)
    at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:108)
    at org.apache.hudi.utilities.streamer.StreamSync.writeToSinkAndDoMetaSync(StreamSync.java:894)
    at org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:493)
    at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.lambda$startService$1(HoodieStreamer.java:793)
    ... 4 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 71.0 failed 4 times, most recent failure: Lost task 5.3 in stage 71.0 (TID 2702) (spark-worker-0.service.rack01.consul.internal.therack.io executor 3): java.lang.StringIndexOutOfBoundsException: begin 144, end 73, length 73
    at java.base/java.lang.String.checkBoundsBeginEnd(String.java:4606)
    at java.base/java.lang.String.substring(String.java:2709)
    at java.base/java.lang.String.substring(String.java:2682)
    at org.apache.hudi.common.fs.FSUtils.getFileName(FSUtils.java:601)
    at org.apache.hudi.metadata.HoodieTableMetadataUtil.getColumnStatsRecords(HoodieTableMetadataUtil.java:1154)
    at org.apache.hudi.metadata.HoodieTableMetadataUtil.lambda$convertMetadataToColumnStatsRecords$cc8fcd81$1(HoodieTableMetadataUtil.java:702)
    at org.apache.hudi.data.HoodieJavaRDD.lambda$flatMap$a6598fcb$1(HoodieJavaRDD.java:137)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1614)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1524)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1588)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1389)
    at org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1343)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:379)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
    at org.apache.spark.rdd.PairRDDFunctions.$anonfun$countByKey$1(PairRDDFunctions.scala:367)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
    at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:367)
    at org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:314)
    at org.apache.hudi.data.HoodieJavaPairRDD.countByKey(HoodieJavaPairRDD.java:105)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.buildProfile(BaseSparkCommitActionExecutor.java:197)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:168)
    at org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor.execute(SparkUpsertPreppedDeltaCommitActionExecutor.java:44)
    at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsertPrepped(HoodieSparkMergeOnReadTable.java:126)
    at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsertPrepped(HoodieSparkMergeOnReadTable.java:88)
    at org.apache.hudi.client.SparkRDDWriteClient.upsertPreppedRecords(SparkRDDWriteClient.java:171)
    at org.apache.hudi.client.SparkRDDWriteClient.upsertPreppedRecords(SparkRDDWriteClient.java:67)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.commitInternal(HoodieBackedTableMetadataWriter.java:1161)
    at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.commit(SparkHoodieBackedTableMetadataWriter.java:120)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.processAndCommit(HoodieBackedTableMetadataWriter.java:863)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.update(HoodieBackedTableMetadataWriter.java:960)
    at org.apache.hudi.table.action.BaseActionExecutor.writeTableMetadata(BaseActionExecutor.java:88)
    ... 19 more
Caused by: java.lang.StringIndexOutOfBoundsException: begin 144, end 73, length 73
    at java.base/java.lang.String.checkBoundsBeginEnd(String.java:4606)
    at java.base/java.lang.String.substring(String.java:2709)
    at java.base/java.lang.String.substring(String.java:2682)
    at org.apache.hudi.common.fs.FSUtils.getFileName(FSUtils.java:601)
    at org.apache.hudi.metadata.HoodieTableMetadataUtil.getColumnStatsRecords(HoodieTableMetadataUtil.java:1154)
    at org.apache.hudi.metadata.HoodieTableMetadataUtil.lambda$convertMetadataToColumnStatsRecords$cc8fcd81$1(HoodieTableMetadataUtil.java:702)
    at org.apache.hudi.data.HoodieJavaRDD.lambda$flatMap$a6598fcb$1(HoodieJavaRDD.java:137)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1614)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1524)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1588)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1389)
    at org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1343)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:379)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    ... 3 more
24/07/08 00:22:15 INFO Javalin: Stopping Javalin ...
24/07/08 00:22:15 INFO SparkContext: SparkContext is stopping with exitCode 0.
24/07/08 00:22:15 ERROR Javalin: Javalin failed to stop gracefully
java.lang.InterruptedException
    at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1081)
    at java.base/java.util.concurrent.CountDownLatch.await(CountDownLatch.java:276)
    at org.apache.hudi.org.apache.jetty.server.AbstractConnector.doStop(AbstractConnector.java:373)
    at org.apache.hudi.org.apache.jetty.server.AbstractNetworkConnector.doStop(AbstractNetworkConnector.java:88)
    at org.apache.hudi.org.apache.jetty.server.ServerConnector.doStop(ServerConnector.java:246)
    at org.apache.hudi.org.apache.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:94)
    at org.apache.hudi.org.apache.jetty.server.Server.doStop(Server.java:459)
    at org.apache.hudi.org.apache.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:94)
    at io.javalin.Javalin.stop(Javalin.java:209)
    at org.apache.hudi.timeline.service.TimelineService.close(TimelineService.java:411)
    at org.apache.hudi.client.embedded.EmbeddedTimelineService.stopForBasePath(EmbeddedTimelineService.java:249)
    at org.apache.hudi.utilities.streamer.StreamSync.close(StreamSync.java:1272)
    at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.close(HoodieStreamer.java:962)
    at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.onIngestionCompletes(HoodieStreamer.java:950)
    at org.apache.hudi.async.HoodieAsyncService.lambda$shutdownCallback$0(HoodieAsyncService.java:171)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
24/07/08 00:22:15 INFO SparkUI: Stopped Spark web UI at http://spark-worker-6.service.rack01.consul.internal.therack.io:8090
24/07/08 00:22:15 INFO StandaloneSchedulerBackend: Shutting down all executors
24/07/08 00:22:15 INFO StandaloneSchedulerBackend$StandaloneDriverEndpoint: Asking each executor to shut down
24/07/08 00:22:15 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
24/07/08 00:22:15 INFO MemoryStore: MemoryStore cleared
24/07/08 00:22:15 INFO BlockManager: BlockManager stopped
24/07/08 00:22:15 INFO BlockManagerMaster: BlockManagerMaster stopped
24/07/08 00:22:15 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
24/07/08 00:22:15 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" java.lang.reflect.InvocationTargetException
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:63)
    at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: org.apache.hudi.utilities.ingestion.HoodieIngestionException: Ingestion service was shut down with exception.
    at org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:67)
    at org.apache.hudi.common.util.Option.ifPresent(Option.java:101)
    at org.apache.hudi.utilities.streamer.HoodieStreamer.sync(HoodieStreamer.java:214)
    at org.apache.hudi.utilities.streamer.HoodieStreamer.main(HoodieStreamer.java:606)
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: Failed to apply clean commit to metadata
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
    at org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103)
    at org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:65)
    ... 9 more
Caused by: org.apache.hudi.exception.HoodieException: Failed to apply clean commit to metadata
    at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.lambda$startService$1(HoodieStreamer.java:832)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.hudi.exception.HoodieException: Failed to apply clean commit to metadata
    at org.apache.hudi.table.action.BaseActionExecutor.writeTableMetadata(BaseActionExecutor.java:93)
    at org.apache.hudi.table.action.clean.CleanActionExecutor.runClean(CleanActionExecutor.java:235)
    at org.apache.hudi.table.action.clean.CleanActionExecutor.runPendingClean(CleanActionExecutor.java:200)
    at org.apache.hudi.table.action.clean.CleanActionExecutor.execute(CleanActionExecutor.java:271)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.clean(HoodieSparkCopyOnWriteTable.java:258)
    at org.apache.hudi.client.BaseHoodieTableServiceClient.clean(BaseHoodieTableServiceClient.java:757)
    at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:843)
    at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:816)
    at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:847)
    at org.apache.hudi.client.BaseHoodieWriteClient.autoCleanOnCommit(BaseHoodieWriteClient.java:581)
    at org.apache.hudi.client.BaseHoodieWriteClient.mayBeCleanAndArchive(BaseHoodieWriteClient.java:560)
    at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:251)
    at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:108)
    at org.apache.hudi.utilities.streamer.StreamSync.writeToSinkAndDoMetaSync(StreamSync.java:894)
    at org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:493)
    at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.lambda$startService$1(HoodieStreamer.java:793)
    ... 4 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 71.0 failed 4 times, most recent failure: Lost task 5.3 in stage 71.0 (TID 2702) (spark-worker-0.service.rack01.consul.internal.therack.io executor 3): java.lang.StringIndexOutOfBoundsException: begin 144, end 73, length 73
    at java.base/java.lang.String.checkBoundsBeginEnd(String.java:4606)
    at java.base/java.lang.String.substring(String.java:2709)
    at java.base/java.lang.String.substring(String.java:2682)
    at org.apache.hudi.common.fs.FSUtils.getFileName(FSUtils.java:601)
    at org.apache.hudi.metadata.HoodieTableMetadataUtil.getColumnStatsRecords(HoodieTableMetadataUtil.java:1154)
    at org.apache.hudi.metadata.HoodieTableMetadataUtil.lambda$convertMetadataToColumnStatsRecords$cc8fcd81$1(HoodieTableMetadataUtil.java:702)
    at org.apache.hudi.data.HoodieJavaRDD.lambda$flatMap$a6598fcb$1(HoodieJavaRDD.java:137)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1614)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1524)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1588)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1389)
    at org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1343)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:379)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
    at org.apache.spark.rdd.PairRDDFunctions.$anonfun$countByKey$1(PairRDDFunctions.scala:367)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
    at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:367)
    at org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:314)
    at org.apache.hudi.data.HoodieJavaPairRDD.countByKey(HoodieJavaPairRDD.java:105)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.buildProfile(BaseSparkCommitActionExecutor.java:197)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:168)
    at org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor.execute(SparkUpsertPreppedDeltaCommitActionExecutor.java:44)
    at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsertPrepped(HoodieSparkMergeOnReadTable.java:126)
    at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsertPrepped(HoodieSparkMergeOnReadTable.java:88)
    at org.apache.hudi.client.SparkRDDWriteClient.upsertPreppedRecords(SparkRDDWriteClient.java:171)
    at org.apache.hudi.client.SparkRDDWriteClient.upsertPreppedRecords(SparkRDDWriteClient.java:67)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.commitInternal(HoodieBackedTableMetadataWriter.java:1161)
    at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.commit(SparkHoodieBackedTableMetadataWriter.java:120)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.processAndCommit(HoodieBackedTableMetadataWriter.java:863)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.update(HoodieBackedTableMetadataWriter.java:960)
    at org.apache.hudi.table.action.BaseActionExecutor.writeTableMetadata(BaseActionExecutor.java:88)
    ... 19 more
Caused by: java.lang.StringIndexOutOfBoundsException: begin 144, end 73, length 73
    at java.base/java.lang.String.checkBoundsBeginEnd(String.java:4606)
    at java.base/java.lang.String.substring(String.java:2709)
    at java.base/java.lang.String.substring(String.java:2682)
    at org.apache.hudi.common.fs.FSUtils.getFileName(FSUtils.java:601)
    at org.apache.hudi.metadata.HoodieTableMetadataUtil.getColumnStatsRecords(HoodieTableMetadataUtil.java:1154)
    at org.apache.hudi.metadata.HoodieTableMetadataUtil.lambda$convertMetadataToColumnStatsRecords$cc8fcd81$1(HoodieTableMetadataUtil.java:702)
    at org.apache.hudi.data.HoodieJavaRDD.lambda$flatMap$a6598fcb$1(HoodieJavaRDD.java:137)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1614)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1524)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1588)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1389)
    at org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1343)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:379)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    ... 3 more
24/07/08 00:22:15 INFO ShutdownHookManager: Shutdown hook called
24/07/08 00:22:15 INFO ShutdownHookManager: Deleting directory /tmp/spark-4ddab59c-1ef0-496e-800e-5a818527e824
24/07/08 00:22:15 INFO ShutdownHookManager: Deleting directory /alloc/tmp/spark-401017c9-9572-487b-ad7a-d476f4fc4d7e
24/07/08 00:22:15 INFO MetricsSystemImpl: Stopping s3a-file-system metrics system...
24/07/08 00:22:15 INFO MetricsSystemImpl: s3a-file-system metrics system stopped.
24/07/08 00:22:15 INFO MetricsSystemImpl: s3a-file-system metrics system shutdown complete.
duntonr commented 3 months ago

@codope - I can share anything needed to help troubleshoot this; it's not company-sensitive. Also wanted to note that I am using multiple partitions for the table.
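
For what it's worth, the bounds in the exception (`begin 144, end 73, length 73`) say that `FSUtils.getFileName` tried to slice a file name out of a 73-character path starting at offset 144, i.e. the computed offset lands past the end of the string. Below is a minimal sketch of that failure mode, assuming the method strips a partition-path prefix via `substring`; the method body, paths, and names are made up for illustration and are not the actual Hudi code:

```java
public class GetFileNameSketch {
    // Hypothetical stand-in for org.apache.hudi.common.fs.FSUtils.getFileName:
    // return everything after the partition-path prefix in the full path.
    static String getFileName(String fullPath, String partitionPath) {
        // If partitionPath does not occur in fullPath, indexOf returns -1 and
        // begin becomes partitionPath.length(); when the partition path is
        // longer than the full path, begin exceeds fullPath.length().
        int begin = fullPath.indexOf(partitionPath) + partitionPath.length() + 1;
        return fullPath.substring(begin); // throws StringIndexOutOfBoundsException
    }

    public static void main(String[] args) {
        // Made-up inputs: the stored path does not embed the partition path
        // it is matched against (e.g. a log-file path recorded without the
        // partition prefix).
        String fullPath = "s3a://bucket/tbl/.abc123_0-1-0_20240708.log.1";          // length 45
        String partitionPath = "year=2024/month=07/day=08/hour=00/some/partition";  // length 48
        try {
            System.out.println(getFileName(fullPath, partitionPath));
        } catch (StringIndexOutOfBoundsException e) {
            // JDK 11+ prints e.g. "begin 48, end 45, length 45"; JDK 8 prints
            // e.g. "String index out of range: -3" -- the same two message
            // shapes seen in the stacktraces above.
            System.out.println(e.getMessage());
        }
    }
}
```

Whatever the exact cause, the traces show the bad slice happens in `convertMetadataToColumnStatsRecords` while applying the clean commit to the metadata table, so the mismatch is likely between the partition name and the file paths recorded in the clean plan.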