apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Hudi insert job failed due to multiple files belonging to the same bucket id #11527

Open dongtingting opened 1 week ago

dongtingting commented 1 week ago

Describe the problem you faced

I have a job that inserts into a new partition. Job attempt 1 failed because of a shuffle fetch failure (an internal environment problem). I reran the job (job attempt 2), but it failed with the following exception:

com.kuaishou.dataarch.sql.core.exception.DataQueryException: org.apache.hive.service.cli.HiveSQLException: Error while processing statement: FAILED: Execution Error, return code 51001 from org.apache.hadoop.hive.ql.smarter.proxy.SparkProxyClientTask
org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20240620181905373
    at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:74)
    at org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitActionExecutor.execute(SparkUpsertDeltaCommitActionExecutor.java:45)
    at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:98)
    at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:88)
    at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:142)
    at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:219)
    at org.apache.hudi.HoodieSparkSqlWriter$.writeInternal(HoodieSparkSqlWriter.scala:431)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:133)
    at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.run(InsertIntoHoodieTableCommand.scala:131)
    at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand.run(InsertIntoHoodieTableCommand.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:71)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:69)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:80)
    at org.apache.spark.sql.execution.adaptive.QueryStage.executeCollect(QueryStage.scala:283)
    at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
    at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
    at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:194)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79)
    at org.apache.spark.sql.SparkSession$$anonfun$8.apply(SparkSession.scala:652)
    at org.apache.spark.sql.SparkSession$$anonfun$8.apply(SparkSession.scala:651)
    at org.apache.spark.KwaiDriverMetricsCollector$.countTime(KwaiDriverMetricsCollector.scala:143)
    at org.apache.spark.KwaiDriverMetricsCollector$.countSqlExecuteTime(KwaiDriverMetricsCollector.scala:104)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 2.0 failed 5 times, most recent failure: Lost task 5.4 in stage 2.0 (TID 4656, xxx, executor 1051): java.io.IOException: java.lang.RuntimeException: org.apache.hudi.exception.HoodieIOException: Find multiple files at partition path=dt=20240613 belongs to the same bucket id = 735
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:100)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:124)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:495)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1388)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:959)
    at java.base/java.lang.Thread.run(Thread.java:959)
Caused by: java.lang.RuntimeException: org.apache.hudi.exception.HoodieIOException: Find multiple files at partition path=dt=20240613 belongs to the same bucket id = 735
    at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:121)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at org.apache.spark.shuffle.stream.StreamShuffleWriter.write(StreamShuffleWriter.java:141)
    ... 10 more
Caused by: org.apache.hudi.exception.HoodieIOException: Find multiple files at partition path=dt=20240613 belongs to the same bucket id = 735
    at org.apache.hudi.index.bucket.HoodieSimpleBucketIndex.lambda$loadBucketIdToFileIdMappingForPartition$0(HoodieSimpleBucketIndex.java:65)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at org.apache.hudi.index.bucket.HoodieSimpleBucketIndex.loadBucketIdToFileIdMappingForPartition(HoodieSimpleBucketIndex.java:56)
    at org.apache.hudi.index.bucket.HoodieSimpleBucketIndex$1.computeNext(HoodieSimpleBucketIndex.java:96)
    at org.apache.hudi.index.bucket.HoodieSimpleBucketIndex$1.computeNext(HoodieSimpleBucketIndex.java:89)
    at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:119)
    ... 14 more 

Job env and parameters:

  • hoodie.cleaner.policy.failed.writes=LAZY
  • hoodie.write.concurrency.mode=OPTIMISTIC_CONCURRENCY_CONTROL
  • hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider
  • hoodie.write.lock.client.num_retries=1500
  • hoodie.write.lock.filesystem.expire=10

I checked and found that job attempt 1 left multiple files with different fileIdPrefix values for bucket 735 and for other buckets, like this:

image

I analyzed this problem and tried to answer the following questions:

  1. Why did job attempt 1 generate multiple file IDs for the same bucket? Because of task retries and Spark speculative tasks, multiple tasks may try to write the file for one bucket. Hudi generates the bucket file ID in the task (SparkBucketIndexPartitioner.getBucketInfo), not in the driver, so multiple writer tasks for the same bucket generate different UUIDs, and the bucket ends up with multiple file IDs.

  2. Why did job attempt 2 find multiple files and throw an exception? When job attempt 2 enters tagLocation, it runs HoodieSimpleBucketIndex.loadBucketIdToFileIdMappingForPartition, which eventually calls AbstractTableFileSystemView.getLatestFileSlicesBeforeOrOn. getLatestFileSlicesBeforeOrOn only scans the files in the partition and builds file slices; it does not filter out files from failed commits. So in this case job attempt 2 finds the leftover files of the failed job attempt 1, and because job attempt 1 left multiple file IDs for the same bucket, the exception is thrown.

(HoodieSimpleBucketIndex.loadBucketIdToFileIdMappingForPartition -> HoodieIndexUtils.getLatestFileSlicesForPartition -> AbstractTableFileSystemView.getLatestFileSlicesBeforeOrOn)
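To make the two answers above concrete, here is a minimal, self-contained sketch of the failure mode. It is not Hudi code: the class and method names are hypothetical, and the file ID layout (an 8-digit zero-padded bucket ID followed by the tail of a random UUID) is an assumption used only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/** Hypothetical stand-in, not Hudi code: reproduces the failure mode described above. */
final class BucketCollisionSketch {

    /** Assumed layout: 8-digit zero-padded bucket id, then the tail of a random UUID. */
    static String newFileIdForBucket(int bucketId) {
        String uuid = UUID.randomUUID().toString();
        return String.format("%08d", bucketId) + uuid.substring(8);
    }

    /** Parses the bucket id back out of the first 8 characters of a file id. */
    static int bucketIdOf(String fileId) {
        return Integer.parseInt(fileId.substring(0, 8));
    }

    /** Builds bucket id -> file id and fails if two different file ids share a bucket. */
    static Map<Integer, String> loadBucketIdToFileId(List<String> fileIdsInPartition) {
        Map<Integer, String> mapping = new HashMap<>();
        for (String fileId : fileIdsInPartition) {
            int bucketId = bucketIdOf(fileId);
            String previous = mapping.putIfAbsent(bucketId, fileId);
            if (previous != null && !previous.equals(fileId)) {
                throw new IllegalStateException(
                    "Multiple files belong to the same bucket id = " + bucketId);
            }
        }
        return mapping;
    }

    /** Simulates two task attempts writing bucket 735, then a later job loading the mapping. */
    static boolean twoAttemptsCollide() {
        List<String> leftovers = new ArrayList<>();
        leftovers.add(newFileIdForBucket(735)); // original task attempt
        leftovers.add(newFileIdForBucket(735)); // retried or speculative attempt
        try {
            loadBucketIdToFileId(leftovers);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }
}
```

Each call to newFileIdForBucket draws a fresh UUID, so two attempts for the same bucket produce two different file IDs, and any loader that expects one file ID per bucket then fails.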

I think we could improve these two points:

  1. Generate the bucket file ID in the driver. Writer tasks would get the file ID from the driver, which avoids multiple tasks generating different file IDs.
  2. Make HoodieSimpleBucketIndex.loadBucketIdToFileIdMappingForPartition read only the file slices from completed commits. The files from job attempt 1 will be rolled back later.
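The two proposals can be sketched roughly as follows. This is only an illustration under stated assumptions, not a patch: BucketFixSketch, Slice, assignFileIdsOnce, and loadCompletedOnly are hypothetical names, the file ID layout (8-digit bucket ID prefix plus a UUID tail) is assumed, and the set of completed instants is passed in directly rather than read from a real timeline.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

/** Hypothetical sketch of the two proposals; none of these names are Hudi APIs. */
final class BucketFixSketch {

    /** Minimal stand-in for a file slice: its file id and the instant that wrote it. */
    static final class Slice {
        final String fileId;
        final String instantTime;

        Slice(String fileId, String instantTime) {
            this.fileId = fileId;
            this.instantTime = instantTime;
        }
    }

    /** Proposal 1: assign one file id per bucket up front, then share the map with tasks. */
    static Map<Integer, String> assignFileIdsOnce(int numBuckets) {
        Map<Integer, String> fileIds = new HashMap<>();
        for (int bucketId = 0; bucketId < numBuckets; bucketId++) {
            // Assumed layout: 8-digit bucket id prefix plus the tail of a random UUID.
            String tail = UUID.randomUUID().toString().substring(8);
            fileIds.put(bucketId, String.format("%08d", bucketId) + tail);
        }
        return fileIds;
    }

    /** Proposal 2: ignore slices whose instant is not in the completed set. */
    static Map<Integer, String> loadCompletedOnly(List<Slice> slices,
                                                  Set<String> completedInstants) {
        Map<Integer, String> mapping = new HashMap<>();
        for (Slice slice : slices) {
            if (!completedInstants.contains(slice.instantTime)) {
                continue; // leftover from a failed or in-flight write
            }
            int bucketId = Integer.parseInt(slice.fileId.substring(0, 8));
            String previous = mapping.putIfAbsent(bucketId, slice.fileId);
            if (previous != null && !previous.equals(slice.fileId)) {
                throw new IllegalStateException(
                    "Multiple completed files belong to the same bucket id = " + bucketId);
            }
        }
        return mapping;
    }
}
```

With the first method, every attempt for a bucket reuses the same pre-assigned file ID, at the cost of holding one map entry per bucket. With the second, files left behind by a failed attempt are skipped when the mapping is built, so they no longer trigger the duplicate check.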

Could anyone help check whether the optimizations above are reasonable?

danny0405 commented 1 week ago

@beyond1920 any insights here?

beyond1920 commented 1 week ago

@danny0405 @dongtingting Good point. I think your analysis is reasonable. Generating the file ID in the driver could avoid different file group IDs for the same bucket ID, but it might cost too much memory in some cases.