apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] File is deleted during inline compaction on MOR table causing subsequent FileNotFoundException on a reader #5298

Closed kasured closed 2 years ago

kasured commented 2 years ago

Describe the problem you faced

When inline compaction is turned on and a compaction plan completes, the resulting commit file references a file that was deleted during the compaction process. This later causes readers to fail with FileNotFoundException.

To Reproduce

I can reproduce the issue consistently. After the first compaction action completes, all subsequent reads fail, because the commit file references a parquet file that has already been deleted from the file system. Please see the Additional context section for more details. The issue can only be reproduced when multiple tables are used within the same SparkSession.

Expected behavior

After inline/async compaction the commit files in .hoodie folder are in sync with the files in the file system. Also there are no files deleted during the compaction.

Main observations so far

Environment Description

Additional context

We are using Spark streaming with Kafka topics as a source. Topic -> foreachBatch -> StreamingQuery per table -> DataFrame write -> Hudi MOR table. For each table we are using the following related configuration options

        "hoodie.datasource.write.table.type" = "MERGE_ON_READ"
        "hoodie.datasource.write.precombine.field" = "value.source.lsn"
        "hoodie.datasource.write.recordkey.field" = "cluster,shard,key"
        "hoodie.datasource.write.partitionpath.field" = "table,cluster,shard"
        "hoodie.datasource.write.keygenerator.class" = "org.apache.hudi.keygen.ComplexKeyGenerator"
        "hoodie.datasource.write.hive_style_partitioning" = "true"
        "hoodie.finalize.write.parallelism" = "12"
        "hoodie.upsert.shuffle.parallelism" = "12"

        "hoodie.compact.inline" = "true"
        "hoodie.compact.inline.max.delta.seconds" = "720"
        "hoodie.compact.inline.trigger.strategy" = "TIME_ELAPSED"

        "hoodie.clean.automatic" = "true"
        "hoodie.cleaner.policy" = "KEEP_LATEST_COMMITS"
        "hoodie.cleaner.commits.retained" = "18"
        "hoodie.metadata.cleaner.commits.retained" = "18"
        "hoodie.keep.min.commits" = "36"
        "hoodie.keep.max.commits" = "72"

        "hoodie.clustering.inline" = "false"
        "hoodie.clustering.inline.max.commits" = "4"
        "hoodie.clustering.plan.strategy.target.file.max.bytes" = "1073741824"
        "hoodie.clustering.plan.strategy.small.file.limit" = "629145600"

        "hoodie.metadata.enable" = "false"
        "hoodie.metadata.keep.min.commits" = "36"
        "hoodie.metadata.keep.max.commits" = "72"

        "hoodie.datasource.compaction.async.enable" = "false"

        "hoodie.write.markers.type" = "DIRECT"

        "hoodie.embed.timeline.server" = "false"

Course of Events

Let us take the file that the reader fails to find, 4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1177-14259_20220411202305.parquet, and trace what happens to it.

╔═══════════╤═════════════════════════╤═════════════════════╤══════════════════╗
║ CleanTime │ EarliestCommandRetained │ Total Files Deleted │ Total Time Taken ║
╠═══════════╧═════════════════════════╧═════════════════════╧══════════════════╣
║ (empty)                                                                      ║
╚══════════════════════════════════════════════════════════════════════════════╝

4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1198-14280_20220411202305.parquet    parquet    April 11, 2022, 22:23:54 (UTC+02:00)
4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_2-75-1434_20220411191603.parquet    parquet    April 11, 2022, 21:19:15 (UTC+02:00)

Note that the file under consideration was deleted (with a delete marker) at 22:23:55, the same moment the compaction commit happened. Note also that the only part of the file name that changed is the writeToken. From that moment on there is a new file, 4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1198-14280_20220411202305.parquet. However, this new file is not reflected in 20220411202305.commit, as can be seen below

"fileId" : "4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0",
"path" : "cluster=96/shard=14377/4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1177-14259_20220411202305.parquet",
"prevCommit" : "20220411191603",
"numWrites" : 122486,
"numDeletes" : 0,
"numUpdateWrites" : 122457,
"numInserts" : 0,
"totalWriteBytes" : 7528604,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : "cluster=96/shard=14377",
"totalLogRecords" : 846489,
"totalLogFilesCompacted" : 7,
"totalLogSizeCompacted" : 325539587,
"totalUpdatedRecordsCompacted" : 122457,
"totalLogBlocks" : 7,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 7528604,
"minEventTime" : null,
"maxEventTime" : null

"fileIdAndRelativePaths" : {
    "4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0" : "cluster=96/shard=14377/4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1177-14259_20220411202305.parquet",
    "0139f10d-7a88-481b-b5df-6516500076b0-0" : "cluster=96/shard=14377/0139f10d-7a88-481b-b5df-6516500076b0-0_0-1177-14258_20220411202305.parquet",
    "21000940-a573-4c46-8ad5-79003ac9daf5-0" : "cluster=96/shard=14377/21000940-a573-4c46-8ad5-79003ac9daf5-0_2-1177-14260_20220411202305.parquet"
  },
"totalRecordsDeleted" : 0,
"totalLogRecordsCompacted" : 2543875,
"totalLogFilesCompacted" : 21,
"totalCompactedRecordsUpdated" : 368137,
"totalLogFilesSize" : 978467842,
"totalScanTime" : 45421,

║ cluster=96/shard=14377/ │ 4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0 │ 20220411202305 │ s3://some-bucket/landing-zone/some_table/cluster=96/shard=14377/4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1198-14280_20220411202305.parquet │ 7.2 MB │ 4 │ 178.2 MB │ 178.2 MB │ 0.0 B │ 24.814273985207016 │ 0.0 │ [HoodieLogFile{pathStr='s3://some-bucket/landing-zone/some_table/cluster=96/shard=14377/.4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_20220411202305.log.4_0-1758-20808', fileLen=46785516}, HoodieLogFile{pathStr='s3://some-bucket/landing-zone/some_table/cluster=96/shard=14377/.4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_20220411202305.log.3_1-1610-19092', fileLen=46440363}, HoodieLogFile{pathStr='s3://some-bucket/landing-zone/some_table/cluster=96/shard=14377/.4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_20220411202305.log.2_0-1450-17202', fileLen=47068201}, HoodieLogFile{pathStr='s3://some-bucket/landing-zone/some_table/cluster=96/shard=14377/.4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_20220411202305.log.1_0-1297-15405', fileLen=46545393}] │ []
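
The file referenced by the commit and the file that actually exists share the file ID and the instant time and differ only in the write token. A minimal sketch of that comparison, assuming a base file name splits on underscores into fileId, writeToken and instantTime (a layout inferred from the names in this issue, not taken from Hudi's source). BaseFileName is a hypothetical helper, not a Hudi class:

```java
// Hypothetical helper (not part of Hudi): splits a base file name assumed to have
// the form <fileId>_<writeToken>_<instantTime>.<extension> into its three parts.
final class BaseFileName {
    final String fileId;
    final String writeToken;
    final String instantTime;

    private BaseFileName(String fileId, String writeToken, String instantTime) {
        this.fileId = fileId;
        this.writeToken = writeToken;
        this.instantTime = instantTime;
    }

    static BaseFileName parse(String name) {
        // Drop the extension, then split the remaining stem on underscores.
        int dot = name.lastIndexOf('.');
        String stem = dot >= 0 ? name.substring(0, dot) : name;
        String[] parts = stem.split("_");
        if (parts.length != 3) {
            throw new IllegalArgumentException("Unexpected base file name: " + name);
        }
        return new BaseFileName(parts[0], parts[1], parts[2]);
    }

    // True when both names belong to the same file ID and instant
    // but were produced by different write attempts.
    boolean differsOnlyInWriteToken(BaseFileName other) {
        return fileId.equals(other.fileId)
            && instantTime.equals(other.instantTime)
            && !writeToken.equals(other.writeToken);
    }

    public static void main(String[] args) {
        BaseFileName inCommit = parse(
            "4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1177-14259_20220411202305.parquet");
        BaseFileName onStorage = parse(
            "4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1198-14280_20220411202305.parquet");
        System.out.println(inCommit.writeToken + " vs " + onStorage.writeToken);
        System.out.println("Differs only in write token: "
            + inCommit.differsOnlyInWriteToken(onStorage));
    }
}
```

A check like this could be run over the paths in a commit file and a directory listing to spot other file groups in the same state.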


**Tried options**

* Turn off File Sizing by setting hoodie.parquet.small.file.limit to 0 to make sure the file is not deleted
* Setting hoodie.datasource.compaction.async.enable to false
* With one table the inline compaction is working as expected. Cleaning is also triggered and does what it should   
* Code has been rewritten to not use foreachBatch
* Even though we are using a single writer (a single Spark application), we also tried optimistic_concurrency_control. No concurrent access is detected and the issue still persists

**Possibly Related Issues**
* https://issues.apache.org/jira/browse/HUDI-3370. This one describes similar symptoms, but we do not use the metadata table service
* Comment https://github.com/apache/hudi/issues/2151#issuecomment-706029681 implies that inline compaction should be off when using spark.write(). We have not yet found any such restriction in the source code
* https://github.com/apache/hudi/issues/2043#issuecomment-682100271. Seems like using foreachBatch might not be a good approach to begin with
* https://github.com/apache/hudi/issues/2771#issuecomment-814682374. Not sure if having both inline compaction and async compaction might be the root cause
* Another issue with compaction when using the foreachBatch approach: https://github.com/apache/hudi/issues/4839#issuecomment-1046839752
* The timeline server might be a root cause as well: https://issues.apache.org/jira/browse/HUDI-2860 and https://issues.apache.org/jira/browse/HUDI-1138

**Stacktrace**

Lost task 0.0 in stage 1.0 (TID 1) (ip.ec2.internal executor 2): java.io.FileNotFoundException: No such file or directory 's3://some-bucket/landing-zone/some_table/cluster=96/shard=14377/4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1177-14259_20220411202305.parquet'
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:521)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.getFileStatus(EmrFileSystem.java:694)
    at org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:61)
    at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:456)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.footerFileMetaData$lzycompute$1(ParquetFileFormat.scala:318)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.footerFileMetaData$1(ParquetFileFormat.scala:317)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(ParquetFileFormat.scala:319)
    at org.apache.hudi.HoodieMergeOnReadRDD.read(HoodieMergeOnReadRDD.scala:105)
    at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:77)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

kasured commented 2 years ago

After changing the code and removing foreachBatch we were able to fix the issue described in https://github.com/apache/hudi/issues/2043#issuecomment-682100271. However, the issue reported here is now reproducible for both the inline and async variants of compaction.

I have updated the section Main observations so far.

kasured commented 2 years ago

Upon further investigation, and after enabling additional logs on EMR, the deletion of the file during compaction turns out to happen in org.apache.hudi.table.HoodieTable#reconcileAgainstMarkers. For some reason this file is considered invalid (duplicated)

if (!invalidDataPaths.isEmpty()) {
  LOG.info("Removing duplicate data files created due to spark retries before committing. Paths=" + invalidDataPaths);

However, later in the logs this file is registered and committed in the instant file in .hoodie folder

INFO SparkRDDWriteClient: Committing Compaction 20220414232316. Finished with result HoodieCommitMetadata{partitionToWriteStats={cluster=96/shard=14377=[HoodieWriteStat{fileId='9d9f72e9-9381-40d0-af0c-cb48c25bd78d-0', path='cluster=96/shard=14377/9d9f72e9-9381-40d0-af0c-cb48c25bd78d-0_0-617-7132_20220414232316.parquet', prevCommit='20220414225217', numWrites=122886, numDeletes=0, numUpdateWrites=121939, totalWriteBytes=23331178, totalWriteErrors=0, tempPath='null', partitionPath='cluster=96/shard=14377', totalLogRecords=341027, totalLogFilesCompacted=3, totalLogSizeCompacted=285373803, totalUpdatedRecordsCompacted=121939, totalLogBlocks=9, totalCorruptLogBlock=0, totalRollbackBlocks=0}]}, compacted=true,

So it leaves the system in an inconsistent state. It looks like a concurrency issue to me.

In the meantime, according to the logic in HoodieTable, the file seems to be deleted because there is a MERGE marker file for it in the temp folder for that instant. This mechanism is described here https://hudi.apache.org/blog/2021/08/18/improving-marker-mechanism/. However, I do not see a single failed task in Spark, and there are no retries in the logs that might cause that logic to kick in.

I will be trying the following things

nsivabalan commented 2 years ago

@kasured : before I dive in, few pointers on the write configs used.

  1. I see you have enabled both inline and async compaction. Guess w/ streaming sink to hudi, only async compaction is possible and for MOR table, hudi automatically does async compaction. So, probably you can remove these configs.

    "hoodie.compact.inline" = "true"
    "hoodie.datasource.compaction.async.enable" = "true"
  2. and I also see you have enabled clustering. can we disable clustering and see if the issue is still reproducible.

with these changes, can you let us know if the problem still persists?

kasured commented 2 years ago

@nsivabalan Thank you for looking into that. I have updated the configuration in the description as it was a little out of date. Since the creation of the ticket you can see that I have tried multiple options.

  1. At first iteration I had foreachBatch which was not causing async compaction to happen (please see the linked issues). After the code was rewritten to use just structured streaming constructs async compaction started to be scheduled and executed. So I have tried both inline enabled with async disabled, and vice versa and the issue that I describe is reproduced in both cases
  2. Not sure what you mean, as I have clustering disabled explicitly with "hoodie.clustering.inline" = "false". I also have not seen any clustering actions either in .hoodie or in the logs

All in all please check these two sections Main observations so far and Tried Options. They are up to date and have the summary of all that I have tried so far

To sum up, the main pain point is this: when we use multiple isolated StreamingQuery instances in one Spark application (for now we are testing with 3 tables), every first round of compaction leaves MERGE marker files in the .temp folder. When the HoodieTable finalize-write logic kicks in, it sees the markers that were not deleted, treats the corresponding data files as invalid, and deletes them. However, the instant.commit file for the compaction still points to those files, and when a reader reads that instant it fails with FileNotFoundException

      // we are not including log appends here, since they are already fail-safe.
      Set<String> invalidDataPaths = getInvalidDataPaths(markers);
      Set<String> validDataPaths = stats.stream()
          .map(HoodieWriteStat::getPath)
          .filter(p -> p.endsWith(this.getBaseFileExtension()))
          .collect(Collectors.toSet());

      // Contains list of partially created files. These needs to be cleaned up.
      invalidDataPaths.removeAll(validDataPaths);

      if (!invalidDataPaths.isEmpty()) {
        LOG.info("Removing duplicate data files created due to spark retries before committing. Paths=" + invalidDataPaths);
        Map<String, List<Pair<String, String>>> invalidPathsByPartition = invalidDataPaths.stream()
            .map(dp -> Pair.of(new Path(basePath, dp).getParent().toString(), new Path(basePath, dp).toString()))
            .collect(Collectors.groupingBy(Pair::getKey));

INFO HoodieTable: Removing duplicate data files created due to spark retries before committing. Paths=[table=some_table/cluster=96/shard=14377/7a3553ed-9d81-4a8f-bb2e-2a6d6bb3c1cc-0_2-77-1336_20220416100103.parquet, table=some_table/cluster=96/shard=14377/f778c16d-b674-405d-b32c-b7f5962b2471-0_0-58-1250_20220416100103.parquet, table=some_table/cluster=96/shard=14377/8100e3bd-c520-4531-8c15-00483db1638e-0_1-77-1335_20220416100103.parquet, table=some_table/cluster=96/shard=14377/afe5b900-7483-4c8a-bc5d-5a0ff221aca3-0_3-77-1337_20220416100103.parquet]

Why those files are left behind and not cleared earlier by the compaction commit process, I have not been able to figure out yet. It might be a concurrency issue where some of the building blocks are shared unsafely within the sparkContext.
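
The reconciliation excerpt above boils down to a set difference: every path that has a marker but does not appear in the write stats is treated as invalid and removed. The following is a minimal sketch of that logic with plain strings, not Hudi's implementation; MarkerReconcileSketch and findInvalidPaths are hypothetical names. The scenario in main is an assumption, one reading that is consistent with the observations in this thread: two write attempts leave markers, reconciliation only sees the second attempt in the stats, while the commit file references the first.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Simplified model of marker reconciliation. All names are hypothetical;
// this is not Hudi's code.
final class MarkerReconcileSketch {

    // Paths that have a marker but are absent from the write stats are treated as invalid.
    static Set<String> findInvalidPaths(Set<String> markerPaths,
                                        Set<String> statPaths,
                                        String baseFileExtension) {
        // Only base files count as valid paths; log files are ignored here.
        Set<String> valid = new HashSet<>();
        for (String path : statPaths) {
            if (path.endsWith(baseFileExtension)) {
                valid.add(path);
            }
        }
        Set<String> invalid = new HashSet<>(markerPaths);
        invalid.removeAll(valid);
        return invalid;
    }

    public static void main(String[] args) {
        String dir = "cluster=96/shard=14377/";
        String firstAttempt = dir
            + "4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1177-14259_20220411202305.parquet";
        String secondAttempt = dir
            + "4ce009e6-5622-4874-bb5d-a11e3bb9eaa3-0_1-1198-14280_20220411202305.parquet";

        // Assumption: both attempts left a marker behind.
        Set<String> markers = new HashSet<>(Arrays.asList(firstAttempt, secondAttempt));
        // Assumption: reconciliation only saw the second attempt in the write stats.
        Set<String> statsAtReconcile = new HashSet<>(Arrays.asList(secondAttempt));
        // Observed in this issue: the commit file references the first attempt.
        Set<String> pathsInCommitFile = new HashSet<>(Arrays.asList(firstAttempt));

        Set<String> deleted = findInvalidPaths(markers, statsAtReconcile, ".parquet");
        Set<String> dangling = new HashSet<>(pathsInCommitFile);
        dangling.retainAll(deleted);

        System.out.println("Deleted by reconciliation: " + deleted);
        System.out.println("Referenced by commit but deleted: " + dangling);
    }
}
```

Under these assumptions the first attempt's file is both deleted and referenced by the commit, which is the state a reader then trips over.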

nsivabalan commented 2 years ago

yes, I really appreciate your digging in deeper. let me try to understand the concurrency here. what do you mean by multiple concurrent streaming writes? there are 3 streams reading from diff upstream sources and writing to 1 hudi table? or one streaming pipeline which writes to 3 different hudi tables? or 3 different streaming pipeline writing to 3 diff hudi table but using same spark session ?

nsivabalan commented 2 years ago

btw, we did fix an issue wrt how spark lazy initialization and cache invalidation could result in wrong files in commit metadata https://github.com/apache/hudi/pull/4753. looks like exactly matching what you are reporting. Can you try applying the patch and let us know if you still see the issue.

kasured commented 2 years ago

@nsivabalan Sure, let me provide more details. There is a StreamingQuery entity which is started by Spark to consume the stream. This is basically what we use, as described here https://hudi.apache.org/docs/compaction#spark-structured-streaming

So what we do is create multiple StreamingQuery streams and start them. Each of them consumes from a single Kafka topic and writes to a single Hudi table. So it is 3 different streaming pipelines writing to 3 different Hudi tables, with the only difference that we use 3 different SparkSession objects rather than one. All of them reuse a single sparkContext, which is okay as there should be only one Spark context per JVM.

As for 4753, I have already listed it in the Possibly Related Issues section as HUDI-3370. However, from what I checked, it is related to the metadata service, which we do not use ("hoodie.metadata.enable" = "false"). Could it also be relevant even if we do not use the metadata table? I am asking because we are using 0.9.0 from Amazon and I would need to replace it with a patched build

nsivabalan commented 2 years ago

its not related to metadata table as such. essentially, the actual data files as part of the compaction commit could be different from what is found in compaction commit metadata. So, when reconciling markers, we may delete unintended files. yes, it is applicable even if you don't enable metadata.

and thanks for clarifying your use-case. I get it now.

kasured commented 2 years ago

I can see that the fix version is 0.11.0. Can this patch be safely backported to 0.9.0 though?

umehrot2 commented 2 years ago

@kasured I am from AWS EMR team. I hope you have opened a ticket with AWS support and we can work through that channel to backport this fix and provide you patched jars.

umehrot2 commented 2 years ago

@kasured is it possible for you to have 3 separate spark applications that do not share spark context ?

kasured commented 2 years ago

@nsivabalan We were able to reproduce a similar scenario locally. Please use the following repository to check and confirm on your end https://github.com/kasured/hudi-compaction-5298

In the meantime, we are also exploring different options, such as

  1. Switching to a Copy On Write table, where we do not need to deal with compaction. However, in that scenario "compaction" effectively happens with each write, which needs separate attention in terms of tuning the system
  2. We have not been able to reproduce the issue for a single table with a single StreamingQuery. If we have confirmation that this issue does not affect a single table, we may try creating a separate Spark application for each stream, each with its own SparkSession, and running them on the EMR cluster as separate jobs. However, given that we have 80+ streams, this would create much greater operational complexity (monitoring, restarting, deployment and coordination)
  3. Using MOR tables but disabling compaction execution in the same application and running it in a separate process. However, we are not yet sure whether this issue, if confirmed, would also affect that scenario

rahil-c commented 2 years ago

@kasured I'm currently looking into this and was able to reproduce the issue after running your https://github.com/kasured/hudi-compaction-5298.

When I built hudi 0.11.0 (from master) with profile spark3.1 and provided my 0.11.0 spark bundle jar to your sample reproduction, I noticed that I no longer got the FileNotFoundException when running it. I'm currently looking to see if the fix https://github.com/apache/hudi/pull/4753 can be backported to 0.9.0

rahil-c commented 2 years ago

@kasured If you have opened a case with AWS EMR support, we have a backport of the fix for hudi 0.9.0 we can provide you. Let us know so we can close this thread out for now.

nsivabalan commented 2 years ago

Closing this issue as its already been fixed. thanks for raising the issue.

Khushbukela commented 1 year ago

> Closing this issue as its already been fixed. thanks for raising the issue.

Hi @nsivabalan can you please help to solve the above issue? I am facing the same problem.