apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] java.util.NoSuchElementException: FileID <some-uuid> of partition path tenant=xxxxx/date=YYYYMMD #12298

Open hgudladona opened 2 days ago

hgudladona commented 2 days ago

Describe the problem you faced

Intermittent java.util.NoSuchElementException when writing to partitions that are out of order and not covered by the active timeline.

To Reproduce

We have a Hudi job that reads from Kafka and writes to S3, into partitions derived dynamically from certain record columns, in the format tenant=xxxxx/date=YYYYMMDD. In some situations, when new data is written into a partition that is not in the active timeline (late-arriving data), there seems to be a mismatch between the file group chosen in the stage "Getting small files from partitions" and the one seen in the stage "Doing partition and writing data".

Let's say a file group (FG) with ID 'eef3ab7f-dc8a-40ec-856f-99010184d9f1-1' is identified as a small file in the stage "Getting small files from partitions" and passed on to the stage "Doing partition and writing data" to INSERT new data and create a new base file for it. That stage then fails with the exception shown below, which in turn fails the streamer job.

However, the streamer job succeeds in two situations:

  1. When the streamer job is restarted and the same records are retried as INSERTs.
  2. When the embedded timeline server is turned off.
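
For readers who want to try the second case, here is a minimal sketch of how the embedded timeline server could be switched off through the writer properties. It assumes the relevant key is `hoodie.embed.timeline.server`; the issue does not show the exact setting that was changed, and the class and method names below are hypothetical.

```java
import java.util.Properties;

// Hypothetical helper, not part of Hudi: builds a copy of the writer
// properties with the embedded timeline server disabled.
final class TimelineServerWorkaroundSketch {
    // Assumed key; the issue does not list the exact setting that was changed.
    static final String EMBEDDED_TIMELINE_SERVER_KEY = "hoodie.embed.timeline.server";

    static Properties withEmbeddedTimelineServerDisabled(Properties base) {
        Properties copy = new Properties();
        copy.putAll(base);                                   // keep all existing settings
        copy.setProperty(EMBEDDED_TIMELINE_SERVER_KEY, "false");
        return copy;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("hoodie.table.type", "MERGE_ON_READ");
        Properties patched = withEmbeddedTimelineServerDisabled(props);
        patched.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

With the server off, executors would build their own file system view instead of querying the driver, which is a workaround rather than a fix for the mismatch.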

Expected behavior

We expect no mismatch between the views of the stages "Getting small files from partitions" and "Doing partition and writing data" when we are writing to a partition that is not tracked in the active timeline.

Environment Description

Additional context

auto.offset.reset: latest
bootstrap.servers: kafka-brokers
group.id: hudi-ingest-some-group
hoodie.archive.async: true
hoodie.archive.automatic: true
hoodie.auto.adjust.lock.configs: true
hoodie.base.path: s3a://some-base-path
hoodie.clean.async: true
hoodie.cleaner.hours.retained: 36
hoodie.cleaner.parallelism: 600
hoodie.cleaner.policy: KEEP_LATEST_BY_HOURS
hoodie.cleaner.policy.failed.writes: LAZY
hoodie.clustering.async.enabled: false
hoodie.combine.before.insert: false
hoodie.copyonwrite.insert.auto.split: false
hoodie.datasource.fetch.table.enable: true
hoodie.datasource.hive_sync.database: hudi_events_v1
hoodie.datasource.hive_sync.mode: hms
hoodie.datasource.hive_sync.partition_extractor_class: org.apache.hudi.hive.MultiPartKeysValueExtractor
hoodie.datasource.hive_sync.partition_fields: tenant,date
hoodie.datasource.hive_sync.table: some-table
hoodie.datasource.hive_sync.table_properties: projection.date.type=date|projection.date.format=yyyyMMdd|projection.date.range=19700101,99990101|projection.tenant.type=integer|projection.tenant.range=-1,8675309|projection.enabled=true
hoodie.datasource.meta_sync.condition.sync: true
hoodie.datasource.sync_tool.single_instance: true
hoodie.datasource.write.hive_style_partitioning: true
hoodie.datasource.write.keygenerator.class: com.some-class-prefix.KeyGenerator
hoodie.datasource.write.operation: insert
hoodie.datasource.write.partitionpath.field: tenant:SIMPLE,date:SIMPLE
hoodie.datasource.write.precombine.field: event_time_usec
hoodie.datasource.write.reconcile.schema: false
hoodie.datasource.write.recordkey.field: resource_id
hoodie.deltastreamer.kafka.source.maxEvents: 75000000
hoodie.deltastreamer.schemaprovider.registry.url: http://schema-registry.some-suffix:8085
hoodie.deltastreamer.source.kafka.enable.commit.offset: true
hoodie.deltastreamer.source.kafka.topic: some-topic
hoodie.deltastreamer.source.schema.subject: some-topic-value
hoodie.fail.on.timeline.archiving: false
hoodie.filesystem.view.incr.timeline.sync.enable: true
hoodie.filesystem.view.remote.timeout.secs: 2
hoodie.insert.shuffle.parallelism: 1600
hoodie.memory.merge.max.size: 2147483648
hoodie.metadata.enable: false
hoodie.metrics.on: true
hoodie.metrics.reporter.metricsname.prefix:
hoodie.metrics.reporter.prefix.tablename: false
hoodie.metrics.reporter.type: DATADOG
hoodie.parquet.compression.codec: zstd
hoodie.streamer.source.kafka.minPartitions: 450
hoodie.table.name: <>
hoodie.table.partition.fields: tenant,date
hoodie.table.type: MERGE_ON_READ
hoodie.write.concurrency.mode: OPTIMISTIC_CONCURRENCY_CONTROL
hoodie.write.lock.dynamodb.billing_mode: PROVISIONED
hoodie.write.lock.dynamodb.endpoint_url: https://dynamodb.us-east-2.amazonaws.com/
hoodie.write.lock.dynamodb.partition_key: some-key
hoodie.write.lock.dynamodb.region: us-east-2
hoodie.write.lock.dynamodb.table: HudiLocker
hoodie.write.lock.provider: org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider
hoodie.write.markers.type: DIRECT

Additional logs

2024-11-19T20:05:51,784 INFO [Executor task launch worker for task 227.3 in stage 723.0 (TID 398672)] org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor:Merging updates for commit 20241119200519832 for file eef3ab7f-dc8a-40ec-856f-99010184d9f1-1
2024-11-19T20:05:51,784 INFO [Executor task launch worker for task 227.3 in stage 723.0 (TID 398672)] org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor:Small file corrections for updates for commit 20241119200519832 for file eef3ab7f-dc8a-40ec-856f-99010184d9f1-1
2024-11-19T20:05:51,814 INFO [Executor task launch worker for task 227.3 in stage 723.0 (TID 398672)] org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView:Sending request : (http://<some-prefix>-driver-svc.hudi-ingest.svc:43915/v1/hoodie/view/datafile/latest/partition?partition=tenant%3DXXXXX%2Fdate%3D20241118&basepath=s3a%3A%2F%2Fsome-bucket%2Fhudi<some-prefix>&fileid=eef3ab7f-dc8a-40ec-856f-99010184d9f1-1&lastinstantts=20241119200150363&timelinehash=49ea23be5a76a1a27c370601443200ed6dd13f7c4a9937d84028c3cbf9e72aed)
2024-11-19T20:05:51,853 INFO [Executor task launch worker for task 227.3 in stage 723.0 (TID 398672)] org.apache.hudi.io.HoodieMergeHandleFactory:Create update handle for fileId eef3ab7f-dc8a-40ec-856f-99010184d9f1-1 and partition path tenant=11467/date=20241118 at commit 20241119200519832
2024-11-19T20:05:51,853 INFO [Executor task launch worker for task 227.3 in stage 723.0 (TID 398672)] org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView:Sending request : (http://<some-prefix>-driver-svc.hudi-ingest.svc:43915/v1/hoodie/view/datafile/latest/partition?partition=tenant%3DXXXXX%2Fdate%3D20241118&basepath=s3a%3A%2F%2Fsome-bucket%2Fhudi<some-prefix>&fileid=eef3ab7f-dc8a-40ec-856f-99010184d9f1-1&lastinstantts=20241119200150363&timelinehash=49ea23be5a76a1a27c370601443200ed6dd13f7c4a9937d84028c3cbf9e72aed)

Stacktrace


org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :227
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:342)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleInsertPartition(BaseSparkCommitActionExecutor.java:348)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:259)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:905)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:905)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:377)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1552)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1462)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1526)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1349)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:375)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:326)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.util.NoSuchElementException: FileID eef3ab7f-dc8a-40ec-856f-99010184d9f1-1
 of partition path tenant=XXXXX/date=20241118 does not exist.
    at org.apache.hudi.io.HoodieMergeHandle.getLatestBaseFile(HoodieMergeHandle.java:161)
    at org.apache.hudi.io.HoodieMergeHandle.<init>(HoodieMergeHandle.java:126)
    at org.apache.hudi.io.HoodieMergeHandleFactory.create(HoodieMergeHandleFactory.java:68)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.getUpdateHandle(BaseSparkCommitActionExecutor.java:400)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:368)
    at org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:79)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:335)

ad1happy2go commented 1 day ago

@hgudladona This should already be fixed by https://github.com/apache/hudi/pull/9879. Can you try with this patch?

hgudladona commented 1 day ago

This patch requires us to migrate to the 1.x.beta release, which we are not ready to do yet. Is there any chance it can be backported to 0.14.x? Also, could you kindly explain how it would remediate our situation? Are file groups outside the active timeline treated as uncommitted with this patch?

hgudladona commented 17 hours ago

@ad1happy2go This patch will still not solve the problem. If you follow the code path, getLatestFileSlicesBeforeOrOn filters the file slices using getLatestFileSliceFilteringUncommittedFiles, which in turn filters using filterUncommittedFiles. Looking into this function:

  private Stream<FileSlice> filterUncommittedFiles(FileSlice fileSlice, boolean includeEmptyFileSlice) {
    Option<HoodieBaseFile> committedBaseFile = fileSlice.getBaseFile().isPresent() && completionTimeQueryView.isCompleted(fileSlice.getBaseInstantTime()) ? fileSlice.getBaseFile() : Option.empty();
    List<HoodieLogFile> committedLogFiles = fileSlice.getLogFiles().filter(logFile -> completionTimeQueryView.isCompleted(logFile.getDeltaCommitTime())).collect(Collectors.toList());
    if ((fileSlice.getBaseFile().isPresent() && !committedBaseFile.isPresent())
        || committedLogFiles.size() != fileSlice.getLogFiles().count()) {
      LOG.debug("File Slice (" + fileSlice + ") has uncommitted files.");
      // A file is filtered out of the file-slice if the corresponding
      // instant has not completed yet.
      FileSlice transformed = new FileSlice(fileSlice.getPartitionPath(), fileSlice.getBaseInstantTime(), fileSlice.getFileId());
      committedBaseFile.ifPresent(transformed::setBaseFile);
      committedLogFiles.forEach(transformed::addLogFile);
      if (transformed.isEmpty() && !includeEmptyFileSlice) {
        return Stream.of();
      }
      return Stream.of(transformed);
    }
    return Stream.of(fileSlice);
  }

...

  public boolean isCompleted(String instantTime) {
    return this.startToCompletionInstantTimeMap.containsKey(instantTime)
        || HoodieTimeline.compareTimestamps(instantTime, LESSER_THAN, this.firstNonSavepointCommit);
  }

If a file slice's base instant time is less than firstNonSavepointCommit, it is treated as completed even though it is not in the active timeline, which is very similar to the current behavior. Kindly go through the scenario I described once more and advise whether this is the right patch.
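
To make the point concrete, here is a simplified stand-in for the completion check quoted above. It is not Hudi's actual class: the class name, the `markCompleted` helper, the plain string comparison (valid only because instant times are fixed-width) and the sample value chosen for firstNonSavepointCommit are all assumptions for illustration. Only the two instant times from the logs are taken from this issue.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the isCompleted check quoted above; not Hudi code.
final class CompletionCheckSketch {
    // Start time -> completion time for instants known to the active timeline.
    private final Map<String, String> startToCompletionInstantTimeMap = new HashMap<>();
    // Hypothetical lower bound of the active timeline.
    private final String firstNonSavepointCommit;

    CompletionCheckSketch(String firstNonSavepointCommit) {
        this.firstNonSavepointCommit = firstNonSavepointCommit;
    }

    // Hypothetical helper to register a completed instant.
    void markCompleted(String instantTime, String completionTime) {
        startToCompletionInstantTimeMap.put(instantTime, completionTime);
    }

    boolean isCompleted(String instantTime) {
        // Fixed-width timestamps compare correctly as plain strings (assumption).
        return startToCompletionInstantTimeMap.containsKey(instantTime)
            || instantTime.compareTo(firstNonSavepointCommit) < 0;
    }

    public static void main(String[] args) {
        CompletionCheckSketch view = new CompletionCheckSketch("20241119000000000");
        view.markCompleted("20241119200150363", "20241119200210000");

        // Older than the active timeline: reported as completed without any lookup.
        System.out.println(view.isCompleted("20241118120000000"));
        // Present in the active timeline and completed.
        System.out.println(view.isCompleted("20241119200150363"));
        // In-flight instant inside the active timeline: not completed.
        System.out.println(view.isCompleted("20241119200519832"));
    }
}
```

In this sketch, any instant earlier than firstNonSavepointCommit passes the check regardless of whether it ever completed, which is the behavior the comment above is questioning.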

bhasudha commented 14 hours ago

@nsivabalan could you please help with this?