apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT]After restarting the program .hoodie/metadata/record_index is completely deleted #11567

Open MrAladdin opened 2 months ago

MrAladdin commented 2 months ago

Describe the problem you faced


  1. After restarting the program, .hoodie/metadata/record_index is completely deleted.
  2. Because a large amount of data has already been written into the lake, the index is automatically deleted and rebuilt on restart. The massive data volume then causes a large number of task failures during reconstruction, and the job ultimately fails. No matter how many resources are allocated, the outcome is the same: many failed tasks and a failed job.
  3. I urgently need to know the cause and how to resolve it.

Environment Description

Hudi writer configuration (Spark Structured Streaming):

.writeStream .format("hudi") .option("hoodie.table.base.file.format", "PARQUET") .option("hoodie.allow.empty.commit", "true") .option("hoodie.datasource.write.drop.partition.columns", "false") .option("hoodie.table.services.enabled", "true") .option("hoodie.datasource.write.streaming.checkpoint.identifier", "lakehouse-dwd-social-kbi-beauty-v1-writer-1") .option(PRECOMBINE_FIELD.key(), "date_kbiUdate") .option(RECORDKEY_FIELD.key(), "records_key") .option(PARTITIONPATH_FIELD.key(), "partition_index_date") .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.TABLE_TYPE.key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) .option("hoodie.combine.before.upsert", "true") .option("hoodie.datasource.write.payload.class", "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload") .option("hoodie.write.markers.type", "DIRECT") .option("hoodie.embed.timeline.server", "true") .option("hoodie.embed.timeline.server.async", "false") .option("hoodie.embed.timeline.server.gzip", "true") .option("hoodie.embed.timeline.server.reuse.enabled", "false") .option("hoodie.filesystem.view.incr.timeline.sync.enable", "false") .option("hoodie.client.heartbeat.interval_in_ms", "60000") .option("hoodie.client.heartbeat.tolerable.misses", "10") .option("hoodie.write.num.retries.on.conflict.failures", "6")
.option("hoodie.filesystem.view.remote.retry.enable", "true") .option("hoodie.filesystem.view.remote.timeout.secs", "1200") .option("hoodie.filesystem.view.remote.retry.initial_interval_ms", "500") .option("hoodie.filesystem.view.remote.retry.max_numbers", "8") .option("hoodie.filesystem.view.remote.retry.max_interval_ms", "8000") .option("hoodie.filesystem.operation.retry.enable","true")
.option("hoodie.filesystem.operation.retry.max_interval_ms", "8000")
.option("hoodie.filesystem.operation.retry.max_numbers", "8") .option("hoodie.filesystem.operation.retry.initial_interval_ms", "500") .option("hoodie.schema.cache.enable", "true") .option("hoodie.datasource.write.streaming.ignore.failed.batch", "false") .option("hoodie.datasource.write.streaming.retry.count", "6") .option("hoodie.datasource.write.streaming.retry.interval.ms", "3000") .option("hoodie.metadata.enable", "true") .option("hoodie.metadata.index.async", "false") .option("hoodie.metadata.index.check.timeout.seconds", "900") .option("hoodie.auto.adjust.lock.configs", "true") .option("hoodie.metadata.optimized.log.blocks.scan.enable", "true") .option("hoodie.metadata.metrics.enable", "false") .option("hoodie.metadata.compact.max.delta.commits", "20") .option("hoodie.metadata.max.reader.memory", "3221225472") .option("hoodie.metadata.max.reader.buffer.size", "1073741824") .option("hoodie.metadata.record.index.enable", "true") .option("hoodie.index.type", "RECORD_INDEX") .option("hoodie.record.index.use.caching", "true") .option("hoodie.record.index.input.storage.level", "MEMORY_AND_DISK_SER") .option("hoodie.metadata.max.init.parallelism", "100000") .option("hoodie.metadata.record.index.min.filegroup.count", "720") .option("hoodie.metadata.record.index.max.filegroup.count", "10000") .option("hoodie.metadata.record.index.max.filegroup.size", "1073741824") .option("hoodie.metadata.auto.initialize", "true") .option("hoodie.metadata.record.index.growth.factor", "2.0") .option("hoodie.metadata.max.logfile.size", "2147483648") .option("hoodie.metadata.max.deltacommits.when.pending", "1000") .option("hoodie.parquet.field_id.write.enabled", "true") .option("hoodie.copyonwrite.insert.auto.split", "true") .option("hoodie.record.size.estimation.threshold", "1.0") .option("hoodie.parquet.block.size", "536870912") .option("hoodie.parquet.max.file.size", "536870912") .option("hoodie.parquet.small.file.limit", "209715200") .option("hoodie.logfile.max.size", "536870912") .option("hoodie.logfile.data.block.max.size", "536870912") .option("hoodie.logfile.to.parquet.compression.ratio", "0.35") .option("hoodie.merge.small.file.group.candidates.limit", "10")
.option("hoodie.keep.max.commits", "100") .option("hoodie.keep.min.commits", "80") .option("hoodie.commits.archival.batch", "20") .option("hoodie.archive.automatic", "true") .option("hoodie.archive.async", "true") .option("hoodie.archive.beyond.savepoint", "true") .option("hoodie.fail.on.timeline.archiving", "true") .option("hoodie.archive.merge.enable", "true") .option("hoodie.archive.merge.files.batch.size", "10") .option("hoodie.archive.merge.small.file.limit.bytes", "20971520") .option("hoodie.clean.allow.multiple", "true") .option("hoodie.cleaner.incremental.mode", "true") .option("hoodie.clean.async", "true") .option("hoodie.cleaner.policy.failed.writes", "LAZY") .option("hoodie.cleaner.delete.bootstrap.base.file", "false") .option("hoodie.clean.automatic", "true") .option("hoodie.cleaner.policy", "KEEP_LATEST_BY_HOURS") .option("hoodie.cleaner.hours.retained", "6") .option("hoodie.clean.trigger.strategy", "NUM_COMMITS") .option("hoodie.clean.max.commits", "10") .option("hoodie.datasource.compaction.async.enable", "true") .option("hoodie.compact.inline", "false") .option("hoodie.compact.schedule.inline", "false") .option("hoodie.compaction.lazy.block.read", "true") .option("hoodie.compaction.reverse.log.read", "false") .option("hoodie.compaction.logfile.size.threshold", "314572800") .option("hoodie.compaction.target.io", compact_limit) .option("hoodie.compaction.strategy", "org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy") .option("hoodie.compact.inline.trigger.strategy", "NUM_AND_TIME") .option("hoodie.compact.inline.max.delta.commits", "10") .option("hoodie.compact.inline.max.delta.seconds", "7200") .option("hoodie.memory.compaction.fraction", "0.6") .option("hoodie.datasource.write.reconcile.schema", "true") .option("hoodie.schema.on.read.enable", "false") .option("hoodie.write.set.null.for.missing.columns", "true") .option("hoodie.datasource.write.schema.allow.auto.evolution.column.drop", "false") .option("hoodie.avro.schema.external.transformation","true") .option("hoodie.avro.schema.validate", "true") .option("hoodie.write.concurrency.mode", "OPTIMISTIC_CONCURRENCY_CONTROL") .option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider") .option("hoodie.write.lock.filesystem.expire", "2") .option("hoodie.write.lock.wait_time_ms", "300000") .option("hoodie.write.lock.num_retries", "15")

Thanks

ad1happy2go commented 2 months ago

@MrAladdin Did you disable the record index and restart the driver? I see you are using a lock provider. Are multiple writers writing to this table? Are the writer configs the same for all the writers?

MrAladdin commented 2 months ago

@MrAladdin Did you disable the record index and restart the driver? I see you are using a lock provider. Are multiple writers writing to this table? Are the writer configs the same for all the writers?

  1. No, the record index was not disabled.
  2. The lock provider is enabled, but multiple programs writing simultaneously is not actually in use; it was configured in anticipation of future needs. I found that when two identical copies of the program are mistakenly started at the same time, this situation occurs instead of a direct conflict and exit. When multi-writer is not actually being used, the lock can be turned off, which avoids this record-index deletion caused by the same program being mistakenly started at the same time.
  3. I also want to know how to fix this problem when it occurs. Could you please provide detailed steps for a quick repair?
MrAladdin commented 2 months ago

@MrAladdin Did you disable the record index and restart the driver? I see you are using a lock provider. Are multiple writers writing to this table? Are the writer configs the same for all the writers?

In my understanding, deleting the record index because of this conflict is unreasonable. On a conflict, it should roll back rather than directly clear and rebuild the index. When the data volume is massive, rebuilding may be impossible due to resource limitations in the cluster.

ad1happy2go commented 2 months ago

@MrAladdin I am not clear on which part of the code would delete the record index directory itself. Can you provide me with steps to reproduce this issue?

danny0405 commented 2 months ago

@MrAladdin @ad1happy2go When the RLI is disabled, the directory is purged. So we might need to find the culprit: why did the RLI get disabled?
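
One way to check what the table itself currently records for the metadata-table partitions is to read .hoodie/hoodie.properties. A small sketch, assuming the version-dependent keys hoodie.table.metadata.partitions and hoodie.table.metadata.partitions.inflight are present there, and with a placeholder base path:

```scala
import java.util.Properties
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Sketch: print which metadata-table partitions the table believes are
// initialized (if record_index drops out of this list, the directory is
// expected to be purged). Key names and base path are assumptions.
val basePath  = "hdfs:///path/to/table"                        // placeholder
val propsPath = new Path(s"$basePath/.hoodie/hoodie.properties")
val fs        = propsPath.getFileSystem(new Configuration())
val in        = fs.open(propsPath)
try {
  val props = new Properties()
  props.load(in)
  println("completed MDT partitions: " + props.getProperty("hoodie.table.metadata.partitions", "<absent>"))
  println("inflight  MDT partitions: " + props.getProperty("hoodie.table.metadata.partitions.inflight", "<absent>"))
} finally {
  in.close()
}
```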

MrAladdin commented 2 months ago

@MrAladdin @ad1happy2go When the RLI is disabled, the directory is purged. So we might need to find the culprit: why did the RLI get disabled?

The scenario in which the write program automatically deletes the entire metadata/record_index directory has been identified: this situation occurs when the write program is simultaneously started twice. It has happened a few times before, and because the data volume was small then, the table could simply be rewritten to fix it. When the data volume is large, however, that becomes quite awkward. At present, I have not found an official tool for rebuilding the record index on a table with this much data.
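
For context, the stack trace later in this thread shows the record index being rebuilt as part of metadata-table bootstrap when the writer starts with the RLI enabled (initializeRecordIndexPartition). The keys below, all already present in the configuration above, are the knobs that govern how wide that bootstrap runs; tuning them is one possible, unverified direction for getting the rebuild to complete on a very large table. Depending on the Hudi version, the async indexing utility org.apache.hudi.utilities.HoodieIndexer may also be able to build metadata index partitions out of band.

```scala
// Sketch, not a verified fix: bootstrap/sizing knobs for the record-level
// index partition. The values shown are the ones from the configuration
// above; raising the file-group counts and init parallelism spreads the
// rebuild over more tasks and smaller file groups.
val rliBootstrapTuning: Map[String, String] = Map(
  "hoodie.metadata.max.init.parallelism"             -> "100000",
  "hoodie.metadata.record.index.min.filegroup.count" -> "720",
  "hoodie.metadata.record.index.max.filegroup.count" -> "10000",
  "hoodie.metadata.record.index.max.filegroup.size"  -> "1073741824",
  "hoodie.metadata.record.index.growth.factor"       -> "2.0"
)
```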

danny0405 commented 2 months ago

This situation occurs when the write program is simultaneously started twice.

Are you saying the OCC triggers and one program rolls back the other?

MrAladdin commented 2 months ago

This situation occurs when the write program is simultaneously started twice.

Are you saying the OCC triggers and one program rolls back the other?

The specific reason is unclear, but this situation does occur when the same program is started twice, whether simultaneously or one after the other, i.e., with two identical write programs submitted and running.

MrAladdin commented 2 months ago

This situation occurs when the write program is simultaneously started twice.

Are you saying the OCC triggers and one program rolls back the other?

[screenshot] In the screenshot, a REQUESTED deltacommit was created at 17:46 on 07-04, but a rollback was performed immediately afterward at 17:47 on 07-04. It seems to be what you described. However, I hope my screenshot and inference do not bias your judgment. If this is indeed the situation you mentioned, is there a way to avoid it at the code level?

danny0405 commented 2 months ago

Did you configure the lock provider specifically for multi-writer scenarios? If one writer enables the MDT while the other disables it, this could happen (the whole .hoodie/metadata dir gets deleted).

MrAladdin commented 2 months ago

Did you configure the lock provider specifically for multi-writer scenarios? If one writer enables the MDT while the other disables it, this could happen (the whole .hoodie/metadata dir gets deleted).

Both have enabled MDT, and the exact same program was submitted twice.

danny0405 commented 2 months ago

Hmm, this is unexpected. @nsivabalan can you chime in and take a look?

MrAladdin commented 1 month ago

@danny0405 When viewing the timeline with hudi-cli, I found that 160 instants had not been archived and that the metadata table's timeline had not been compacted. Among these early instants there was a deltacommit stuck in the REQUESTED state. I then triggered archival on the Hudi table with hudi-cli and rolled back the deltacommit that was stuck in REQUESTED. However, the running program then failed due to insufficient resources, and on further investigation the index already built under .hoodie/metadata/record_index had been mysteriously deleted, which is really frustrating.

24/07/18 10:31:05 ERROR TaskSetManager: Task 178 in stage 5897.0 failed 4 times; aborting job 24/07/18 10:31:05 WARN TaskSetManager: Lost task 101.1 in stage 5897.0 (TID 761607) (104.slave.hdp executor 40): ExecutorLostFailure (executor 40 exited caused by one of the running tasks) Reason: Container from a bad node: container_e89_1694400023425_2167845_01_000059 on host: 104.slave.hdp. Exit status: 143. Diagnostics: [2024-07-18 10:31:03.550]Container killed on request. Exit code is 143 [2024-07-18 10:31:03.551]Container exited with a non-zero exit code 143. [2024-07-18 10:31:03.551]Killed by external signal . 24/07/18 10:31:05 ERROR HoodieBackedTableMetadataWriter: Bootstrap on record_index partition failed for xxx/.hoodie/metadata org.apache.spark.SparkException: Job aborted due to stage failure: Task 178 in stage 5897.0 failed 4 times, most recent failure: Lost task 178.3 in stage 5897.0 (TID 761598) (104.slave.hdp executor 40): ExecutorLostFailure (executor 40 exited caused by one of the running tasks) Reason: Container from a bad node: container_e89_1694400023425_2167845_01_000059 on host: 104.slave.hdp. Exit status: 143. Diagnostics: [2024-07-18 10:31:03.550]Container killed on request. Exit code is 143 [2024-07-18 10:31:03.551]Container exited with a non-zero exit code 143. [2024-07-18 10:31:03.551]Killed by external signal . Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791) at scala.collection.immutable.List.foreach(List.scala:333) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247) at scala.Option.foreach(Option.scala:437) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463) at org.apache.spark.rdd.RDD.count(RDD.scala:1296) at org.apache.spark.api.java.JavaRDDLike.count(JavaRDDLike.scala:469) at org.apache.spark.api.java.JavaRDDLike.count$(JavaRDDLike.scala:469) at org.apache.spark.api.java.AbstractJavaRDDLike.count(JavaRDDLike.scala:45) at org.apache.hudi.data.HoodieJavaRDD.count(HoodieJavaRDD.java:115) at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.initializeRecordIndexPartition(HoodieBackedTableMetadataWriter.java:541) at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.initializeFromFilesystem(HoodieBackedTableMetadataWriter.java:411) at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.initializeIfNeeded(HoodieBackedTableMetadataWriter.java:275) at 
org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.(HoodieBackedTableMetadataWriter.java:175) at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.(SparkHoodieBackedTableMetadataWriter.java:98) at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.create(SparkHoodieBackedTableMetadataWriter.java:75) at org.apache.hudi.client.SparkRDDWriteClient.initializeMetadataTable(SparkRDDWriteClient.java:320) at org.apache.hudi.client.SparkRDDWriteClient.initMetadataTable(SparkRDDWriteClient.java:288) at org.apache.hudi.client.BaseHoodieWriteClient.doInitTable(BaseHoodieWriteClient.java:1244) at org.apache.hudi.client.BaseHoodieWriteClient.initTable(BaseHoodieWriteClient.java:1284) at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:154) at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:225) at org.apache.hudi.HoodieSparkSqlWriterInternal.liftedTree1$1(HoodieSparkSqlWriter.scala:492) at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:490) at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:187) at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:125) at org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$3(HoodieStreamingSink.scala:141) at scala.util.Try$.apply(Try.scala:210) at org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$2(HoodieStreamingSink.scala:133) at org.apache.hudi.HoodieStreamingSink.retry(HoodieStreamingSink.scala:237) at org.apache.hudi.HoodieStreamingSink.addBatch(HoodieStreamingSink.scala:132) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:732) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:729) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:427) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:425) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:729) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:286) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:427) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:425) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:249) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67) at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:239) at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:311) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$runStream(StreamExecution.scala:289) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$1(StreamExecution.scala:211) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:211) 24/07/18 10:31:05 ERROR HoodieStreamingSink: Micro batch id=163 threw following exception: org.apache.hudi.exception.HoodieException: Failed to instantiate Metadata table at org.apache.hudi.client.SparkRDDWriteClient.initializeMetadataTable(SparkRDDWriteClient.java:326) at org.apache.hudi.client.SparkRDDWriteClient.initMetadataTable(SparkRDDWriteClient.java:288) at org.apache.hudi.client.BaseHoodieWriteClient.doInitTable(BaseHoodieWriteClient.java:1244) at org.apache.hudi.client.BaseHoodieWriteClient.initTable(BaseHoodieWriteClient.java:1284) at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:154) at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:225) at org.apache.hudi.HoodieSparkSqlWriterInternal.liftedTree1$1(HoodieSparkSqlWriter.scala:492) at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:490) at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:187) at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:125) at org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$3(HoodieStreamingSink.scala:141) at scala.util.Try$.apply(Try.scala:210) at org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$2(HoodieStreamingSink.scala:133) at org.apache.hudi.HoodieStreamingSink.retry(HoodieStreamingSink.scala:237) at org.apache.hudi.HoodieStreamingSink.addBatch(HoodieStreamingSink.scala:132) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:732) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:729) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:427) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:425) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:729) at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:286) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:427) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:425) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:249) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:239) at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:311) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$runStream(StreamExecution.scala:289) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$1(StreamExecution.scala:211) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:211) Caused by: org.apache.hudi.exception.HoodieMetadataException: Bootstrap on record_index partition failed for xxx/.hoodie/metadata at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.initializeFromFilesystem(HoodieBackedTableMetadataWriter.java:422) at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.initializeIfNeeded(HoodieBackedTableMetadataWriter.java:275) at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.(HoodieBackedTableMetadataWriter.java:175) at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.(SparkHoodieBackedTableMetadataWriter.java:98) at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.create(SparkHoodieBackedTableMetadataWriter.java:75) at org.apache.hudi.client.SparkRDDWriteClient.initializeMetadataTable(SparkRDDWriteClient.java:320) ... 41 more Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 178 in stage 5897.0 failed 4 times, most recent failure: Lost task 178.3 in stage 5897.0 (TID 761598) (104.slave.hdp executor 40): ExecutorLostFailure (executor 40 exited caused by one of the running tasks) Reason: Container from a bad node: container_e89_1694400023425_2167845_01_000059 on host: 104.slave.hdp. Exit status: 143. Diagnostics: [2024-07-18 10:31:03.550]Container killed on request. Exit code is 143 [2024-07-18 10:31:03.551]Container exited with a non-zero exit code 143. [2024-07-18 10:31:03.551]Killed by external signal . 
Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791) at scala.collection.immutable.List.foreach(List.scala:333) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247) at scala.Option.foreach(Option.scala:437) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463) at org.apache.spark.rdd.RDD.count(RDD.scala:1296) at org.apache.spark.api.java.JavaRDDLike.count(JavaRDDLike.scala:469) at org.apache.spark.api.java.JavaRDDLike.count$(JavaRDDLike.scala:469) at org.apache.spark.api.java.AbstractJavaRDDLike.count(JavaRDDLike.scala:45) at org.apache.hudi.data.HoodieJavaRDD.count(HoodieJavaRDD.java:115) at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.initializeRecordIndexPartition(HoodieBackedTableMetadataWriter.java:541) at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.initializeFromFilesystem(HoodieBackedTableMetadataWriter.java:411) ... 46 more

MrAladdin commented 1 month ago

[screenshots attached]

ad1happy2go commented 1 month ago

@MrAladdin I tried to reproduce this but was not able to. If we use the same configuration with RLI enabled, it never deletes the record_index directory. Are you able to reproduce it consistently?

MrAladdin commented 1 month ago

@ad1happy2go If you used the same Hudi write configuration, started multiple programs simultaneously, and still could not reproduce it, I suspect it comes down to the Spark-related parameters passed at spark-submit. The most likely culprit is spark.speculation, which leads to the metadata/record_index exceptions. So far, no similar exceptions have been observed when starting programs simultaneously with spark.speculation turned off, or when performing table service operations via hudi-cli.
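
If speculative execution really is the trigger, it can be disabled explicitly when launching the streaming writer. A minimal sketch; the application name is illustrative, and the same setting can be passed on spark-submit as --conf spark.speculation=false:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: explicitly turn off speculative execution for the Hudi streaming
// writer. spark.speculation is a standard Spark setting (default: false);
// the app name is illustrative only.
val spark = SparkSession.builder()
  .appName("hudi-streaming-writer")
  .config("spark.speculation", "false")
  .getOrCreate()
```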