apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] java.lang.IllegalStateException: Duplicate key Option{val=org.apache.hudi.common.HoodiePendingRollbackInfo #6403

Closed: Armelabdelkbir closed this issue 1 year ago

Armelabdelkbir commented 2 years ago

Hello community,

I'm using Hudi for change data capture with Spark Structured Streaming + Kafka + Debezium. My jobs work well, but for the past few days I've been getting errors on some of my streams: java.lang.IllegalStateException: Duplicate key Option{val=org.apache.hudi.common.HoodiePendingRollbackInfo@2424b11}

I hit this issue randomly when my streams have been running for a long time (a few days/weeks), so it's difficult to reproduce. Each Hudi table corresponds to a Kafka topic; to bypass the problem I delete the HDFS path and the checkpoints for the affected table and restart my job.

Environment Description

Additional context

I don't have multiple writers at a time, just one Spark Structured Streaming job per stream (database); the write path looks roughly like the sketch below.
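For context, a minimal sketch of what such a foreachBatch write path typically looks like. The topic, table, path, and field names here are illustrative assumptions, not the exact job from Ingest.scala; the table type is MERGE_ON_READ, matching HoodieSparkMergeOnReadTable in the later stacktrace.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger

object IngestSketch extends App {
  val spark = SparkSession.builder().appName("hudi-cdc-ingest").getOrCreate()

  // Assumed Debezium topic; payload parsing/flattening is omitted, and
  // the record key / precombine fields below are placeholders.
  val events = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "dbserver.inventory.customers")
    .load()

  // One Hudi table per topic. startCommitWithTime (where the
  // Duplicate key exception surfaces) runs inside this save().
  def writeBatch(batch: DataFrame, batchId: Long): Unit =
    batch.write
      .format("hudi")
      .option("hoodie.table.name", "customers")
      .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
      .option("hoodie.datasource.write.recordkey.field", "id")
      .option("hoodie.datasource.write.precombine.field", "ts_ms")
      .option("hoodie.datasource.write.operation", "upsert")
      .mode("append")
      .save("hdfs:///dev/raw/customers")

  events.writeStream
    .foreachBatch(writeBatch _)
    .option("checkpointLocation", "hdfs:///checkpoints/customers")
    .trigger(Trigger.ProcessingTime("60 seconds"))
    .start()
    .awaitTermination()
}
```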

Stacktrace


    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Caused by: java.lang.IllegalStateException: Duplicate key Option{val=org.apache.hudi.common.HoodiePendingRollbackInfo@2424b11}
    at java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
    at java.util.HashMap.merge(HashMap.java:1253)
    at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
    at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at org.apache.hudi.client.AbstractHoodieWriteClient.getPendingRollbackInfos(AbstractHoodieWriteClient.java:910)
    at org.apache.hudi.client.AbstractHoodieWriteClient.rollbackFailedWrites(AbstractHoodieWriteClient.java:927)
    at org.apache.hudi.client.AbstractHoodieWriteClient.rollbackFailedWrites(AbstractHoodieWriteClient.java:917)
    at org.apache.hudi.client.AbstractHoodieWriteClient.lambda$startCommitWithTime$97cdbdca$1(AbstractHoodieWriteClient.java:810)
    at org.apache.hudi.common.util.CleanerUtils.rollbackFailedWrites(CleanerUtils.java:143)
    at org.apache.hudi.client.AbstractHoodieWriteClient.startCommitWithTime(AbstractHoodieWriteClient.java:809)
    at org.apache.hudi.client.AbstractHoodieWriteClient.startCommitWithTime(AbstractHoodieWriteClient.java:802)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:274)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:164)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
    at com.ovh.ingest.Ingest$.test$1(Ingest.scala:87)
    at com.ovh.ingest.Ingest$.$anonfun$new$5(Ingest.scala:64)
    at com.ovh.ingest.Ingest$.$anonfun$new$5$adapted(Ingest.scala:64)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:586)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:584)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:584)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:226)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:334)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
    ... 1 more
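For reference, the exception at the top of this trace is the stock behavior of java.util.stream.Collectors.toMap: without a merge function it throws on the first key collision, and on Java 8 the message reports the colliding value, which is why it shows the HoodiePendingRollbackInfo Option rather than the duplicated instant time. A standalone sketch of the same failure (not Hudi code):

```scala
import java.util.function.{Function => JFunction}
import java.util.stream.{Collectors, Stream}

object DuplicateKeyRepro extends App {
  // Two entries resolving to the same key, as two pending rollbacks
  // for the same commit instant would in getPendingRollbackInfos.
  val key: JFunction[String, String]   = s => s.split("=")(0)
  val value: JFunction[String, String] = s => s.split("=")(1)

  Stream.of("20220816130309001=rollback-a", "20220816130309001=rollback-b")
    .collect(Collectors.toMap(key, value))
  // => java.lang.IllegalStateException: Duplicate key rollback-a (on Java 8)
}
```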
nsivabalan commented 2 years ago

We have made some fixes around this: https://github.com/apache/hudi/pull/4123 and https://github.com/apache/hudi/pull/4971. If you upgrade to 0.11, you should not see the above issue.

Armelabdelkbir commented 2 years ago

Thanks for replying. I tried to upgrade to 0.11 and got this error:


    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:356)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.adapter.Spark3_1Adapter
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.hudi.SparkAdapterSupport.sparkAdapter(SparkAdapterSupport.scala:37)
    at org.apache.hudi.SparkAdapterSupport.sparkAdapter$(SparkAdapterSupport.scala:29)
    at org.apache.hudi.HoodieSparkUtils$.sparkAdapter$lzycompute(HoodieSparkUtils.scala:46)
    at org.apache.hudi.HoodieSparkUtils$.sparkAdapter(HoodieSparkUtils.scala:46)
    at org.apache.hudi.AvroConversionUtils$.convertStructTypeToAvroSchema(AvroConversionUtils.scala:150)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:241)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:163)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
    at com.ovh.ingest.Ingest$.test$1(Ingest.scala:87)
    at com.ovh.ingest.Ingest$.$anonfun$new$5(Ingest.scala:64)
    at com.ovh.ingest.Ingest$.$anonfun$new$5$adapted(Ingest.scala:64)

Any ideas? Thanks.

Armelabdelkbir commented 2 years ago

I bypassed the issue above (the bundle didn't match my Spark 3.1 runtime, hence the missing Spark3_1Adapter) by changing my build.sbt file from

libraryDependencies += "org.apache.hudi" %% "hudi-spark3-bundle" % "0.11.0"

to

libraryDependencies += "org.apache.hudi" %% "hudi-spark3.1-bundle" % "0.11.0"

but I got another error, related to the upsert commit.

Do I need to clean all the paths after upgrading? I think the issue is related to Spark more than to Hudi. I tried adding

--conf 'spark.sql.legacy.avro.datetimeRebaseModeInWrite=CORRECTED' \

and it looks fine; I need to wait a few days/weeks to confirm ;)

Stacktrace

 at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:356)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20220816130309001
        at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:64)
        at org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitActionExecutor.execute(SparkUpsertDeltaCommitActionExecutor.java:46)
        at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:89)
        at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:76)
        at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:157)
        at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:213)
        at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:304)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:163)
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
          at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:584)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:226)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
        at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:334)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
        ... 1 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 307.0 failed 4 times, most recent failure: Lost task 0.3 in stage 307.0 (TID 1563) (ovh-cnode18.26f5de01-5e40-4d8a-98bd-a4353b7bf5e3.datalake.ovh executor 5): org.apache.hudi.exception.HoodieMetadataException: Failed to retrieve files in partition /dev/raw/foundry_eu.db/stockV2_assemblingProjectGroupLink from metadata
        at org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:137)
        at org.apache.hudi.metadata.HoodieMetadataFileSystemView.listPartition(HoodieMetadataFileSystemView.java:65)
        at org.apache.hudi.common.table.view.AbstractTableFileSystemView.lambda$ensurePartitionLoadedCorrectly$9(AbstractTableFileSystemView.java:304)
        at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
        at org.apache.hudi.common.table.view.AbstractTableFileSystemView.ensurePartitionLoadedCorrectly(AbstractTableFileSystemView.java:295)
        at org.apache.hudi.common.table.view.AbstractTableFileSystemView.getLatestBaseFilesBeforeOrOn(AbstractTableFileSystemView.java:502)
        at org.apache.hudi.common.table.view.PriorityBasedFileSystemView.execute(PriorityBasedFileSystemView.java:103)
        at org.apache.hudi.common.table.view.PriorityBasedFileSystemView.getLatestBaseFilesBeforeOrOn(PriorityBasedFileSystemView.java:144)
          .........
          Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 175.0 failed 4 times, most recent failure: Lost task 3.3 in stage 175.0 (TID 1371) (ovh-cnode18.26f5de01-5e40-4d8a-98bd-a4353b7bf5e3.datalake.ovh executor 10): org.apache.spark.SparkUpgradeException: You may get a different result due to the upgrading of Spark 3.0: writing dates before 1582-10-15 or timestamps before 1900-01-01T00:00:00Z into Avro files can be dangerous, as the files may be read by Spark 2.x or legacy versions of Hive later, which uses a legacy hybrid calendar that is different from Spark 3.0+'s Proleptic Gregorian calendar. See more details in SPARK-31404. You can set spark.sql.legacy.avro.datetimeRebaseModeInWrite to 'LEGACY' to rebase the datetime values w.r.t. the calendar difference during writing, to get maximum interoperability. Or set spark.sql.legacy.avro.datetimeRebaseModeInWrite to 'CORRECTED' to write the datetime values as it is, if you are 100% sure that the written files will only be read by Spark 3.0+ or other systems that use Proleptic Gregorian calendar.
        at org.apache.spark.sql.execution.datasources.DataSourceUtils$.newRebaseExceptionInWrite(DataSourceUtils.scala:165)
        at org.apache.hudi.org.apache.spark.sql.avro.AvroSerializer$.$anonfun$createDateRebaseFuncInWrite$1(AvroSerializer.scala:372)
        at org.apache.hudi.org.apache.spark.sql.avro.AvroSerializer.$anonfun$newConverter$15(AvroSerializer.scala:160)
        at org.apache.hudi.org.apache.spark.sql.avro.AvroSerializer.$anonfun$newConverter$15$adapted(AvroSerializer.scala:160)
        at org.apache.hudi.org.apache.spark.sql.avro.AvroSerializer.$anonfun$newStructConverter$2(AvroSerializer.scala:258)
        at org.apache.hudi.org.apache.spark.sql.avro.AvroSerializer.serialize(AvroSerializer.scala:61)
        at org.apache.hudi.org.apache.spark.sql.avro.HoodieSpark3_1AvroSerializer.serialize(HoodieSpark3_1AvroSerializer.scala:28)
        at org.apache.hudi.AvroConversionUtils$.$anonfun$createInternalRowToAvroConverter$1(AvroConversionUtils.scala:83)
nsivabalan commented 2 years ago

I see you have some very old dates.

org.apache.spark.SparkUpgradeException: You may get a different result due to the upgrading of Spark 3.0: writing dates before 1582-10-15 or timestamps before 1900-01-01T00:00:00Z into Avro files can be dangerous, as the files may be read by Spark 2.x or legacy versions of Hive later, which uses a legacy hybrid calendar that is different from Spark 3.0+'s Proleptic Gregorian calendar. See more details in SPARK-31404. You can set spark.sql.legacy.avro.datetimeRebaseModeInWrite to 'LEGACY' to rebase the datetime values w.r.t. the calendar difference during writing, to get maximum interoperability. Or set spark.sql.legacy.avro.datetimeRebaseModeInWrite to 'CORRECTED' to write the datetime values as it is, if you are 100% sure that the written files will only be read by Spark 3.0+ or other systems that use Proleptic Gregorian calendar.
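A quick way to confirm whether an incoming micro-batch actually carries such values. This helper is a hypothetical sketch, and dateCol stands for whichever date/timestamp column the Debezium payload contains:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical check: count rows whose date predates the Gregorian
// cutover (1582-10-15), the threshold named in the SparkUpgradeException.
def countPreGregorian(batch: DataFrame, dateCol: String): Long =
  batch.filter(col(dateCol) < "1582-10-15").count()
```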
nsivabalan commented 1 year ago

We recently fixed the handling of very old dates: https://github.com/apache/hudi/pull/6352. You may need to set this config

--conf spark.sql.avro.datetimeRebaseModeInWrite=LEGACY

to get it working.
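If you would rather set it programmatically than via spark-submit, a minimal sketch of the equivalent session-level setting follows. Note the key spelling: Spark 3.2+ uses the un-prefixed name, while Spark 3.0/3.1 uses the spark.sql.legacy.avro.* form seen earlier in this thread.

```scala
import org.apache.spark.sql.SparkSession

// Session-level equivalent of passing --conf on spark-submit.
// On Spark 3.0/3.1, use "spark.sql.legacy.avro.datetimeRebaseModeInWrite".
val spark = SparkSession.builder()
  .appName("hudi-cdc-ingest")
  .config("spark.sql.avro.datetimeRebaseModeInWrite", "LEGACY")
  .getOrCreate()
```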

nsivabalan commented 1 year ago

@Armelabdelkbir: let us know if the above resolution fixes your issue. Feel free to close out the issue if you don't see the problem anymore.

Armelabdelkbir commented 1 year ago

> @Armelabdelkbir: let us know if the above resolution fixes your issue. Feel free to close out the issue if you don't see the problem anymore.

The issue is fixed by setting --conf 'spark.sql.legacy.avro.datetimeRebaseModeInWrite=CORRECTED'. Many thanks!

nsivabalan commented 1 year ago

Cool, thanks for the update!