apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Using RLI index is throwing Failed to upsert for commit time and Error occurs when executing map #11560

Closed gentrit1 closed 2 months ago

gentrit1 commented 3 months ago

Dataproc environment:

- Image version: 2.2.22-debian12
- Python version: 3.11.8
- Spark version: 3.5.0
- Hudi Spark bundle: hudi-spark3.5-bundle_2.12-0.15.0.jar

Problem:

Writing data in Apache Hudi format fails when the RLI (record-level index) is enabled.

Writing a batch for the first time works (but we get the WARN message `SparkMetadataTableRecordIndex: Record index not initialized so falling back to GLOBAL_SIMPLE for tagging records`).

If we try to append more data to the same existing table, it fails with the errors `Failed to upsert for commit time 20240703115211530` and `Caused by: org.apache.hudi.exception.HoodieException: Error occurs when executing map`, very similar to https://github.com/apache/hudi/issues/10609.

If we disable the RLI index, appending data works fine!

Hudi table options that we are using:

```python
hudi_options = {
    "hoodie.table.name": table_name,
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.datasource.write.recordkey.field": "key",
    "hoodie.datasource.write.partitionpath.field": "partitionpath.field",
    "hoodie.datasource.write.precombine.field": "precombine.field",
    "hoodie.datasource.write.table.name": table_name,
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.combine.before.insert": "true",
    "hoodie.cleaner.commits.retained": "3",
    "hoodie.compact.inline.max.delta.commits": "2",
    "hoodie.enable.data.skipping": "true",
    "hoodie.metadata.enable": "true",
    "hoodie.metadata.index.column.stats.enable": "true",
    "hoodie.metadata.record.index.enable": "true",
    "hoodie.index.type": "RECORD_INDEX",
    "hoodie.clustering.inline": "true",
    "hoodie.clustering.inline.max.commits": "1",
    "hoodie.clustering.plan.strategy.class": "org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy",
    "hoodie.clustering.plan.strategy.target.file.max.bytes": "40000000",
    "hoodie.clustering.plan.strategy.sort.columns": "sort.columns",
}
```
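(For reference, a minimal sketch of how these options would be applied on write; the DataFrame `df` and the target `base_path` are placeholders, not taken from this report.)

```python
# Hedged sketch: df and base_path are placeholders.
(
    df.write.format("hudi")
    .options(**hudi_options)
    .mode("append")
    .save(base_path)
)
```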

Some more logs: `Py4JJavaError: An error occurred while calling o208.save. : org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20240703115211530 at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:64) at org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitActionExecutor.execute(SparkUpsertDeltaCommitActionExecutor.java:45) at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:98) at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:88) at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:157) at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:225) at org.apache.hudi.HoodieSparkSqlWriterInternal.liftedTree1$1(HoodieSparkSqlWriter.scala:492) at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:490) at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:187) at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:125) at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:168) at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73) at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:473) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:473) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:449) at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98) at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85) at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83) at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388) at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182) at py4j.ClientServerConnection.run(ClientServerConnection.java:106) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 199.0 failed 4 times, most recent failure: Lost task 0.3 in stage 199.0 (TID 763) (): org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: Error occurs when executing map at jdk.internal.reflect.GeneratedConstructorAccessor73.newInstance(Unknown Source) at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490) at java.base/java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:600) at java.base/java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:678) at java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:737) at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateParallel(ReduceOps.java:919) at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) at org.apache.hudi.common.engine.HoodieLocalEngineContext.map(HoodieLocalEngineContext.java:82) at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeys(HoodieBackedTableMetadata.java:280) at org.apache.hudi.metadata.BaseTableMetadata.readRecordIndex(BaseTableMetadata.java:303) at org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:170) at org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:157) at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsToPair$1(JavaRDDLike.scala:186) at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855) at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) at org.apache.spark.scheduler.Task.run(Task.scala:141) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:95) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: org.apache.hudi.exception.HoodieException: Error occurs when executing map at org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:40) at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655) at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:952) at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:926) at java.base/java.util.stream.AbstractTask.compute(AbstractTask.java:327) at java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:746) at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) Caused by: java.lang.ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class org.apache.avro.specific.SpecificRecordBase (org.apache.avro.generic.GenericData$Record and org.apache.avro.specific.SpecificRecordBase are in unnamed module of loader 'app') at org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeAvroMetadata(TimelineMetadataUtils.java:209) at org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeHoodieRollbackMetadata(TimelineMetadataUtils.java:177) at org.apache.hudi.metadata.HoodieTableMetadataUtil.getRollbackedCommits(HoodieTableMetadataUtil.java:1412) at org.apache.hudi.metadata.HoodieTableMetadataUtil.lambda$getValidInstantTimestamps$36(HoodieTableMetadataUtil.java:1342) at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177) at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655) at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) at 
java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497) at org.apache.hudi.metadata.HoodieTableMetadataUtil.getValidInstantTimestamps(HoodieTableMetadataUtil.java:1342) at org.apache.hudi.metadata.HoodieBackedTableMetadata.getLogRecordScanner(HoodieBackedTableMetadata.java:484) at org.apache.hudi.metadata.HoodieBackedTableMetadata.openReaders(HoodieBackedTableMetadata.java:439) at org.apache.hudi.metadata.HoodieBackedTableMetadata.getOrCreateReaders(HoodieBackedTableMetadata.java:424) at org.apache.hudi.metadata.HoodieBackedTableMetadata.lookupKeysFromFileSlice(HoodieBackedTableMetadata.java:301) at org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getRecordsByKeys$f9381e22$1(HoodieBackedTableMetadata.java:285) at org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:38) ... 13 more

Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2415) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2436) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2455) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2480) at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1046) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:407) at org.apache.spark.rdd.RDD.collect(RDD.scala:1045) at org.apache.spark.rdd.PairRDDFunctions.$anonfun$countByKey$1(PairRDDFunctions.scala:367) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:407) at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:367) at org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:314) at org.apache.hudi.data.HoodieJavaPairRDD.countByKey(HoodieJavaPairRDD.java:105) at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.buildProfile(BaseSparkCommitActionExecutor.java:197) at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:168) at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:85) at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:58) ... 
51 more Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: Error occurs when executing map at jdk.internal.reflect.GeneratedConstructorAccessor73.newInstance(Unknown Source) at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490) at java.base/java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:600) at java.base/java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:678) at java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:737) at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateParallel(ReduceOps.java:919) at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) at org.apache.hudi.common.engine.HoodieLocalEngineContext.map(HoodieLocalEngineContext.java:82) at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeys(HoodieBackedTableMetadata.java:280) at org.apache.hudi.metadata.BaseTableMetadata.readRecordIndex(BaseTableMetadata.java:303) at org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:170) at org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:157) at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsToPair$1(JavaRDDLike.scala:186) at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855) at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) at org.apache.spark.scheduler.Task.run(Task.scala:141) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:95) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ... 
1 more Caused by: org.apache.hudi.exception.HoodieException: Error occurs when executing map at org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:40) at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655) at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:952) at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:926) at java.base/java.util.stream.AbstractTask.compute(AbstractTask.java:327) at java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:746) at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) Caused by: java.lang.ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class org.apache.avro.specific.SpecificRecordBase (org.apache.avro.generic.GenericData$Record and org.apache.avro.specific.SpecificRecordBase are in unnamed module of loader 'app') at org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeAvroMetadata(TimelineMetadataUtils.java:209) at org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeHoodieRollbackMetadata(TimelineMetadataUtils.java:177) at org.apache.hudi.metadata.HoodieTableMetadataUtil.getRollbackedCommits(HoodieTableMetadataUtil.java:1412) at org.apache.hudi.metadata.HoodieTableMetadataUtil.lambda$getValidInstantTimestamps$36(HoodieTableMetadataUtil.java:1342) at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177) at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655) at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497) at org.apache.hudi.metadata.HoodieTableMetadataUtil.getValidInstantTimestamps(HoodieTableMetadataUtil.java:1342) at org.apache.hudi.metadata.HoodieBackedTableMetadata.getLogRecordScanner(HoodieBackedTableMetadata.java:484) at org.apache.hudi.metadata.HoodieBackedTableMetadata.openReaders(HoodieBackedTableMetadata.java:439) at org.apache.hudi.metadata.HoodieBackedTableMetadata.getOrCreateReaders(HoodieBackedTableMetadata.java:424) at org.apache.hudi.metadata.HoodieBackedTableMetadata.lookupKeysFromFileSlice(HoodieBackedTableMetadata.java:301) at org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getRecordsByKeys$f9381e22$1(HoodieBackedTableMetadata.java:285) at 
org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:38) ... 13 more`

soumilshah1995 commented 3 months ago

Hey there, can you please provide a sample dataset using Faker so we can try to reproduce this behavior locally?

gentrit1 commented 3 months ago

> Hey there, can you please provide a sample dataset using Faker so we can try to reproduce this behavior locally?

Is it okay for you to use the fake data from the Parquet file below, with the following table options:

```python
hudi_options = {
    "hoodie.table.name": table_name,
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.datasource.write.recordkey.field": "UniqueNumber",  # key
    "hoodie.datasource.write.partitionpath.field": "Date,Job",
    "hoodie.datasource.write.precombine.field": "Timestamp",
    "hoodie.datasource.write.table.name": table_name,
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.combine.before.insert": "true",
    "hoodie.cleaner.commits.retained": "3",
    "hoodie.compact.inline.max.delta.commits": "2",
    "hoodie.enable.data.skipping": "true",
    "hoodie.metadata.enable": "true",
    "hoodie.metadata.index.column.stats.enable": "true",
    "hoodie.metadata.record.index.enable": "true",
    "hoodie.index.type": "RECORD_INDEX",
    "hoodie.clustering.inline": "true",
    "hoodie.clustering.inline.max.commits": "1",
    "hoodie.clustering.plan.strategy.class": "org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy",
    "hoodie.clustering.plan.strategy.target.file.max.bytes": "40000000",
    "hoodie.clustering.plan.strategy.sort.columns": "UniqueNumber",
}
```

Also find here the Python code I used to generate the Parquet file:

```python
from faker import Faker
import pandas as pd
import uuid

fake = Faker()
num_records = 1000

data = {
    "UniqueNumber": [str(uuid.uuid4()) for _ in range(num_records)],
    "Name": [],
    "Address": [],
    "Email": [],
    "Phone Number": [],
    "Job": [],
    "Company": [],
    "Date": [],
    "Timestamp": [],
}

for _ in range(num_records):
    data["Name"].append(fake.name())
    data["Address"].append(fake.address())
    data["Email"].append(fake.email())
    data["Phone Number"].append(fake.phone_number())
    data["Job"].append(fake.job())
    data["Company"].append(fake.company())
    data["Date"].append(fake.date('2024-07-04'))
    data["Timestamp"].append(fake.unix_time())

df = pd.DataFrame(data)

df.to_parquet("fake_data.parquet", engine="fastparquet", index=False)

print("Dataset generated and saved to fake_data.parquet")
```
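(To reproduce, the generated file would be written twice with the options above; the sketch below is illustrative — the table name, base path, and Spark session setup are placeholders, not taken from this thread.)

```python
# Hedged reproduction sketch: names and paths are placeholders.
table_name = "fake_table"
base_path = "gs://your-bucket/hudi/fake_table"

src = spark.read.parquet("fake_data.parquet")

# First write succeeds (with the GLOBAL_SIMPLE fallback warning in the logs).
src.write.format("hudi").options(**hudi_options).mode("overwrite").save(base_path)

# A second write to the same table is where the upsert fails while RLI is enabled.
src.write.format("hudi").options(**hudi_options).mode("append").save(base_path)
```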

ad1happy2go commented 3 months ago

@gentrit1 This is a known issue with RLI.

Check this comment: https://github.com/apache/hudi/issues/10609#issuecomment-2167548029

gentrit1 commented 3 months ago

> @gentrit1 This is a known issue with RLI.
>
> Check this comment: #10609 (comment)

Hi @ad1happy2go ,

I saw that, but that's not working in our case. The current Spark configs we are using are:

```python
spark = (
    SparkSession.builder.appName("Hudi Basics")
    .config("spark.jars", "gs://path-to-jars/jars/hudi-spark3.5-bundle_2.12-0.15.0.jar")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
    .config("spark.driver.extraClassPath", "gs://path-to-jars/jars/hudi-spark3.5-bundle_2.12-0.15.0.jar")
    .config("spark.executor.extraClassPath", "gs://path-to-jars/jars/hudi-spark3.5-bundle_2.12-0.15.0.jar")
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    .config("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
    .config("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .config("spark.sql.hive.convertMetastoreParquet", "false")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)
spark.conf.set("spark.sql.session.timeZone", "UTC")
```

ad1happy2go commented 3 months ago

@gentrit1 Are you using Dataproc Serverless? If yes, setting the jars this way doesn't broadcast them to the executors properly; it is a known issue on the GCP side. But you can build your own Dataproc image that includes these jars and get it working.
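(Not from this thread, but one hedged alternative to pointing `extraClassPath` at a `gs://` object is to let Spark resolve the bundle from Maven, so it lands on both driver and executor classpaths; the Maven coordinates below are assumed from the bundle version reported above, and the app name is illustrative.)

```python
from pyspark.sql import SparkSession

# Hedged sketch: resolve the Hudi bundle from Maven instead of a gs:// classpath entry.
spark = (
    SparkSession.builder.appName("Hudi Basics")
    .config("spark.jars.packages", "org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
    .getOrCreate()
)
```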

gentrit1 commented 2 months ago

> @gentrit1 Are you using Dataproc Serverless? If yes, setting the jars this way doesn't broadcast them to the executors properly; it is a known issue on the GCP side. But you can build your own Dataproc image that includes these jars and get it working.

We are not using Dataproc Serverless, but since you are sure that adding the configs (spark.driver.extraClassPath and spark.executor.extraClassPath) fixes this problem, we can close the ticket!

Thanks!

ad1happy2go commented 2 months ago

@gentrit1 Please reopen this or create a new issue in case you face this again. Thanks.