apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Multi-writer insert overwrite of different partitions: conflict resolution strategy throws StringIndexOutOfBoundsException #11516

Open dongtingting opened 5 days ago

dongtingting commented 5 days ago

Describe the problem you faced

I have 5 writer jobs doing insert overwrite on different partitions (with no partition or bucket overlap between them). Eventually, one writer job failed with this exception:

```
24/06/25 12:28:28 Driver ERROR SparkSQLDriver: Failed in [ insert overwrite table defaultdb.test_table partition(p_date, p_product) xxx ]
java.lang.StringIndexOutOfBoundsException: begin 0, end 8, length 0
    at java.base/java.lang.String.checkBoundsBeginEnd(String.java:3319)
    at java.base/java.lang.String.substring(String.java:1874)
    at org.apache.hudi.index.bucket.BucketIdentifier.bucketIdStrFromFileId(BucketIdentifier.java:86)
    at org.apache.hudi.index.bucket.BucketIdentifier.bucketIdFromFileId(BucketIdentifier.java:82)
    at org.apache.hudi.client.transaction.BucketIndexConcurrentFileWritesConflictResolutionStrategy.lambda$hasConflict$1(BucketIndexConcurrentFileWritesConflictResolutionStrategy.java:51)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1621)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
    at org.apache.hudi.client.transaction.BucketIndexConcurrentFileWritesConflictResolutionStrategy.hasConflict(BucketIndexConcurrentFileWritesConflictResolutionStrategy.java:52)
    at org.apache.hudi.client.utils.TransactionUtils.lambda$resolveWriteConflictIfAny$0(TransactionUtils.java:88)
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
    at java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:735)
    at java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:734)
    at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
    at org.apache.hudi.client.utils.TransactionUtils.resolveWriteConflictIfAny(TransactionUtils.java:85)
    at org.apache.hudi.client.BaseHoodieClient.resolveWriteConflict(BaseHoodieClient.java:202)
    at org.apache.hudi.client.BaseHoodieWriteClient.preCommit(BaseHoodieWriteClient.java:346)
    at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:232)
    at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:104)
    at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:1137)
    at org.apache.hudi.HoodieSparkSqlWriter$.writeInternal(HoodieSparkSqlWriter.scala:439)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:133)
    at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.run(InsertIntoHoodieTableCommand.scala:131)
    at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand.run(InsertIntoHoodieTableCommand.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:71)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:69)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:80)
    at org.apache.spark.sql.execution.adaptive.QueryStage.executeCollect(QueryStage.scala:283)
    at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
    at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
    at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:194)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79)
    at org.apache.spark.sql.SparkSession$$anonfun$8.apply(SparkSession.scala:652)
    at org.apache.spark.sql.SparkSession$$anonfun$8.apply(SparkSession.scala:651)
    at org.apache.spark.KwaiDriverMetricsCollector$.countTime(KwaiDriverMetricsCollector.scala:143)
    at org.apache.spark.KwaiDriverMetricsCollector$.countSqlExecuteTime(KwaiDriverMetricsCollector.scala:104)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:651)
    at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:694)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:117)
    at org.apache.spark.sql.hive.SparkSQLCLIDriver2.processCmd(SparkSQLCLIDriver2.scala:501)
    at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:403)
    at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:336)
    at org.apache.hadoop.hive.cli.CliDriver.processReader(CliDriver.java:474)
    at org.apache.spark.sql.hive.SparkSQLCLIDriver2.processFile(SparkSQLCLIDriver2.scala:450)
    at org.apache.spark.sql.hive.SparkSQLCLIDriver2$.main(SparkSQLCLIDriver2.scala:293)
    at org.apache.spark.sql.hive.SparkSQLCLIDriver2.main(SparkSQLCLIDriver2.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:882)
```
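
The top frames point at the bucket-id parsing in BucketIdentifier: it assumes every fileId starts with an 8-character bucket prefix and substrings it unconditionally. A minimal sketch of that path, reconstructed from the trace above (paraphrased for illustration, not the verbatim Hudi source):

```java
// Sketch of the failing path, reconstructed from the stack trace above;
// paraphrased for illustration, not the verbatim Hudi source.
public class BucketIdentifierSketch {

  // Bucket-index file IDs normally begin with an 8-digit bucket prefix,
  // e.g. "00000003-<uuid suffix>".
  static String bucketIdStrFromFileId(String fileId) {
    // With an empty fileId this throws
    // StringIndexOutOfBoundsException: begin 0, end 8, length 0
    return fileId.substring(0, 8);
  }

  static int bucketIdFromFileId(String fileId) {
    return Integer.parseInt(bucketIdStrFromFileId(fileId));
  }

  public static void main(String[] args) {
    bucketIdFromFileId(""); // reproduces the exact exception message above
  }
}
```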

Environment Description

I added a try/catch around the hasConflict function in BucketIndexConcurrentFileWritesConflictResolutionStrategy.java myself and printed a log of the conflicting operations. The log content is below.

24/06/25 12:28:28 Driver ERROR BucketIndexConcurrentFileWritesConflictResolutionStrategy: hasConflict got exception, between first operation = {actionType=**replacecommit**, instantTime=20240625120033401, actionState=**INFLIGHT**'}, second operation = {actionType=**replacecommit**, instantTime=20240625120040733, actionState=**INFLIGHT**'},java.lang.StringIndexOutOfBoundsException: begin 0, end 8, length 0

I found that the conflicting operations are two inflight replacecommit instants. I also checked the metadata content of these two inflight replacecommit instants: the fileId is an empty string, which is what caused substring to throw the StringIndexOutOfBoundsException.

(screenshot: "replacement-inflight" metadata file contents)
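
For context on how an empty fileId reaches that substring: judging by the frames at lines 51-52 of BucketIndexConcurrentFileWritesConflictResolutionStrategy, the conflict check streams the fileIds mutated by each operation and maps every one of them to a bucket id. A rough, self-contained sketch (class and accessor names are stand-ins for Hudi's internals, not its actual API):

```java
import java.util.Set;
import java.util.stream.Collectors;

// Rough shape of the bucket-index conflict check, inferred from the stack
// trace; names are stand-ins for Hudi's internals, not its actual API.
class ConflictCheckSketch {

  // Same 8-character-prefix parse as BucketIdentifier.bucketIdFromFileId.
  static int bucketIdFromFileId(String fileId) {
    return Integer.parseInt(fileId.substring(0, 8));
  }

  static boolean hasConflict(Set<String> thisFileIds, Set<String> otherFileIds) {
    // Every fileId touched by the other (here: inflight replacecommit)
    // operation is mapped to a bucket id; an empty fileId fails right here.
    Set<Integer> otherBucketIds = otherFileIds.stream()
        .map(ConflictCheckSketch::bucketIdFromFileId)
        .collect(Collectors.toSet());
    // Two writers conflict if they touched any bucket in common.
    return thisFileIds.stream()
        .map(ConflictCheckSketch::bucketIdFromFileId)
        .anyMatch(otherBucketIds::contains);
  }
}
```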

Is this a known bug? I think it's best not to throw StringIndexOutOfBoundsException here. Could we catch the exception and rethrow an easier-to-understand exception?
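
For example, the parse could validate the fileId up front and rethrow something self-describing. A hypothetical sketch of that idea (HoodieException is Hudi's generic runtime exception; the method below is made up for illustration, not an actual patch):

```java
import org.apache.hudi.exception.HoodieException;

// Hypothetical defensive variant of the bucket-id parse, sketched only to
// show the kind of error message asked for above; not an actual Hudi patch.
class SafeBucketIdSketch {
  static int bucketIdFromFileId(String fileId) {
    if (fileId == null || fileId.length() < 8) {
      throw new HoodieException("Cannot parse bucket id from fileId '" + fileId
          + "': expected an 8-character bucket prefix. This can happen when an"
          + " inflight replacecommit's metadata contains an empty fileId.");
    }
    return Integer.parseInt(fileId.substring(0, 8));
  }
}
```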

danny0405 commented 4 days ago

Did you try 0.14.1? It might already be fixed there.