apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[BUG] S3 Deltastreamer: Block has already been inflated #6428

Open dyang108 opened 2 years ago

dyang108 commented 2 years ago

Describe the problem you faced

Deltastreamer with write output to S3 exits unexpectedly when running in continuous mode.

To Reproduce

Steps to reproduce the behavior: I ran the following:

/etc/spark/bin/spark-submit --conf -Dconfig.file=/service.conf,spark.executor.extraJavaOptions=-Dlog4j.debug=true --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --jars /etc/spark/work-dir/* /etc/spark/work-dir/hudi-utilities-bundle_2.11-0.12.0.jar --props /mnt/mesos/sandbox/kafka-source.properties --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider --source-class org.apache.hudi.utilities.sources.AvroKafkaSource --target-base-path "s3a://strava.scratch/tmp/derick/hudi" --target-table "aligned_activities" --op "UPSERT" --source-ordering-field "ts" --table-type "COPY_ON_WRITE" --source-limit 100 --continuous

The /etc/spark/work-dir/ directory contains: aws-java-sdk-bundle-1.12.283.jar, hadoop-aws-2.6.5.jar, hudi-utilities-bundle_2.11-0.12.0.jar, scala-library-2.11.12.jar, spark-streaming-kafka-0-10_2.11-2.4.8.jar

Expected behavior

I don't expect there to be issues on compaction here.

Environment Description

I'm reading from an Avro Kafka topic.

Additional context

Add any other context about the problem here.

Reading Avro records from Kafka

hoodie.datasource.write.recordkey.field=activityId
auto.offset.reset=latest

Stacktrace

22/08/17 23:07:26 ERROR HoodieAsyncService: Service shutdown with error
java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20220817230714888
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:190)
    at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:187)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:557)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20220817230714888
    at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:64)
    at org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:45)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:113)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:97)
    at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:155)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:588)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:335)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:687)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 1 times, most recent failure: Lost task 0.0 in stage 7.0 (TID 7, localhost, executor driver): org.apache.hudi.exception.HoodieMetadataException: Failed to retrieve files in partition s3a://strava.scratch/tmp/derick/hudi/__HIVE_DEFAULT_PARTITION__ from metadata
    at org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:137)
    at org.apache.hudi.metadata.HoodieMetadataFileSystemView.listPartition(HoodieMetadataFileSystemView.java:65)
    at org.apache.hudi.common.table.view.AbstractTableFileSystemView.lambda$ensurePartitionLoadedCorrectly$9(AbstractTableFileSystemView.java:305)
    at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
    at org.apache.hudi.common.table.view.AbstractTableFileSystemView.ensurePartitionLoadedCorrectly(AbstractTableFileSystemView.java:296)
    at org.apache.hudi.common.table.view.AbstractTableFileSystemView.getLatestBaseFilesBeforeOrOn(AbstractTableFileSystemView.java:517)
    at org.apache.hudi.common.table.view.PriorityBasedFileSystemView.execute(PriorityBasedFileSystemView.java:103)
    at org.apache.hudi.common.table.view.PriorityBasedFileSystemView.getLatestBaseFilesBeforeOrOn(PriorityBasedFileSystemView.java:144)
    at org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForPartition(HoodieIndexUtils.java:69)
    at org.apache.hudi.index.HoodieIndexUtils.lambda$getLatestBaseFilesForAllPartitions$ff6885d8$1(HoodieIndexUtils.java:89)
    at org.apache.hudi.client.common.HoodieSparkEngineContext.lambda$flatMap$7d470b86$1(HoodieSparkEngineContext.java:137)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:125)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:125)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at scala.collection.AbstractIterator.to(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieException: Exception when reading log file 
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:352)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:192)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:110)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:103)
    at org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader.<init>(HoodieMetadataMergedLogRecordReader.java:63)
    at org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader.<init>(HoodieMetadataMergedLogRecordReader.java:51)
    at org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader$Builder.build(HoodieMetadataMergedLogRecordReader.java:230)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getLogRecordScanner(HoodieBackedTableMetadata.java:508)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getLogRecordScanner(HoodieBackedTableMetadata.java:470)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.openReaders(HoodieBackedTableMetadata.java:416)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getOrCreateReaders$11(HoodieBackedTableMetadata.java:402)
    at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getOrCreateReaders(HoodieBackedTableMetadata.java:402)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getRecordsByKeys$1(HoodieBackedTableMetadata.java:211)
    at java.util.HashMap.forEach(HashMap.java:1290)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeys(HoodieBackedTableMetadata.java:209)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordByKey(HoodieBackedTableMetadata.java:141)
    at org.apache.hudi.metadata.BaseTableMetadata.fetchAllFilesInPartition(BaseTableMetadata.java:312)
    at org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:135)
    ... 37 more
Caused by: java.lang.IllegalStateException: Block has already been inflated
    at org.apache.hudi.common.util.ValidationUtils.checkState(ValidationUtils.java:67)
    at org.apache.hudi.common.table.log.block.HoodieLogBlock.inflate(HoodieLogBlock.java:267)
    at org.apache.hudi.common.table.log.block.HoodieLogBlock.inflate(HoodieLogBlock.java:278)
    at org.apache.hudi.common.table.log.block.HoodieDeleteBlock.getRecordsToDelete(HoodieDeleteBlock.java:83)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:473)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:343)
    ... 55 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
    at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:361)
    at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
    at org.apache.hudi.client.common.HoodieSparkEngineContext.flatMap(HoodieSparkEngineContext.java:137)
    at org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions(HoodieIndexUtils.java:87)
    at org.apache.hudi.index.simple.HoodieSimpleIndex.fetchRecordLocationsForAffectedPartitions(HoodieSimpleIndex.java:144)
    at org.apache.hudi.index.simple.HoodieSimpleIndex.tagLocationInternal(HoodieSimpleIndex.java:113)
    at org.apache.hudi.index.simple.HoodieSimpleIndex.tagLocation(HoodieSimpleIndex.java:91)
    at org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:49)
    at org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:32)
    at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:53)
    ... 11 more
Caused by: org.apache.hudi.exception.HoodieMetadataException: Failed to retrieve files in partition s3a://strava.scratch/tmp/derick/hudi/__HIVE_DEFAULT_PARTITION__ from metadata
    at org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:137)
    at org.apache.hudi.metadata.HoodieMetadataFileSystemView.listPartition(HoodieMetadataFileSystemView.java:65)
    at org.apache.hudi.common.table.view.AbstractTableFileSystemView.lambda$ensurePartitionLoadedCorrectly$9(AbstractTableFileSystemView.java:305)
    at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
    at org.apache.hudi.common.table.view.AbstractTableFileSystemView.ensurePartitionLoadedCorrectly(AbstractTableFileSystemView.java:296)
    at org.apache.hudi.common.table.view.AbstractTableFileSystemView.getLatestBaseFilesBeforeOrOn(AbstractTableFileSystemView.java:517)
    at org.apache.hudi.common.table.view.PriorityBasedFileSystemView.execute(PriorityBasedFileSystemView.java:103)
    at org.apache.hudi.common.table.view.PriorityBasedFileSystemView.getLatestBaseFilesBeforeOrOn(PriorityBasedFileSystemView.java:144)
    at org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForPartition(HoodieIndexUtils.java:69)
    at org.apache.hudi.index.HoodieIndexUtils.lambda$getLatestBaseFilesForAllPartitions$ff6885d8$1(HoodieIndexUtils.java:89)
    at org.apache.hudi.client.common.HoodieSparkEngineContext.lambda$flatMap$7d470b86$1(HoodieSparkEngineContext.java:137)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:125)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:125)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at scala.collection.AbstractIterator.to(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    ... 3 more
Caused by: org.apache.hudi.exception.HoodieException: Exception when reading log file 
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:352)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:192)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:110)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:103)
    at org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader.<init>(HoodieMetadataMergedLogRecordReader.java:63)
    at org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader.<init>(HoodieMetadataMergedLogRecordReader.java:51)
    at org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader$Builder.build(HoodieMetadataMergedLogRecordReader.java:230)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getLogRecordScanner(HoodieBackedTableMetadata.java:508)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getLogRecordScanner(HoodieBackedTableMetadata.java:470)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.openReaders(HoodieBackedTableMetadata.java:416)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getOrCreateReaders$11(HoodieBackedTableMetadata.java:402)
    at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getOrCreateReaders(HoodieBackedTableMetadata.java:402)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getRecordsByKeys$1(HoodieBackedTableMetadata.java:211)
    at java.util.HashMap.forEach(HashMap.java:1290)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeys(HoodieBackedTableMetadata.java:209)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordByKey(HoodieBackedTableMetadata.java:141)
    at org.apache.hudi.metadata.BaseTableMetadata.fetchAllFilesInPartition(BaseTableMetadata.java:312)
    at org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:135)
    ... 37 more
Caused by: java.lang.IllegalStateException: Block has already been inflated
    at org.apache.hudi.common.util.ValidationUtils.checkState(ValidationUtils.java:67)
    at org.apache.hudi.common.table.log.block.HoodieLogBlock.inflate(HoodieLogBlock.java:267)
    at org.apache.hudi.common.table.log.block.HoodieLogBlock.inflate(HoodieLogBlock.java:278)
    at org.apache.hudi.common.table.log.block.HoodieDeleteBlock.getRecordsToDelete(HoodieDeleteBlock.java:83)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:473)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:343)
    ... 55 more
22/08/17 23:07:26 INFO DeltaSync: Shutting down embedded timeline server
22/08/17 23:07:26 INFO EmbeddedTimelineService: Closing Timeline server
22/08/17 23:07:26 INFO TimelineService: Closing Timeline Service
22/08/17 23:07:26 INFO Javalin: Stopping Javalin ...
22/08/17 23:07:26 INFO SparkUI: Stopped Spark web UI at http://d16171598c10:8090
22/08/17 23:07:26 ERROR Javalin: Javalin failed to stop gracefully
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
    at org.apache.hudi.org.eclipse.jetty.server.AbstractConnector.doStop(AbstractConnector.java:333)
    at org.apache.hudi.org.eclipse.jetty.server.AbstractNetworkConnector.doStop(AbstractNetworkConnector.java:88)
    at org.apache.hudi.org.eclipse.jetty.server.ServerConnector.doStop(ServerConnector.java:248)
    at org.apache.hudi.org.eclipse.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:89)
    at org.apache.hudi.org.eclipse.jetty.server.Server.doStop(Server.java:450)
    at org.apache.hudi.org.eclipse.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:89)
    at io.javalin.Javalin.stop(Javalin.java:195)
    at org.apache.hudi.timeline.service.TimelineService.close(TimelineService.java:325)
    at org.apache.hudi.client.embedded.EmbeddedTimelineService.stop(EmbeddedTimelineService.java:141)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.close(DeltaSync.java:905)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.close(HoodieDeltaStreamer.java:831)
    at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.onDeltaSyncShutdown(HoodieDeltaStreamer.java:223)
    at org.apache.hudi.async.HoodieAsyncService.lambda$shutdownCallback$0(HoodieAsyncService.java:171)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
22/08/17 23:07:26 INFO Javalin: Javalin has stopped
22/08/17 23:07:26 INFO TimelineService: Closed Timeline Service
22/08/17 23:07:26 INFO EmbeddedTimelineService: Closed Timeline server
22/08/17 23:07:26 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
22/08/17 23:07:26 INFO MemoryStore: MemoryStore cleared
22/08/17 23:07:26 INFO BlockManager: BlockManager stopped
22/08/17 23:07:26 INFO BlockManagerMaster: BlockManagerMaster stopped
22/08/17 23:07:26 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
22/08/17 23:07:26 INFO SparkContext: Successfully stopped SparkContext
rmahindra123 commented 2 years ago

@yihua can you take a look? Seems like a metadata-related issue.

dyang108 commented 2 years ago

Update: I got it working on an older version of Hudi (0.10.1), so this seems like a regression.

nsivabalan commented 2 years ago

@dyang108: is this happening infrequently, or is your pipeline just stuck? We have unit tests and integration tests for the metadata table and we haven't hit this issue yet; trying to gauge what's different in your env or setup. It looks like we are just trying to read records from the metadata table, nothing fancy.

dyang108 commented 2 years ago

This happened consistently with the command above every time I ran it, on Hudi 0.12.0 (I also tried 0.11.1).

The pipeline failed and exited when I saw this issue.

kasured commented 2 years ago

We have the same stacktrace when running on Hudi 0.11.0, Spark 3.2.1, EMR 6.7. We have the metadata table enabled and our Spark Streaming query fails each time. This is a COW table.

kasured commented 2 years ago

@nsivabalan What might be a general workaround in that situation to unblock the processing? Of course it depends on the root cause, but would deleting and recreating the metadata table from hudi-cli help? Another option might be to disable metadata on the current table and proceed.

nsivabalan commented 2 years ago

Got it. Did you mean you are using EMR's Spark or OSS Spark? I understand it's an EMR cluster.

yihua commented 2 years ago

@kasured to unblock the processing, could you try disabling and deleting the metadata table by setting hoodie.metadata.enable=false in Hudi configs? This automatically deletes the metadata table after a few commits.
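
For this Deltastreamer setup, a minimal sketch of that workaround (assuming it is added to the kafka-source.properties file passed via --props, or passed directly with Deltastreamer's --hoodie-conf flag):

hoodie.metadata.enable=false

or, appended to the original spark-submit command:

--hoodie-conf hoodie.metadata.enable=false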

kasured commented 2 years ago

@yihua Yes, that helped. However, I assume the same can be done with hudi-cli as I wrote before: metadata delete and metadata create.

@nsivabalan Yes, we are using the Amazon bundle for Spark 3.2.1, which is provided by EMR 6.7.

nsivabalan commented 2 years ago

Yes, you are right. You can disable it via hudi-cli as well.
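
For reference, a rough sketch of that hudi-cli flow against the table path from the original report (verify the commands against your hudi-cli version):

connect --path s3a://strava.scratch/tmp/derick/hudi
metadata delete
metadata create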

nsivabalan commented 2 years ago

Since we could not reproduce this with OSS Spark, can you reach out to AWS support? CC @umehrot2 @rahil-c: have you folks seen this issue before? It seems like a simple read from the metadata table is failing with EMR Spark.

dyang108 commented 2 years ago

I saw this issue with Spark on Mesos (on EC2), not EMR Spark.

gunjanchaudhary87 commented 1 year ago

Hi,

Is there any resolution for this issue yet, or any idea of which release it will be fixed in? I am also facing the same issue. My test case is very simple: reload the same file twice,

  1. With hoodie.datasource.write.operation = BulkInsert
  2. With hoodie.datasource.write.operation = Upsert

When metadata is enabled, the bulk insert works fine, but the upsert aborts with "Caused by: java.lang.IllegalStateException: Block has already been inflated". When metadata is disabled (hoodie.metadata.enable = false), the upsert works fine.
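
A minimal sketch of that two-step repro using the Spark Java datasource API (table name, base path, and column names are placeholders, not taken from the actual test case):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;

    public class ReloadTwiceRepro {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("hudi-reload-twice")
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .getOrCreate();

        // Placeholder input: any parquet file with id, ts and part columns
        Dataset<Row> df = spark.read().format("parquet").load("/tmp/input/file.parquet");
        String basePath = "/tmp/hudi/test_table";

        // 1. First load: bulk_insert with the metadata table enabled (succeeds)
        df.write().format("hudi")
            .option("hoodie.table.name", "test_table")
            .option("hoodie.datasource.write.recordkey.field", "id")
            .option("hoodie.datasource.write.precombine.field", "ts")
            .option("hoodie.datasource.write.partitionpath.field", "part")
            .option("hoodie.datasource.write.operation", "bulk_insert")
            .option("hoodie.metadata.enable", "true")
            .mode(SaveMode.Overwrite)
            .save(basePath);

        // 2. Reload the same file as an upsert: this is where
        //    "Block has already been inflated" shows up in the report above
        df.write().format("hudi")
            .option("hoodie.table.name", "test_table")
            .option("hoodie.datasource.write.recordkey.field", "id")
            .option("hoodie.datasource.write.precombine.field", "ts")
            .option("hoodie.datasource.write.partitionpath.field", "part")
            .option("hoodie.datasource.write.operation", "upsert")
            .option("hoodie.metadata.enable", "true")
            .mode(SaveMode.Append)
            .save(basePath);

        spark.stop();
      }
    }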

My test cases mostly depend on the metadata table, so I need it to be enabled. Please let me know if there is any other workaround.

Thank you !

danny0405 commented 1 year ago

cc @nsivabalan to look into this issue, thanks.

jschuck9 commented 1 year ago

Is a fix for this issue planned for 0.13.0 or a 0.12.x patch release?

wyb199701 commented 1 year ago

I got the same issue. When I excluded hudi-common from hudi-aws, it worked successfully.
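
If that build is Maven-based, the exclusion would look roughly like this (a sketch; the version property is a placeholder):

    <dependency>
      <groupId>org.apache.hudi</groupId>
      <artifactId>hudi-aws</artifactId>
      <version>${hudi.version}</version>
      <exclusions>
        <!-- Drop the transitive hudi-common pulled in by hudi-aws -->
        <exclusion>
          <groupId>org.apache.hudi</groupId>
          <artifactId>hudi-common</artifactId>
        </exclusion>
      </exclusions>
    </dependency>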

xccui commented 1 year ago

Hit this when using Flink 1.16 and Hudi bdb50ddccc9631317dfb06a06abc38cbd3714ce8 on EKS. The metadata table was enabled.

2023-04-19 23:55:23
org.apache.hudi.exception.HoodieMetadataException: Failed to retrieve files in partition s3a://path-to-data/ from metadata
    at org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:152)
    at org.apache.hudi.metadata.HoodieMetadataFileSystemView.listPartition(HoodieMetadataFileSystemView.java:69)
    at org.apache.hudi.common.table.view.AbstractTableFileSystemView.lambda$ensurePartitionLoadedCorrectly$16(AbstractTableFileSystemView.java:428)
    at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(Unknown Source)
    at org.apache.hudi.common.table.view.AbstractTableFileSystemView.ensurePartitionLoadedCorrectly(AbstractTableFileSystemView.java:419)
    at org.apache.hudi.common.table.view.AbstractTableFileSystemView.getLatestMergedFileSlicesBeforeOrOn(AbstractTableFileSystemView.java:854)
    at org.apache.hudi.common.table.view.PriorityBasedFileSystemView.execute(PriorityBasedFileSystemView.java:104)
    at org.apache.hudi.common.table.view.PriorityBasedFileSystemView.getLatestMergedFileSlicesBeforeOrOn(PriorityBasedFileSystemView.java:195)
    at org.apache.hudi.sink.partitioner.profile.DeltaWriteProfile.smallFilesProfile(DeltaWriteProfile.java:62)
    at org.apache.hudi.sink.partitioner.profile.WriteProfile.getSmallFiles(WriteProfile.java:191)
    at org.apache.hudi.sink.partitioner.BucketAssigner.getSmallFileAssign(BucketAssigner.java:179)
    at org.apache.hudi.sink.partitioner.BucketAssigner.addInsert(BucketAssigner.java:137)
    at org.apache.hudi.sink.partitioner.BucketAssignFunction.getNewRecordLocation(BucketAssignFunction.java:215)
    at org.apache.hudi.sink.partitioner.BucketAssignFunction.processRecord(BucketAssignFunction.java:200)
    at org.apache.hudi.sink.partitioner.BucketAssignFunction.processElement(BucketAssignFunction.java:162)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.hudi.exception.HoodieException: Exception when reading log file
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:375)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:222)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:199)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:115)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:74)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:465)
    at org.apache.hudi.metadata.HoodieMetadataLogRecordReader$Builder.build(HoodieMetadataLogRecordReader.java:218)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getLogRecordScanner(HoodieBackedTableMetadata.java:539)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.openReaders(HoodieBackedTableMetadata.java:440)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getOrCreateReaders(HoodieBackedTableMetadata.java:425)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getRecordsByKeys$3(HoodieBackedTableMetadata.java:239)
    at java.base/java.util.HashMap.forEach(Unknown Source)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeys(HoodieBackedTableMetadata.java:237)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordByKey(HoodieBackedTableMetadata.java:152)
    at org.apache.hudi.metadata.BaseTableMetadata.fetchAllFilesInPartition(BaseTableMetadata.java:339)
    at org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:150)
    ... 28 more
Caused by: java.lang.IllegalStateException: Block has already been inflated
    at org.apache.hudi.common.util.ValidationUtils.checkState(ValidationUtils.java:76)
    at org.apache.hudi.common.table.log.block.HoodieLogBlock.inflate(HoodieLogBlock.java:276)
    at org.apache.hudi.common.table.log.block.HoodieLogBlock.inflate(HoodieLogBlock.java:287)
    at org.apache.hudi.common.table.log.block.HoodieDataBlock.readRecordsFromBlockPayload(HoodieDataBlock.java:166)
    at org.apache.hudi.common.table.log.block.HoodieDataBlock.getRecordIterator(HoodieDataBlock.java:128)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.getRecordsIterator(AbstractHoodieLogRecordReader.java:807)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processDataBlock(AbstractHoodieLogRecordReader.java:630)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:674)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:366)

xccui commented 1 year ago

I don't know the detailed logic here, but apparently recursively invoking inflate() when hitting an IOException will cause the state check to fail.

[screenshot of the inflate() implementation]
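
To make that concrete, here is a simplified, self-contained sketch of the pattern (class and field names are illustrative; the actual logic lives in HoodieLogBlock.inflate() and uses Hudi's own Option and ValidationUtils):

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Optional;

    class LogBlockSketch {
      private Optional<byte[]> content = Optional.empty();
      private final InputStream inputStream;
      private final int blockSize;

      LogBlockSketch(InputStream inputStream, int blockSize) {
        this.inputStream = inputStream;
        this.blockSize = blockSize;
      }

      void inflate() {
        // The state check that produces "Block has already been inflated".
        if (content.isPresent()) {
          throw new IllegalStateException("Block has already been inflated");
        }
        try {
          byte[] buf = new byte[blockSize];
          content = Optional.of(buf);                             // the buffer is recorded before the read...
          if (inputStream.read(buf, 0, blockSize) < blockSize) {  // ...and the read can fail, e.g. on an S3 timeout
            throw new IOException("short read");
          }
        } catch (IOException e) {
          // Retrying by re-entering inflate(): content is already present, so the check
          // above throws IllegalStateException and the original IOException is masked.
          inflate();
        }
      }
    }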

zinking commented 1 year ago

> @kasured to unblock the processing, could you try disabling and deleting the metadata table by setting hoodie.metadata.enable=false in Hudi configs? This automatically deletes the metadata table after a few commits.

Right, this seems obviously flawed: it is hiding the actual IOException and instead throwing an irrelevant "Block has already been inflated" error.

danny0405 commented 1 year ago

@zinking Can you file a fix for it?

envomp commented 1 year ago

Hey folks,

This issue (https://gist.github.com/envomp/268bdd35a3b3399db59583c0e159c229#file-cover-logs) seems to be a cover-up for the real underlying issue, which in our case was https://gist.github.com/envomp/268bdd35a3b3399db59583c0e159c229#file-actual-logs

That in turn was caused by TIMELINE_SERVER_BASED marker types not working when using Spark Structured Streaming. The workaround was to disable the metadata table.

ad1happy2go commented 1 year ago

@envomp Are you setting fs.s3a.connection.maximum to a higher value? That might fix the connection timeout issue.

envomp commented 1 year ago

Hey @ad1happy2go

We have the following s3a configurations:

spark.hadoop.fs.s3a.path.style.access: true
spark.hadoop.fs.s3a.threads.max: 64
spark.hadoop.fs.s3a.connection.maximum: 1024
spark.hadoop.fs.s3a.maxRetries: 64

Also tried setting fs.s3a.connection.maximum to 8096 but the issue persisted.

EDIT:

For a table with a smaller volume, this is how disabling the metadata table affected the app duration time: [screenshot omitted]

chestnutqiang commented 9 months ago

Same problem on version 0.14.1, on HDFS.

yihua commented 1 month ago

The "Block has already been inflated" issue is due to a bug that is fixed by #7434.