apache/hudi

[SUPPORT] Spark Structured Streaming failed to update MDT metadata #10891

Closed · xicm closed this issue 5 months ago

xicm commented 7 months ago


Describe the problem you faced

With Spark Structured Streaming ingesting data and hoodie.metadata.enable=true, the async compaction writes a DELTACOMMIT instant to the metadata table (MDT). Because the compaction runs asynchronously, the data writer rolls back that inflight deltacommit in the MDT; when the compaction finishes, the compaction writer finds that the inflight deltacommit no longer exists and throws an exception.
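A minimal sketch of the kind of job that can hit this: a single Structured Streaming writer with the metadata table and async compaction enabled. The table path matches the stack trace below, but the source, schema, keys, key generator, and trigger interval are illustrative assumptions, not details taken from the report.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("hudi-mdt-async-compaction-repro").getOrCreate()

// Built-in rate source; folding "value" onto a small key space forces updates,
// so the MOR table accumulates log files and compaction actually gets scheduled.
val source = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "100")
  .load()
  .withColumn("id", col("value") % 1000)

source.writeStream
  .format("hudi")
  .option("hoodie.table.name", "hudi40")
  .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
  .option("hoodie.datasource.write.operation", "upsert")
  .option("hoodie.datasource.write.recordkey.field", "id")
  .option("hoodie.datasource.write.precombine.field", "timestamp")
  .option("hoodie.datasource.write.keygenerator.class",
    "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
  // Metadata table on, compaction running asynchronously next to the ingestion writer.
  .option("hoodie.metadata.enable", "true")
  .option("hoodie.datasource.compaction.async.enable", "true")
  // Compact frequently to widen the window for the race described above.
  .option("hoodie.compact.inline.max.delta.commits", "2")
  .option("checkpointLocation", "/tmp/hoodie/hudi40_checkpoint")
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start("/tmp/hoodie/hudi40")
  .awaitTermination()
```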


Stacktrace

```
24/03/20 09:51:23 INFO BaseRollbackActionExecutor: Rolled back inflight instant 20240320095114761
24/03/20 09:51:23 INFO BaseRollbackActionExecutor: Index rolled back for commits [==>20240320095114761__deltacommit__INFLIGHT__20240320095151179]
24/03/20 09:51:23 INFO BaseRollbackActionExecutor: Deleting instant=[==>20240320095114761__deltacommit__INFLIGHT__20240320095151179]
24/03/20 09:51:23 INFO HoodieActiveTimeline: Deleting instant [==>20240320095114761__deltacommit__INFLIGHT__20240320095151179]
24/03/20 09:51:24 INFO HoodieActiveTimeline: Removed instant [==>20240320095114761__deltacommit__INFLIGHT__20240320095151179]
24/03/20 09:51:24 INFO HoodieActiveTimeline: Deleting instant [==>20240320095114761__deltacommit__REQUESTED]
24/03/20 09:51:24 INFO HoodieSparkSqlWriterInternal: Config.inlineCompactionEnabled ? false
24/03/20 09:51:24 INFO HoodieSparkSqlWriterInternal: Config.asyncClusteringEnabled ? false
24/03/20 09:51:24 INFO MapPartitionsRDD: Removing RDD 570 from persistence list
24/03/20 09:51:24 INFO BlockManager: Removing RDD 570
24/03/20 09:51:24 INFO MapPartitionsRDD: Removing RDD 558 from persistence list
24/03/20 09:51:24 INFO BlockManager: Removing RDD 558
24/03/20 09:51:24 INFO MapPartitionsRDD: Removing RDD 562 from persistence list
24/03/20 09:51:24 INFO BlockManager: Removing RDD 562
24/03/20 09:51:24 INFO MapPartitionsRDD: Removing RDD 604 from persistence list
24/03/20 09:51:24 INFO BlockManager: Removing RDD 604
24/03/20 09:51:24 INFO UnionRDD: Removing RDD 581 from persistence list
24/03/20 09:51:24 INFO BlockManager: Removing RDD 581
24/03/20 09:51:24 ERROR HoodieStreamingSink: Micro batch id=10 threw following exception:
org.apache.hudi.exception.HoodieRollbackException: Failed to rollback /tmp/hoodie/hudi40/.hoodie/metadata commits 20240320095114761
    at org.apache.hudi.client.BaseHoodieTableServiceClient.rollback(BaseHoodieTableServiceClient.java:1065)
    at org.apache.hudi.client.BaseHoodieTableServiceClient.rollback(BaseHoodieTableServiceClient.java:1012)
    at org.apache.hudi.client.BaseHoodieTableServiceClient.rollbackFailedWrites(BaseHoodieTableServiceClient.java:940)
    at org.apache.hudi.client.BaseHoodieTableServiceClient.rollbackFailedWrites(BaseHoodieTableServiceClient.java:922)
    at org.apache.hudi.client.BaseHoodieTableServiceClient.rollbackFailedWrites(BaseHoodieTableServiceClient.java:917)
    at org.apache.hudi.client.BaseHoodieWriteClient.lambda$startCommitWithTime$97cdbdca$1(BaseHoodieWriteClient.java:941)
    at org.apache.hudi.common.util.CleanerUtils.rollbackFailedWrites(CleanerUtils.java:222)
    at org.apache.hudi.client.BaseHoodieWriteClient.startCommitWithTime(BaseHoodieWriteClient.java:940)
    at org.apache.hudi.client.BaseHoodieWriteClient.startCommitWithTime(BaseHoodieWriteClient.java:925)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.commitInternal(HoodieBackedTableMetadataWriter.java:1092)
    at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.commit(SparkHoodieBackedTableMetadataWriter.java:117)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.processAndCommit(HoodieBackedTableMetadataWriter.java:810)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.updateFromWriteStatuses(HoodieBackedTableMetadataWriter.java:865)
    at org.apache.hudi.client.BaseHoodieWriteClient.writeTableMetadata(BaseHoodieWriteClient.java:363)
    at org.apache.hudi.client.BaseHoodieWriteClient.commit(BaseHoodieWriteClient.java:286)
    at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:236)
    at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:104)
    at org.apache.hudi.HoodieSparkSqlWriterInternal.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:1081)
    at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:520)
    at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:204)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:121)
    at org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$2(HoodieStreamingSink.scala:138)
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$1(HoodieStreamingSink.scala:130)
    at org.apache.hudi.HoodieStreamingSink.retry(HoodieStreamingSink.scala:234)
    at org.apache.hudi.HoodieStreamingSink.addBatch(HoodieStreamingSink.scala:129)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:665)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:663)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:663)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
Caused by: org.apache.hudi.exception.HoodieIOException: Could not delete instant [==>20240320095114761__deltacommit__REQUESTED] with path /tmp/hoodie/hudi40/.hoodie/metadata/.hoodie/20240320095114761.deltacommit.requested
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deleteInstantFile(HoodieActiveTimeline.java:301)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deletePending(HoodieActiveTimeline.java:243)
    at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.deleteInflightAndRequestedInstant(BaseRollbackActionExecutor.java:294)
    at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.finishRollback(BaseRollbackActionExecutor.java:259)
    at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.runRollback(BaseRollbackActionExecutor.java:117)
    at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.execute(BaseRollbackActionExecutor.java:138)
    at org.apache.hudi.table.HoodieSparkMergeOnReadTable.rollback(HoodieSparkMergeOnReadTable.java:218)
    at org.apache.hudi.client.BaseHoodieTableServiceClient.rollback(BaseHoodieTableServiceClient.java:1048)
    ... 49 more
24/03/20 09:51:24 INFO HoodieStreamingSink: Retrying the failed micro batch id=10 ...
24/03/20 09:51:24 INFO BlockManagerInfo: Added rdd_604_0 in memory on host-10-19-29-165:40389 (size: 512.0 B, free: 365.9 MiB)
24/03/20 09:51:24 INFO TaskSetManager: Finished task 0.0 in stage 225.0 (TID 246) in 989 ms on host-10-19-29-165 (executor 2) (1/1)
24/03/20 09:51:24 INFO YarnScheduler: Removed TaskSet 225.0, whose tasks have all completed, from pool
24/03/20 09:51:24 INFO DAGScheduler: ResultStage 225 (start at <pastie>:64) finished in 1.218 s
24/03/20 09:51:24 INFO DAGScheduler: Job 138 is finished. Cancelling potential speculative or zombie tasks for this job
24/03/20 09:51:24 INFO YarnScheduler: Killing all running tasks in stage 225: Stage finished
24/03/20 09:51:24 INFO DAGScheduler: Job 138 finished: start at <pastie>:64, took 1.410154 s
24/03/20 09:51:24 INFO CommitUtils: Creating  metadata for UPSERT_PREPPED numWriteStats:1 numReplaceFileIds:0
24/03/20 09:51:24 INFO BaseSparkCommitActionExecutor: Committing 20240320095114761, action Type deltacommit, operation Type UPSERT_PREPPED
24/03/20 09:51:24 INFO SparkContext: Starting job: start at <pastie>:64
24/03/20 09:51:24 INFO DAGScheduler: Got job 141 (start at <pastie>:64) with 1 output partitions
24/03/20 09:51:24 INFO DAGScheduler: Final stage: ResultStage 229 (start at <pastie>:64)
24/03/20 09:51:24 INFO DAGScheduler: Parents of final stage: List()
24/03/20 09:51:24 INFO DAGScheduler: Missing parents: List()
24/03/20 09:51:24 INFO DAGScheduler: Submitting ResultStage 229 (MapPartitionsRDD[616] at start at <pastie>:64), which has no missing parents
24/03/20 09:51:24 INFO MemoryStore: Block broadcast_196 stored as values in memory (estimated size 154.3 KiB, free 364.3 MiB)
24/03/20 09:51:24 INFO MemoryStore: Block broadcast_196_piece0 stored as bytes in memory (estimated size 53.9 KiB, free 364.2 MiB)
24/03/20 09:51:24 INFO BlockManagerInfo: Added broadcast_196_piece0 in memory on host-10-19-29-153:57823 (size: 53.9 KiB, free: 365.8 MiB)
24/03/20 09:51:24 INFO SparkContext: Created broadcast 196 from broadcast at DAGScheduler.scala:1513
24/03/20 09:51:24 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 229 (MapPartitionsRDD[616] at start at <pastie>:64) (first 15 tasks are for partitions Vector(0))
24/03/20 09:51:24 INFO YarnScheduler: Adding task set 229.0 with 1 tasks resource profile 0
24/03/20 09:51:24 INFO TaskSetManager: Starting task 0.0 in stage 229.0 (TID 250) (host-10-19-29-165, executor 2, partition 0, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
24/03/20 09:51:24 INFO BlockManagerInfo: Added broadcast_196_piece0 in memory on host-10-19-29-165:40389 (size: 53.9 KiB, free: 365.9 MiB)
24/03/20 09:51:24 INFO TaskSetManager: Finished task 0.0 in stage 229.0 (TID 250) in 46 ms on host-10-19-29-165 (executor 2) (1/1)
24/03/20 09:51:24 INFO YarnScheduler: Removed TaskSet 229.0, whose tasks have all completed, from pool
24/03/20 09:51:24 INFO DAGScheduler: ResultStage 229 (start at <pastie>:64) finished in 0.085 s
24/03/20 09:51:24 INFO DAGScheduler: Job 141 is finished. Cancelling potential speculative or zombie tasks for this job
24/03/20 09:51:24 INFO YarnScheduler: Killing all running tasks in stage 229: Stage finished
24/03/20 09:51:24 INFO DAGScheduler: Job 141 finished: start at <pastie>:64, took 0.088189 s
24/03/20 09:51:24 INFO HoodieActiveTimeline: Marking instant complete [==>20240320095114761__deltacommit__INFLIGHT]
24/03/20 09:51:24 INFO HoodieActiveTimeline: Checking for file exists ?/tmp/hoodie/hudi40/.hoodie/metadata/.hoodie/20240320095114761.deltacommit.inflight
24/03/20 09:51:24 ERROR AsyncCompactService: Compactor executor failed
org.apache.hudi.exception.HoodieException: Failed to update metadata
    at org.apache.hudi.client.BaseHoodieTableServiceClient.writeTableMetadata(BaseHoodieTableServiceClient.java:706)
    at org.apache.hudi.client.BaseHoodieTableServiceClient.completeCompaction(BaseHoodieTableServiceClient.java:330)
    at org.apache.hudi.client.BaseHoodieTableServiceClient.commitCompaction(BaseHoodieTableServiceClient.java:315)
    at org.apache.hudi.client.BaseHoodieWriteClient.commitCompaction(BaseHoodieWriteClient.java:1077)
    at org.apache.hudi.client.HoodieSparkCompactor.compact(HoodieSparkCompactor.java:61)
    at org.apache.hudi.async.AsyncCompactService.lambda$null$0(AsyncCompactService.java:84)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException
    at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:33)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:618)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:599)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.saveAsComplete(HoodieActiveTimeline.java:224)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.commit(BaseSparkCommitActionExecutor.java:311)
    at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.autoCommit(BaseCommitActionExecutor.java:201)
    at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.commitOnAutoCommit(BaseCommitActionExecutor.java:183)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.updateIndexAndCommitIfNeeded(BaseSparkCommitActionExecutor.java:279)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:184)
    at org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor.execute(SparkUpsertPreppedDeltaCommitActionExecutor.java:44)
    at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsertPrepped(HoodieSparkMergeOnReadTable.java:126)
    at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsertPrepped(HoodieSparkMergeOnReadTable.java:88)
    at org.apache.hudi.client.SparkRDDWriteClient.upsertPreppedRecords(SparkRDDWriteClient.java:156)
    at org.apache.hudi.client.SparkRDDWriteClient.upsertPreppedRecords(SparkRDDWriteClient.java:63)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.commitInternal(HoodieBackedTableMetadataWriter.java:1099)
    at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.commit(SparkHoodieBackedTableMetadataWriter.java:117)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.processAndCommit(HoodieBackedTableMetadataWriter.java:810)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.updateFromWriteStatuses(HoodieBackedTableMetadataWriter.java:865)
    at org.apache.hudi.client.BaseHoodieTableServiceClient.writeTableMetadata(BaseHoodieTableServiceClient.java:701)
    ... 9 more
24/03/20 09:51:24 INFO HoodieStreamingSink: Async Compactor shutdown. Errored ? true
```
danny0405 commented 7 months ago

It looks like the MDT timeline instant action is in an invalid state.

ad1happy2go commented 7 months ago

@xicm Are you using a multi-writer setup?

xicm commented 7 months ago

> @xicm Are you using a multi-writer setup?

Single writer

xicm commented 7 months ago

[Three screenshots attached]

The instant in the screenshots comes from another job; its name is not the same as the one in the stack trace.

ad1happy2go commented 7 months ago

@xicm Looks like there are other similar issues as well, e.g. https://github.com/apache/hudi/issues/10906

I recommend using HoodieStreamer instead of Spark Structured Streaming.

ad1happy2go commented 7 months ago

@xicm I will try to reproduce it. Can you provide more details on the steps I can follow?

Qiuzhuang commented 6 months ago

> @xicm Are you using a multi-writer setup?
>
> Single writer

I use multiple writers: an ingestion writer and offline clustering. Even though they don't involve overlapping file groups, the MDT sometimes ends up in an invalid state. For more context, in this case we don't configure a multi-writer setup (e.g. a ZK lock provider). Could that be the problem, even when the two writers handle non-overlapping file groups?

danny0405 commented 6 months ago

> in this case we don't configure a multi-writer setup (e.g. a ZK lock provider)

You should always configure a lock provider if there are multiple writers. But you are right, it is possible we will make the metadata table non-blocking as of the 1.0 release.

nsivabalan commented 6 months ago

But wouldn't the in-process lock provider kick in and avoid multiple writers to the MDT? I am assuming the setup is Spark streaming with async compaction or clustering: a single process, but multiple threads trying to write to the MDT. If the in-process lock provider is not kicking in, then it's a bug.

xicm commented 6 months ago

The root cause is that the ingestion writer's deltacommit in the MDT rolls back the compaction's instant in the MDT (a compaction of the data table is written to the MDT as a deltacommit).

When a compaction commits, it creates an inflight deltacommit in the MDT. Because the compaction is asynchronous, the ingestion writer may begin to commit at exactly that moment and start a new deltacommit in the MDT. In the MDT, that new deltacommit rolls back the uncompleted deltacommit (the one created by the async compaction).

Is it possible to filter out the deltacommit created by the compaction in the MDT when we do the rollback?
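For anyone trying to confirm that interleaving on their own table, a small read-only inspection sketch (not a fix): it lists the pending deltacommits on the MDT timeline, using the metadata table path from the stack trace above. API names follow the 0.x Java client and may differ between releases.

```scala
// Sketch: list pending (requested/inflight) deltacommits on the metadata table's
// own timeline. One of them is the instant opened on behalf of the async
// compaction, which the ingestion writer later rolls back.
import org.apache.hadoop.conf.Configuration
import org.apache.hudi.common.table.HoodieTableMetaClient

val mdtBasePath = "/tmp/hoodie/hudi40/.hoodie/metadata" // path from the stack trace
val metaClient = HoodieTableMetaClient.builder()
  .setConf(new Configuration())
  .setBasePath(mdtBasePath)
  .build()

metaClient.getActiveTimeline
  .getDeltaCommitTimeline
  .filterInflightsAndRequested
  .getInstants
  .forEach(instant => println(instant))
```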

Qiuzhuang commented 6 months ago

> But wouldn't the in-process lock provider kick in and avoid multiple writers to the MDT? I am assuming the setup is Spark streaming with async compaction or clustering: a single process, but multiple threads trying to write to the MDT. If the in-process lock provider is not kicking in, then it's a bug.

If async clustering runs in the same process, we don't run into this issue for now. But for multiple writers, like offline clustering in another process, as indicated by @danny0405, we should have a ZK lock provider to serialize MDT writes.
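For reference, a sketch of roughly how the cross-process locking would be configured with the ZooKeeper-based provider that ships with Hudi; the quorum address, port, and paths are placeholders, and the exact keys should be double-checked against the concurrency-control docs for the version in use.

```scala
// Sketch: lock settings shared by the two processes (streaming ingestion and
// offline clustering) writing to the same table. ZooKeeper values are placeholders.
val multiWriterLockOpts = Map(
  "hoodie.write.concurrency.mode"       -> "optimistic_concurrency_control",
  "hoodie.cleaner.policy.failed.writes" -> "LAZY",
  "hoodie.write.lock.provider" ->
    "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider",
  "hoodie.write.lock.zookeeper.url"       -> "zk-host1,zk-host2,zk-host3",
  "hoodie.write.lock.zookeeper.port"      -> "2181",
  "hoodie.write.lock.zookeeper.base_path" -> "/hudi/locks",
  "hoodie.write.lock.zookeeper.lock_key"  -> "hudi40"
)
// Both writers would pass these, e.g. df.write.format("hudi").options(multiWriterLockOpts)...
```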

Qiuzhuang commented 6 months ago

Hi @danny0405 & @nsivabalan, even when I configure the HMS lock provider for multiple writers, we still run into issues: when we query the table, the MDT is corrupted.

d1f683ef-8927-4df8-9e12-b769b5980b46-0_358-1002-89404_20240410185726547.parquet: No such file or directory!

ad1happy2go commented 6 months ago

@Qiuzhuang Can you provide the lock configurations you set? Did you set hoodie.cleaner.policy.failed.writes=LAZY?

Qiuzhuang commented 6 months ago

> @Qiuzhuang Can you provide the lock configurations you set? Did you set hoodie.cleaner.policy.failed.writes=LAZY?

Sure, here is the configuration for offline clustering:

```
hoodie.clustering.async.enabled=true
hoodie.clustering.async.max.commits=4
hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824
hoodie.clustering.plan.strategy.small.file.limit=419430400
hoodie.clustering.plan.strategy.max.num.groups=400
hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy
hoodie.clustering.plan.strategy.sort.columns=xx1,xx2
hoodie.layout.optimize.strategy=z-order
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.write.lock.provider=org.apache.hudi.hive.transaction.lock.HiveMetastoreBasedLockProvider
hoodie.write.lock.hivemetastore.database=xxx
hoodie.write.lock.hivemetastore.table=locker_xxx
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.write.concurrency.early.conflict.detection.enable=true
```

We are also looking into this issue with our cloud vendor, FYI.

xicm commented 5 months ago

https://hudi.apache.org/docs/metadata#deployment-model-b-single-writer-with-async-table-services

If we enable async table services with the MDT enabled, we should configure a lock provider.
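Concretely, and only as a sketch of the settings that linked page is about, the lock-related options for a single process running ingestion plus async table services; Hudi ships an in-process provider for exactly this case, and the linked docs list the full property set expected by each release.

```scala
// Sketch: single writer with async table services and the metadata table enabled.
// The in-process lock provider is enough when everything runs in one process;
// lazy failed-write cleaning is the policy ad1happy2go asks about above.
val singleWriterAsyncServiceOpts = Map(
  "hoodie.cleaner.policy.failed.writes" -> "LAZY",
  "hoodie.write.lock.provider" ->
    "org.apache.hudi.client.transaction.lock.InProcessLockProvider"
)
```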

xicm commented 5 months ago

Maybe we should set the default value of hoodie.datasource.compaction.async.enable to false, or make the metadata table non-blocking. It's confusing to users that a single writer needs a lock by default. @danny0405