CaesarWangX opened 6 months ago
Did you try the Hudi 0.14.1 release? Did you enable the metadata table?
Hi @danny0405, we don't need the metadata table, so as I mentioned, we set hoodie.metadata.enable=false. We are using Hudi on AWS EMR, so we don't have a chance to use Hudi 0.14.1. We also checked the code in 0.14.1; it looks the same as 0.14.
Hi @danny0405 @xushiyan, we are using Spark 3.4.1 and Hudi 0.14.0. I have updated the context; please help look into this. Thank you.
The reason we do not use the metadata table is that in Spark Structured Streaming, enabling it affects micro-batch efficiency, since it introduces additional list operations.
Caused by: java.util.NoSuchElementException: FileID xxxxx of partition path dt=2019-02-20 does not exist.
at org.apache.hudi.io.HoodieMergeHandle.getLatestBaseFile(HoodieMergeHandle.java:159)
at org.apache.hudi.io.HoodieMergeHandle.&lt;init&gt;(HoodieMergeHandle.java:121)
at org.apache.hudi.io.FlinkMergeHandle.&lt;init&gt;(FlinkMergeHandle.java:70)
at org.apache.hudi.io.FlinkConcatHandle.&lt;init&gt;(FlinkConcatHandle.java:53)
at org.apache.hudi.client.HoodieFlinkWriteClient.getOrCreateWriteHandle(HoodieFlinkWriteClient.java:557)
at org.apache.hudi.client.HoodieFlinkWriteClient.insert(HoodieFlinkWriteClient.java:175)
at org.apache.hudi.sink.StreamWriteFunction.lambda$initWriteFunction$0(StreamWriteFunction.java:181)
at org.apache.hudi.sink.StreamWriteFunction.lambda$flushRemaining$7(StreamWriteFunction.java:461)
The error message indicates that you enabled inline clustering for Flink. Can you disable that and try again using async clustering instead?
Hi @danny0405, I didn't understand your point. This is a job for writing data using Spark Structured Streaming + Hudi, and it's an MOR INSERT operation. It seems unrelated to Flink and async clustering, and async clustering is false by default.
@CaesarWangX That's interesting. The confusion is that your stack trace points to the FlinkMergeHandle code path, which Spark streaming should not use.
@ad1happy2go @danny0405 I'm sorry, I have re-uploaded a new trace log 😅
@CaesarWangX Can you also post the writer configuration, please?
@ad1happy2go Sure, here it is.
"hoodie.datasource.write.table.type" = "MERGE_ON_READ"
"hoodie.table.name" = "smart_event"
"hoodie.datasource.write.recordkey.field" = "rowkey"
"hoodie.datasource.write.operation" = "insert"
"hoodie.datasource.write.hive_style_partitioning" = "true"
"hoodie.datasource.hive_sync.partition_fields" = "dt"
"hoodie.datasource.hive_sync.partition_extractor_class" = "org.apache.hudi.hive.MultiPartKeysValueExtractor"
"hoodie.datasource.write.precombine.field" = "log_time"
"hoodie.upsert.shuffle.parallelism" = "200"
"hoodie.insert.shuffle.parallelism" = "200"
"hoodie.parquet.small.file.limit" = "441600"
"hoodie.parquet.max.file.size" = "829120"
"hoodie.index.type" = "BLOOM"
"hoodie.bloom.index.prune.by.ranges" = "true"
"hoodie.bloom.index.use.caching" = "true"
"hoodie.bloom.index.use.treebased.filter" = "true"
"hoodie.bloom.index.bucketized.checking" = "true"
"hoodie.bloom.index.filter.type" = "DYNAMIC_V0"
"hoodie.cleaner.policy" = "KEEP_LATEST_COMMITS"
"hoodie.clean.automatic" = "true"
"hoodie.clean.async" = "true"
"hoodie.keep.min.commits" = "5"
"hoodie.keep.max.commits" = "7"
"hoodie.cleaner.parallelism" = "200"
"hoodie.embed.timeline.server" = "false"
"hoodie.datasource.compaction.async.enable" = "true"
"hoodie.cleaner.commits.retained" = "4"
"hoodie.copyonwrite.insert.auto.split" = "true"
"hoodie.merge.allow.duplicate.on.inserts" = "true"
"hoodie.metadata.enable" = "false"
"hoodie.cleaner.policy.failed.writes" = "NEVER"
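The small-file settings above are what drive the problematic code path: base files below `hoodie.parquet.small.file.limit` are picked up by `getSmallFiles`, and new inserts are routed into those file groups instead of creating new files. A simplified Python sketch of that selection (thresholds taken from the config above; the selection logic is a stand-in, not Hudi's actual code):

```python
SMALL_FILE_LIMIT = 441_600   # hoodie.parquet.small.file.limit, bytes
MAX_FILE_SIZE = 829_120      # hoodie.parquet.max.file.size, bytes

def pick_small_files(base_files):
    """Return file IDs whose base file is below the small-file limit.

    Simplified stand-in for Hudi's getSmallFiles: these file groups
    receive new inserts instead of a brand-new file being created."""
    return [f["file_id"] for f in base_files if f["size"] < SMALL_FILE_LIMIT]

files = [
    {"file_id": "fg-1", "size": 120_000},   # small -> targeted for new inserts
    {"file_id": "fg-2", "size": 829_000},   # near max file size -> left alone
]
print(pick_small_files(files))  # -> ['fg-1']
```

If `fg-1` here is a file group whose deltacommit was lost, routing inserts to it is exactly what triggers the `FileID ... does not exist` failure described below.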
@ad1happy2go you can follow these steps to reproduce it:
Steps to reproduce the behavior:
1. A file group has only one data file.
2. Delete the deltacommit corresponding to this file group (delete only the completed deltacommit; keep the requested and inflight files).
3. New data continues to be written to this file group (as determined by getSmallFiles).
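The "lost deltacommit" state in step 2 can be mimicked by removing only the completed instant file from the timeline while keeping its `.requested` and `.inflight` markers. A hedged Python sketch against a throwaway directory (the file naming follows Hudi's timeline convention; the directory and instant time are made up):

```python
import os
import tempfile

# Build a fake .hoodie timeline containing one delta commit.
hoodie = tempfile.mkdtemp()
instant = "20240507120000000"  # hypothetical instant time
for suffix in (".deltacommit.requested", ".deltacommit.inflight", ".deltacommit"):
    open(os.path.join(hoodie, instant + suffix), "w").close()

# Step 2: remove only the completed deltacommit, keeping the
# requested and inflight markers, so the write looks half-finished.
os.remove(os.path.join(hoodie, instant + ".deltacommit"))

print(sorted(os.listdir(hoodie)))
```

With `hoodie.cleaner.policy.failed.writes=NEVER`, nothing rolls this half-finished instant back, yet the file group's base file still sits in the partition and can be chosen as a small-file target on the next micro-batch.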
@CaesarWangX Is this the last delta commit in your timeline? Why are we deleting the delta commit file manually? You are saying that if you do that, it does not run a rollback?
Hi @ad1happy2go, yes. Because we set "hoodie.cleaner.policy.failed.writes" = "NEVER" (as in the configs I provided above), no rollback is run. We manually deleted the delta commit file to reproduce the FileID-not-found issue; if you follow the reproduction steps above, you should see it 😅
Hi @ad1happy2go, Is there any progress? Thank you
We are also facing this issue in Hudi 0.14.1 on EMR. Any fix for this ?
@nishantpanwar18 The solution is in the last two images 😅
@CaesarWangX Thank you!! I am not sure how to go about it; we are using EMR, which comes with pre-built Hudi jars. How did you make these changes?
@CaesarWangX is it possible to create a PR for this? We can have it reviewed and discuss the details there.
Created tracking JIRA - https://issues.apache.org/jira/browse/HUDI-8014 cc @nishantpanwar18
Should already be fixed by - https://github.com/apache/hudi/pull/9879
Describe the problem you faced
Spark 3.4.1 + Hudi 0.14.0. After upgrading from Hudi 0.11 to Hudi 0.14, we occasionally encounter this error. Once it occurs, the task keeps failing even after restarting, with the same error message and the same FileID.
I have reviewed the following two issues, but unfortunately, there is no effective solution available. https://github.com/apache/hudi/issues/5886 https://github.com/apache/hudi/issues/8890
Fortunately, by examining the source code, I have successfully replicated this issue and attempted to modify the source code to prevent task failures.
Any information or suggestions are helpful, thank you.
To Reproduce
Steps to reproduce the behavior:
BTW, under normal circumstances the task only fails for this reason when three conditions coincidentally hold at the same time, so reproducing this error is quite difficult. Essentially, it occurs when the job fails after completing the data write to the file group but before committing the deltacommit. 😅
Expected behavior
Environment Description
Hudi version : 0.14.0 (MOR, INSERT)
Spark version : 3.4.1
Storage (HDFS/S3/GCS..) :
Running on Docker? (yes/no) :
Additional context
Not finished yet; I will provide more information later. In Hudi 0.11, performing the same reproduction steps does not result in this error, but in 0.14 it does. After examining the code, the key difference between Hudi 0.11 and 0.14 is in AbstractTableFileSystemView:
Hudi 0.11: (code screenshot)
Hudi 0.14: (code screenshot)
It seems to work like this: after reproducing the issue with the steps above, in version 0.11 the method getLatestFileSlicesBeforeOrOn retrieves a stream, fileSliceStream, via fetchLatestFileSlicesBeforeOrOn.
In fetchLatestFileSlicesBeforeOrOn, after calling fetchAllStoredFileGroups, it filters again (line 1017), which excludes the error-prone file ID (via getAllFileSlices in getLatestFileSliceBeforeOrOn). This prevents the third condition of the reproduction steps from being met.
In version 0.14, getLatestFileSlicesBeforeOrOn uses fetchAllStoredFileGroups directly: it retrieves all file groups and then filters them with getAllFileSlicesBeforeOn in a map operation. However, this filtering does not exclude file groups whose deltacommit was lost, so they are still considered writable small files, satisfying the third reproduction step and triggering the issue.
After this comparison, we introduced a new method. Based on getAllFileSlicesBeforeOn, we defined getAllFileSlicesBeforeOrOn, which first calls getAllFileSlices (filtering out file groups with a lost deltacommit) before the normal time-based filtering. Then, in getLatestFileSlicesBeforeOrOn, we changed line 846 from .map(fg -> fg.getAllFileSlicesBeforeOn(maxCommitTime)) to .map(fg -> fg.getAllFileSlicesBeforeOrOn(maxCommitTime)). After testing, this prevents file groups with a lost deltacommit from being treated as small files.
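The behavioral difference can be modeled in a few lines of Python (a simplified stand-in for AbstractTableFileSystemView; the slice dicts and the `committed` flag are illustrative assumptions, not Hudi's real types):

```python
# Each file slice carries its instant time and whether its delta
# commit actually completed on the timeline.
slices = [
    {"file_id": "fg-lost", "instant": "001", "committed": False},  # deltacommit deleted
    {"file_id": "fg-ok",   "instant": "002", "committed": True},
]

def slices_before_on(all_slices, max_commit_time):
    # 0.14-style getAllFileSlicesBeforeOn: filters by time only, so the
    # file group whose deltacommit was lost still comes back.
    return [s for s in all_slices if s["instant"] <= max_commit_time]

def slices_before_or_on(all_slices, max_commit_time):
    # Proposed getAllFileSlicesBeforeOrOn: first drop uncommitted slices
    # (what getAllFileSlices achieves in 0.11's path), then filter by time.
    committed = [s for s in all_slices if s["committed"]]
    return [s for s in committed if s["instant"] <= max_commit_time]

print([s["file_id"] for s in slices_before_on(slices, "002")])     # fg-lost included
print([s["file_id"] for s in slices_before_or_on(slices, "002")])  # fg-lost excluded
```

In this model, the 0.14-style filter hands the lost-deltacommit file group back to the small-file picker, while the proposed variant drops it first, matching the 0.11 behavior described above.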
Tested, and it works. Any information or suggestions are helpful, thank you.
Stacktrace
Caused by: java.util.NoSuchElementException: FileID 1b88b26f-94b1-4bd1-94ad-e919e17183ee-0 of partition path datatype=xxxx/year=2024/month=05/day=07 does not exist.
at org.apache.hudi.io.HoodieMergeHandle.getLatestBaseFile(HoodieMergeHandle.java:161)
at org.apache.hudi.io.HoodieMergeHandle.&lt;init&gt;(HoodieMergeHandle.java:126)
at org.apache.hudi.io.HoodieConcatHandle.&lt;init&gt;(HoodieConcatHandle.java:79)
at org.apache.hudi.io.HoodieMergeHandleFactory.create(HoodieMergeHandleFactory.java:63)
at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.getUpdateHandle(BaseSparkCommitActionExecutor.java:400)
at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:368)
at org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:79)
at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:335)
... 30 more