Closed by usberkeley 2 weeks ago
Maybe related to these two fixes: https://github.com/apache/hudi/pull/5185 and https://github.com/apache/hudi/pull/8263
Thanks. Still tracking the issue.
@danny0405 Danny, the problem has been located. After HUDI-1517 enhanced the markers, the change in the marker file suffix changed the judgment condition, so the log file of a single delta commit was not deleted based on the log file's marker.
The master branch has already solved this problem through the marker IOType.CREATE, i.e. a marker-based rollback will delete the log file of a single delta commit.
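To illustrate the IOType-based approach on master: Hudi marker files follow the pattern <data_file_name>.marker.<IOType> (IOType being CREATE, MERGE, or APPEND), and a CREATE marker tells rollback the commit created the file, so the whole file (including a new file group's log file) can be deleted. The parsing helper below is an illustrative sketch, not a Hudi API.

```java
// Hedged sketch: extracting the IOType from a marker file name so that
// marker-based rollback can decide between deleting the whole file
// (CREATE) and only rolling back appended blocks (APPEND).
// The helper name and class are illustrative, not part of Hudi.
public class MarkerIoType {
  public static String ioTypeOf(String markerFileName) {
    int idx = markerFileName.lastIndexOf(".marker.");
    if (idx < 0) {
      throw new IllegalArgumentException("Not a marker file: " + markerFileName);
    }
    return markerFileName.substring(idx + ".marker.".length());
  }

  public static void main(String[] args) {
    // CREATE marker: the commit created this (log) file, rollback deletes it.
    System.out.println(ioTypeOf(".f1_20240804.log.1_0-1-0.marker.CREATE"));
    // APPEND marker: blocks were appended to an existing log file,
    // rollback only rolls back the appended blocks.
    System.out.println(ioTypeOf(".f1_20240804.log.1_0-1-0.marker.APPEND"));
  }
}
```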
We have fixed the problem for version 0.15. Do we still need to contribute the fix to the remote/0.15 branch?
The code below is our fix for version 0.15 (the added part starts at "Added by usberkeley"); anyone who needs it can take a look.
protected HoodieRollbackRequest getRollbackRequestForAppend(HoodieInstant instantToRollback, String fileNameWithPartitionToRollback) {
  StoragePath fullLogFilePath = new StoragePath(basePath, fileNameWithPartitionToRollback);
  String relativePartitionPath = FSUtils.getRelativePartitionPath(new StoragePath(basePath), fullLogFilePath.getParent());
  String fileId;
  String baseCommitTime;
  Option<HoodieLogFile> latestLogFileOption;
  Map<String, Long> logBlocksToBeDeleted = new HashMap<>();
  // Old marker files may be generated from the base file name before HUDI-1517; keep compatible with them.
  if (FSUtils.isBaseFile(fullLogFilePath)) {
    LOG.warn("Found old marker type for log file: " + fileNameWithPartitionToRollback);
    fileId = FSUtils.getFileIdFromFilePath(fullLogFilePath);
    baseCommitTime = FSUtils.getCommitTime(fullLogFilePath.getName());
    StoragePath partitionPath = FSUtils.constructAbsolutePath(config.getBasePath(), relativePartitionPath);
    // NOTE: Since we're rolling back an incomplete Delta Commit, it could only have appended its
    // block to the latest log file
    try {
      latestLogFileOption = FSUtils.getLatestLogFile(table.getMetaClient().getStorage(), partitionPath, fileId,
          HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime);
      if (latestLogFileOption.isPresent() && baseCommitTime.equals(instantToRollback.getTimestamp())) {
        StoragePath fullDeletePath = new StoragePath(partitionPath, latestLogFileOption.get().getFileName());
        return new HoodieRollbackRequest(relativePartitionPath, EMPTY_STRING, EMPTY_STRING,
            Collections.singletonList(fullDeletePath.toString()),
            Collections.emptyMap());
      }
      if (latestLogFileOption.isPresent()) {
        HoodieLogFile latestLogFile = latestLogFileOption.get();
        // NOTE: Markers don't carry information about the cumulative size of the blocks that have been appended,
        // therefore we simply stub this value.
        logBlocksToBeDeleted = Collections.singletonMap(latestLogFile.getPathInfo().getPath().toString(), latestLogFile.getPathInfo().getLength());
      }
      return new HoodieRollbackRequest(relativePartitionPath, fileId, baseCommitTime, Collections.emptyList(), logBlocksToBeDeleted);
    } catch (IOException ioException) {
      throw new HoodieIOException(
          "Failed to get latestLogFile for fileId: " + fileId + " in partition: " + partitionPath,
          ioException);
    }
  } else {
    HoodieLogFile logFileToRollback = new HoodieLogFile(fullLogFilePath);
    fileId = logFileToRollback.getFileId();
    baseCommitTime = logFileToRollback.getBaseCommitTime();
    // NOTE: We don't strictly need the exact size, but it has to be positive to pass metadata payload validation.
    // Therefore, we simply stub this value (1L) instead of doing an fs call to get the exact size.
    logBlocksToBeDeleted = Collections.singletonMap(logFileToRollback.getPath().getName(), 1L);
  }
  // **Added by usberkeley**
  // The log file can be deleted if the commit to roll back is also the commit that created the file group
  if (baseCommitTime.equals(instantToRollback.getTimestamp())) {
    StoragePath partitionPath = FSUtils.constructAbsolutePath(config.getBasePath(), relativePartitionPath);
    try {
      latestLogFileOption = FSUtils.getLatestLogFile(table.getMetaClient().getStorage(), partitionPath, fileId,
          HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime);
      if (latestLogFileOption.isPresent()) {
        StoragePath fullFilePathToDelete = new StoragePath(partitionPath, latestLogFileOption.get().getFileName());
        return new HoodieRollbackRequest(relativePartitionPath, EMPTY_STRING, EMPTY_STRING,
            Collections.singletonList(fullFilePathToDelete.toString()),
            Collections.emptyMap());
      }
    } catch (IOException ioException) {
      throw new HoodieIOException("Failed to get latestLogFile for fileId: " + fileId + " in partition: " + partitionPath, ioException);
    }
  }
  return new HoodieRollbackRequest(relativePartitionPath, fileId, baseCommitTime, Collections.emptyList(), logBlocksToBeDeleted);
}
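The core condition in the fix above can be reduced to a small decision sketch: the log file may be deleted outright only when the instant being rolled back is also the instant that created the file group (names below are illustrative, not Hudi APIs).

```java
// Hedged sketch of the decision at the heart of the 0.15 fix:
// delete the log file, or only roll back its appended blocks?
public class AppendRollbackDecision {
  public enum Action { DELETE_LOG_FILE, ROLLBACK_APPENDED_BLOCKS }

  // The log file may be deleted only when the rollback instant equals the
  // log file's base commit time, i.e. the failed delta commit is the one
  // that created the file group. Otherwise the file group predates the
  // failed commit and only the appended blocks are rolled back.
  public static Action decide(String baseCommitTime, String instantToRollback) {
    return baseCommitTime.equals(instantToRollback)
        ? Action.DELETE_LOG_FILE
        : Action.ROLLBACK_APPENDED_BLOCKS;
  }

  public static void main(String[] args) {
    // Failed delta commit created the file group: delete the log file,
    // which prevents the leftover file that trips the Flink bucket index.
    System.out.println(decide("20240804120000", "20240804120000"));
    // File group predates the failed commit: only roll back its blocks.
    System.out.println(decide("20240801090000", "20240804120000"));
  }
}
```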
@usberkeley Thanks for the nice findings, would you mind contributing this fix to the 0.x branch?
@usberkeley Thanks for raising this. I also noticed that the logic in the else branch of FSUtils.isBaseFile(fullLogFilePath) is not consistent with the if branch when the log file was generated by the same instant time as the file group (this is OK with Spark, but a problem on Flink).
my pleasure
@danny0405 @usberkeley I've updated #11830 to include the fix for this issue. I take a simpler approach that achieves the same result. PTAL.
Wow, great, thanks Ethan.
Thanks Danny and Ethan. Closing the issue as resolved.
Describe the problem you faced
When Flink is restarted, a duplicate fileId exception occurs on a Flink bucket-index MOR table.
To Reproduce
Steps to reproduce the behavior:
Expected behavior
No such error
Environment Description
Hudi version : 0.15.0
Spark version : none
Hive version : none
Hadoop version : 3.3
Storage : HDFS
Running on Docker? : no
Extra Context
Stacktrace
Caused by: java.lang.RuntimeException: Duplicate fileId 00000016-4581-4908-b4d7-97d96dcdfd2a from bucket 16 of partition grass_date=2024-08-04 found during the BucketStreamWriteFunction index bootstrap.
  at org.apache.hudi.sink.bucket.BucketStreamWriteFunction.lambda$bootstrapIndexIfNeed$2(BucketStreamWriteFunction.java:172)
  at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
  at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
  at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
  at java.util.stream.Streams$StreamBuilderImpl.forEachRemaining(Streams.java:419)
  at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647)
  at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
  at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
  at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
  at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
  at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
  at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
  at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
  at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
  at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
  at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
  at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
  at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
  at org.apache.hudi.sink.bucket.BucketStreamWriteFunction.bootstrapIndexIfNeed(BucketStreamWriteFunction.java:165)
  at org.apache.hudi.sink.bucket.BucketStreamWriteFunction.processElement(BucketStreamWriteFunction.java:115)
  at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:87)
  at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
  at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
  at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
  at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:532)
  at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:822)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:771)
  at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
  at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569)
  at java.lang.Thread.run(Thread.java:748)