After the program restarts, the deltacommits that had been stuck in the REQUESTED state for a long time are automatically cleared, and the metadata table finally triggers compaction.
Is there an automatic mechanism that lets it fix itself without needing to restart the program?
@MrAladdin Not sure why you got the error in the first place for lakehouse/social/dwd_social_kbi_beauty_v1_hudi15/.hoodie/metadata/.hoodie/20240718210231598.deltacommit.inflight
When the program restarts, it runs the rollback, which also deletes the pending instant files. That is why the REQUESTED file was cleared.
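For reference, here is a minimal sketch (run from e.g. spark-shell, assuming HDFS access and the hypothetical /xxx/... base path from the logs) to check whether any pending deltacommit instants are still lingering on the metadata-table timeline after a restart; it only lists files and does not touch the timeline:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

// Hypothetical base path; substitute the real table location.
val basePath = "/xxx/lakehouse/social/dwd_social_kbi_beauty_v1_hudi15"
val mdtTimelineDir = new Path(s"$basePath/.hoodie/metadata/.hoodie")

// Picks up the cluster's default filesystem (HDFS here) from the Hadoop config on the classpath.
val fs = FileSystem.get(new Configuration())

// Pending (requested/inflight) deltacommit instants left on the metadata-table timeline.
val pending = Option(fs.globStatus(new Path(mdtTimelineDir, "*.deltacommit.{requested,inflight}")))
  .getOrElse(Array.empty[FileStatus])
  .map(_.getPath.getName)
  .sorted

pending.foreach(println)

If the list is empty after the restart, the rollback did clean up the REQUESTED/INFLIGHT files as expected.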
This is really strange. Once an anomaly appears on this metadata table, it keeps recurring: after each restart, it reappears once the job has run for a while. However, another table with the same configuration has not had this issue at all.
Comparing this program's timeline with the timelines of several other programs, I found that in this abnormal program a compaction on the Hudi data table occurred right before the metadata table rollback.
org.apache.hudi.exception.HoodieException: Failed to update metadata
    at org.apache.hudi.client.BaseHoodieClient.writeTableMetadata(BaseHoodieClient.java:262)
    at org.apache.hudi.client.BaseHoodieWriteClient.commit(BaseHoodieWriteClient.java:294)
    at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:239)
    at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:108)
    at org.apache.hudi.HoodieSparkSqlWriterInternal.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:1082)
    at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:508)
    at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:187)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:125)
    at org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$3(HoodieStreamingSink.scala:141)
    at scala.util.Try$.apply(Try.scala:210)
    at org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$2(HoodieStreamingSink.scala:133)
    at org.apache.hudi.HoodieStreamingSink.retry(HoodieStreamingSink.scala:237)
    at org.apache.hudi.HoodieStreamingSink.addBatch(HoodieStreamingSink.scala:132)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:732)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:729)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:427)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:425)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:729)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:286)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:427)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:425)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:249)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:239)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:311)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:289)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$1(StreamExecution.scala:211)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
    at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:211)
Caused by: java.lang.IllegalArgumentException: File /xxx/lakehouse/social/dwd_social_kbi_beauty_v1_hudi15/.hoodie/metadata/.hoodie/20240720015925112.deltacommit.inflight does not exist!
    at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:42)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:617)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:598)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.saveAsComplete(HoodieActiveTimeline.java:223)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.commit(BaseSparkCommitActionExecutor.java:309)
    at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.autoCommit(BaseCommitActionExecutor.java:202)
    at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.commitOnAutoCommit(BaseCommitActionExecutor.java:184)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.updateIndexAndCommitIfNeeded(BaseSparkCommitActionExecutor.java:276)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:180)
    at org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor.execute(SparkUpsertPreppedDeltaCommitActionExecutor.java:44)
    at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsertPrepped(HoodieSparkMergeOnReadTable.java:126)
    at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsertPrepped(HoodieSparkMergeOnReadTable.java:88)
    at org.apache.hudi.client.SparkRDDWriteClient.upsertPreppedRecords(SparkRDDWriteClient.java:171)
    at org.apache.hudi.client.SparkRDDWriteClient.upsertPreppedRecords(SparkRDDWriteClient.java:67)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.commitInternal(HoodieBackedTableMetadataWriter.java:1161)
    at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.commit(SparkHoodieBackedTableMetadataWriter.java:120)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.processAndCommit(HoodieBackedTableMetadataWriter.java:863)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.updateFromWriteStatuses(HoodieBackedTableMetadataWriter.java:918)
    at org.apache.hudi.client.BaseHoodieClient.writeTableMetadata(BaseHoodieClient.java:257)
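For anyone reproducing the timeline comparison mentioned above, here is a minimal sketch (assuming a spark-shell with access to the same HDFS and the hypothetical /xxx/... base path) that dumps the data-table and metadata-table timelines in instant order, so that the compaction preceding the metadata table rollback can be spotted:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical base path; substitute the real table location.
val basePath = "/xxx/lakehouse/social/dwd_social_kbi_beauty_v1_hudi15"
val fs = FileSystem.get(new Configuration())

// Dump a .hoodie timeline directory sorted by instant time (instant files start with a timestamp).
def listTimeline(timelineDir: String): Seq[String] =
  fs.listStatus(new Path(timelineDir))
    .map(_.getPath.getName)
    .filter(_.headOption.exists(_.isDigit))
    .sorted
    .toSeq

println("== data table timeline ==")
listTimeline(s"$basePath/.hoodie").foreach(println)

println("== metadata table timeline ==")
listTimeline(s"$basePath/.hoodie/metadata/.hoodie").foreach(println)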
@ad1happy2go I found the reason: it is an issue with the resources I provided. The solution is to run compaction synchronously, or to increase the resources given to the Spark submit, or to reduce the memory consumption caused by write amplification by decreasing the value of hoodie.merge.small.file.group.candidates.limit, or by decreasing hoodie.metadata.max.reader.buffer.size. I am very fortunate that this was the last exception encountered after researching all of Hudi's configuration parameters. Thank you for your continued attention, and a big thumbs up to you! All tests are now complete. We will conduct further tests when new versions and features are released.
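As a sketch of what those mitigations look like in code (the specific values below are illustrative assumptions, not the exact values used in this deployment), the relevant options can be overridden on the same writeStream chain shown under "Additional context":

// Illustrative values only; tune for your own cluster and workload.
val mitigationOpts: Map[String, String] = Map(
  "hoodie.compact.inline" -> "true",                         // run compaction synchronously with the ingest write
  "hoodie.datasource.compaction.async.enable" -> "false",    // turn off async compaction when compacting inline
  "hoodie.merge.small.file.group.candidates.limit" -> "1",   // fewer small-file-group candidates => less write amplification
  "hoodie.metadata.max.reader.buffer.size" -> "134217728"    // shrink the metadata reader buffer (128 MB here, as an example)
)

// Applied to the existing chain: df.writeStream.format("hudi").options(mitigationOpts)...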
Thanks @MrAladdin. Closing the issue then.
Describe the problem you faced
Expected behavior
Environment Description
Hudi version : 0.15.0
Spark version : 3.5
Hive version : 3.1.2
Hadoop version : 3.1.3
Storage (HDFS/S3/GCS..) : HDFS
Running on Docker? (yes/no) : no
Additional context
.writeStream .format("hudi") .option("hoodie.table.base.file.format", "PARQUET") .option("hoodie.allow.empty.commit", "true") .option("hoodie.datasource.write.drop.partition.columns", "false") .option("hoodie.table.services.enabled", "true") .option("hoodie.datasource.write.streaming.checkpoint.identifier", "lakehouse-dwd-social-kbi-beauty-v1-writer-1") .option(PRECOMBINE_FIELD.key(), "date_kbiUdate") .option(RECORDKEY_FIELD.key(), "records_key") .option(PARTITIONPATH_FIELD.key(), "partition_index_date") .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.TABLE_TYPE.key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) .option("hoodie.combine.before.upsert", "true") .option("hoodie.datasource.write.payload.class", "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload") .option("hoodie.write.markers.type", "DIRECT") .option("hoodie.embed.timeline.server", "true") .option("hoodie.embed.timeline.server.async", "false") .option("hoodie.embed.timeline.server.gzip", "true") .option("hoodie.embed.timeline.server.reuse.enabled", "false") .option("hoodie.filesystem.view.incr.timeline.sync.enable", "false") .option("hoodie.client.heartbeat.interval_in_ms", "60000") .option("hoodie.client.heartbeat.tolerable.misses", "10") .option("hoodie.write.num.retries.on.conflict.failures", "6")
.option("hoodie.filesystem.view.remote.retry.enable", "true") .option("hoodie.filesystem.view.remote.timeout.secs", "1200") .option("hoodie.filesystem.view.remote.retry.initial_interval_ms", "500") .option("hoodie.filesystem.view.remote.retry.max_numbers", "8") .option("hoodie.filesystem.view.remote.retry.max_interval_ms", "8000") .option("hoodie.filesystem.operation.retry.enable","true")
.option("hoodie.filesystem.operation.retry.max_interval_ms", "8000")
.option("hoodie.filesystem.operation.retry.max_numbers", "8") .option("hoodie.filesystem.operation.retry.initial_interval_ms", "500") .option("hoodie.schema.cache.enable", "true") .option("hoodie.datasource.write.streaming.ignore.failed.batch", "false") .option("hoodie.datasource.write.streaming.retry.count", "6") .option("hoodie.datasource.write.streaming.retry.interval.ms", "3000") .option("hoodie.metadata.enable", "true") .option("hoodie.metadata.index.async", "false") .option("hoodie.metadata.index.check.timeout.seconds", "900") .option("hoodie.auto.adjust.lock.configs", "true") .option("hoodie.metadata.optimized.log.blocks.scan.enable", "true") .option("hoodie.metadata.metrics.enable", "false") .option("hoodie.metadata.compact.max.delta.commits", "20") .option("hoodie.metadata.max.reader.memory", "3221225472") .option("hoodie.metadata.max.reader.buffer.size", "1073741824") .option("hoodie.metadata.record.index.enable", "true") .option("hoodie.index.type", "RECORD_INDEX") .option("hoodie.record.index.use.caching", "true") .option("hoodie.record.index.input.storage.level", "MEMORY_AND_DISK_SER") .option("hoodie.metadata.max.init.parallelism", "100000") .option("hoodie.metadata.record.index.min.filegroup.count", "720") .option("hoodie.metadata.record.index.max.filegroup.count", "10000") .option("hoodie.metadata.record.index.max.filegroup.size", "1073741824") .option("hoodie.metadata.auto.initialize", "true") .option("hoodie.metadata.record.index.growth.factor", "2.0") .option("hoodie.metadata.max.logfile.size", "2147483648") .option("hoodie.metadata.max.deltacommits.when.pending", "1000") .option("hoodie.parquet.field_id.write.enabled", "true") .option("hoodie.copyonwrite.insert.auto.split", "true") .option("hoodie.record.size.estimation.threshold", "1.0") .option("hoodie.parquet.block.size", "536870912") .option("hoodie.parquet.max.file.size", "536870912") .option("hoodie.parquet.small.file.limit", "209715200") .option("hoodie.logfile.max.size", "536870912") .option("hoodie.logfile.data.block.max.size", "536870912") .option("hoodie.logfile.to.parquet.compression.ratio", "0.35") .option("hoodie.merge.small.file.group.candidates.limit", "10")
.option("hoodie.keep.max.commits", "100") .option("hoodie.keep.min.commits", "80") .option("hoodie.commits.archival.batch", "20") .option("hoodie.archive.automatic", "true") .option("hoodie.archive.async", "true") .option("hoodie.archive.beyond.savepoint", "true") .option("hoodie.fail.on.timeline.archiving", "true") .option("hoodie.archive.merge.enable", "true") .option("hoodie.archive.merge.files.batch.size", "10") .option("hoodie.archive.merge.small.file.limit.bytes", "20971520") .option("hoodie.clean.allow.multiple", "true") .option("hoodie.cleaner.incremental.mode", "true") .option("hoodie.clean.async", "true") .option("hoodie.cleaner.policy.failed.writes", "LAZY") .option("hoodie.cleaner.delete.bootstrap.base.file", "false") .option("hoodie.clean.automatic", "true") .option("hoodie.cleaner.policy", "KEEP_LATEST_BY_HOURS") .option("hoodie.cleaner.hours.retained", "6") .option("hoodie.clean.trigger.strategy", "NUM_COMMITS") .option("hoodie.clean.max.commits", "10") .option("hoodie.datasource.compaction.async.enable", "true") .option("hoodie.compact.inline", "false") .option("hoodie.compact.schedule.inline", "false") .option("hoodie.compaction.lazy.block.read", "true") .option("hoodie.compaction.reverse.log.read", "false") .option("hoodie.compaction.logfile.size.threshold", "314572800") .option("hoodie.compaction.target.io", compact_limit) .option("hoodie.compaction.strategy", "org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy") .option("hoodie.compact.inline.trigger.strategy", "NUM_AND_TIME") .option("hoodie.compact.inline.max.delta.commits", "10") .option("hoodie.compact.inline.max.delta.seconds", "7200") .option("hoodie.memory.compaction.fraction", "0.6") .option("hoodie.datasource.write.reconcile.schema", "true") .option("hoodie.schema.on.read.enable", "false") .option("hoodie.write.set.null.for.missing.columns", "true") .option("hoodie.datasource.write.schema.allow.auto.evolution.column.drop", "false") .option("hoodie.avro.schema.external.transformation","true") .option("hoodie.avro.schema.validate", "true") .option("hoodie.write.concurrency.mode", "OPTIMISTIC_CONCURRENCY_CONTROL") .option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider") .option("hoodie.write.lock.filesystem.expire", "2") .option("hoodie.write.lock.wait_time_ms", "300000") .option("hoodie.write.lock.num_retries", "15")
Stacktrace
24/07/18 21:18:49 WARN DAGScheduler: Broadcasting large task binary with size 1971.0 KiB
24/07/18 21:20:31 WARN FileSystemBasedLockProvider: Delete expired lock file: /xxx/lakehouse/social/dwd_social_kbi_beauty_v1_hudi15/.hoodie/lock
24/07/18 21:20:32 WARN BaseRollbackHelper: Rollback never failed and hence no marker dir was found. Safely moving on
24/07/18 21:20:54 WARN BaseHoodieCompactionPlanGenerator: After filtering, Nothing to compact for /xxx/lakehouse/social/dwd_social_kbi_beauty_v1_hudi15
24/07/18 21:21:34 ERROR HoodieBackedTableMetadataWriter: Exception in running table services on metadata table
org.apache.hudi.exception.HoodieRollbackException: Failed to rollback /xxx/lakehouse/social/dwd_social_kbi_beauty_v1_hudi15/.hoodie/metadata commits 20240718210231598
    at org.apache.hudi.client.BaseHoodieTableServiceClient.rollback(BaseHoodieTableServiceClient.java:1061)
    at org.apache.hudi.client.BaseHoodieTableServiceClient.rollback(BaseHoodieTableServiceClient.java:1008)
    at org.apache.hudi.client.BaseHoodieTableServiceClient.rollbackFailedWrites(BaseHoodieTableServiceClient.java:935)
    at org.apache.hudi.client.BaseHoodieTableServiceClient.rollbackFailedWrites(BaseHoodieTableServiceClient.java:917)
    at org.apache.hudi.client.BaseHoodieTableServiceClient.rollbackFailedIndexingCommits(BaseHoodieTableServiceClient.java:884)
    at org.apache.hudi.client.BaseHoodieWriteClient.lazyRollbackFailedIndexing(BaseHoodieWriteClient.java:1401)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.cleanIfNecessary(HoodieBackedTableMetadataWriter.java:1349)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.performTableServices(HoodieBackedTableMetadataWriter.java:1263)
    at org.apache.hudi.client.SparkRDDWriteClient.initializeMetadataTable(SparkRDDWriteClient.java:323)
    at org.apache.hudi.client.SparkRDDWriteClient.initMetadataTable(SparkRDDWriteClient.java:288)
    at org.apache.hudi.client.BaseHoodieWriteClient.doInitTable(BaseHoodieWriteClient.java:1244)
    at org.apache.hudi.client.BaseHoodieWriteClient.initTable(BaseHoodieWriteClient.java:1284)
    at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:154)
    at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:225)
    at org.apache.hudi.HoodieSparkSqlWriterInternal.liftedTree1$1(HoodieSparkSqlWriter.scala:492)
    at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:490)
    at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:187)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:125)
    at org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$3(HoodieStreamingSink.scala:141)
    at scala.util.Try$.apply(Try.scala:210)
    at org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$2(HoodieStreamingSink.scala:133)
    at org.apache.hudi.HoodieStreamingSink.retry(HoodieStreamingSink.scala:237)
    at org.apache.hudi.HoodieStreamingSink.addBatch(HoodieStreamingSink.scala:132)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:732)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:729)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:427)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:425)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:729)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:286)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:427)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:425)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:249)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:239)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:311)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:289)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$1(StreamExecution.scala:211)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
    at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:211)
Caused by: org.apache.hudi.exception.HoodieIOException: Could not delete instant [==>20240718210231598__deltacommit__INFLIGHT] with path /xxx/lakehouse/social/dwd_social_kbi_beauty_v1_hudi15/.hoodie/metadata/.hoodie/20240718210231598.deltacommit.inflight
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deleteInstantFile(HoodieActiveTimeline.java:300)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deletePending(HoodieActiveTimeline.java:242)
    at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.deleteInflightAndRequestedInstant(BaseRollbackActionExecutor.java:292)
    at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.finishRollback(BaseRollbackActionExecutor.java:262)
    at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.runRollback(BaseRollbackActionExecutor.java:117)
    at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.execute(BaseRollbackActionExecutor.java:141)
    at org.apache.hudi.table.HoodieSparkMergeOnReadTable.rollback(HoodieSparkMergeOnReadTable.java:218)
    at org.apache.hudi.client.BaseHoodieTableServiceClient.rollback(BaseHoodieTableServiceClient.java:1044)
    ... 49 more
24/07/18 21:21:34 ERROR HoodieStreamingSink: Micro batch id=15 threw following exception:
org.apache.hudi.exception.HoodieException: Failed to instantiate Metadata table
    at org.apache.hudi.client.SparkRDDWriteClient.initializeMetadataTable(SparkRDDWriteClient.java:326)
    at org.apache.hudi.client.SparkRDDWriteClient.initMetadataTable(SparkRDDWriteClient.java:288)
    at org.apache.hudi.client.BaseHoodieWriteClient.doInitTable(BaseHoodieWriteClient.java:1244)
    at org.apache.hudi.client.BaseHoodieWriteClient.initTable(BaseHoodieWriteClient.java:1284)
    at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:154)
    at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:225)
    at org.apache.hudi.HoodieSparkSqlWriterInternal.liftedTree1$1(HoodieSparkSqlWriter.scala:492)
    at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:490)
    at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:187)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:125)
    at org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$3(HoodieStreamingSink.scala:141)
    at scala.util.Try$.apply(Try.scala:210)
    at org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$2(HoodieStreamingSink.scala:133)
    at org.apache.hudi.HoodieStreamingSink.retry(HoodieStreamingSink.scala:237)
    at org.apache.hudi.HoodieStreamingSink.addBatch(HoodieStreamingSink.scala:132)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:732)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:729)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:427)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:425)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:729)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:286)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:427)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:425)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:249)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:239)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:311)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:289)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$1(StreamExecution.scala:211)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
    at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:211)
Caused by: org.apache.hudi.exception.HoodieRollbackException: Failed to rollback /xxx/lakehouse/social/dwd_social_kbi_beauty_v1_hudi15/.hoodie/metadata commits 20240718210231598
    at org.apache.hudi.client.BaseHoodieTableServiceClient.rollback(BaseHoodieTableServiceClient.java:1061)
    at org.apache.hudi.client.BaseHoodieTableServiceClient.rollback(BaseHoodieTableServiceClient.java:1008)
    at org.apache.hudi.client.BaseHoodieTableServiceClient.rollbackFailedWrites(BaseHoodieTableServiceClient.java:935)
    at org.apache.hudi.client.BaseHoodieTableServiceClient.rollbackFailedWrites(BaseHoodieTableServiceClient.java:917)
    at org.apache.hudi.client.BaseHoodieTableServiceClient.rollbackFailedIndexingCommits(BaseHoodieTableServiceClient.java:884)
    at org.apache.hudi.client.BaseHoodieWriteClient.lazyRollbackFailedIndexing(BaseHoodieWriteClient.java:1401)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.cleanIfNecessary(HoodieBackedTableMetadataWriter.java:1349)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.performTableServices(HoodieBackedTableMetadataWriter.java:1263)
    at org.apache.hudi.client.SparkRDDWriteClient.initializeMetadataTable(SparkRDDWriteClient.java:323)
    ... 41 more
Caused by: org.apache.hudi.exception.HoodieIOException: Could not delete instant [==>20240718210231598__deltacommit__INFLIGHT] with path /xxx/lakehouse/social/dwd_social_kbi_beauty_v1_hudi15/.hoodie/metadata/.hoodie/20240718210231598.deltacommit.inflight
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deleteInstantFile(HoodieActiveTimeline.java:300)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deletePending(HoodieActiveTimeline.java:242)
    at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.deleteInflightAndRequestedInstant(BaseRollbackActionExecutor.java:292)
    at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.finishRollback(BaseRollbackActionExecutor.java:262)
    at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.runRollback(BaseRollbackActionExecutor.java:117)
    at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.execute(BaseRollbackActionExecutor.java:141)
    at org.apache.hudi.table.HoodieSparkMergeOnReadTable.rollback(HoodieSparkMergeOnReadTable.java:218)
    at org.apache.hudi.client.BaseHoodieTableServiceClient.rollback(BaseHoodieTableServiceClient.java:1044)
    ... 49 more
24/07/18 21:24:01 WARN DAGScheduler: Broadcasting large task binary with size 1027.1 KiB