apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0
5.32k stars 2.41k forks

[SUPPORT] Hudi INSERT_OVERWRITE inconsistent behaviour (0.12.3) #11718

Open ergunbaris opened 1 month ago

ergunbaris commented 1 month ago

Description

We have a Spark application in the Golden Zone (medallion architecture) that aggregates the input data and runs in batches to write out daily partitions. The same application can overwrite each daily partition using Hudi's INSERT_OVERWRITE operation. Since we cannot process all the daily partition aggregations in a single batch, we run sequential batches, each producing many daily partitions.

Recently we had to fix the data in the daily partitions for a 2-year period. The fix ran to completion in dozens of batches (in a single Spark application, without any intervention). The first batches ran successfully: they generated new parquet files for each daily partition and unreferenced the old parquet files in the Hudi metadata, and the old files were then removed by an async Hudi cleaner job. For the final x batches the behaviour changed: the old parquet files were no longer unreferenced in the Hudi metadata, so the cleaner (as expected) did not delete them, and we ended up with duplicate data. Again, this happened within a single uninterrupted Spark application.

For security reasons I cannot share the archived partition metadata that references the old and the new parquet files. However, when I inspected the problematic daily partitions with hudi-cli, both the old and the new parquet files were listed in the partition metadata. When I checked the related clean jobs, I saw that on the days the overwrite worked the cleaner deleted parquet files, whereas on the days towards the end of the application run none of the old parquet files were deleted, since they had not been unreferenced.

Once the problem started, it affected all the remaining batches in the sequence. In other words, it did not hit an arbitrary batch at random: from the first affected batch onwards, every subsequent batch showed the same behaviour.
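One way to confirm which partitions are affected is to query the table snapshot directly. The following is only a sketch and not part of the original job: the base path is a hypothetical placeholder, and it assumes that some_id is unique within each date partition and that no other writer has added rows to a partition since it was overwritten. The _hoodie_* columns are the standard Hudi meta fields.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, countDistinct}

val spark = SparkSession.builder().getOrCreate()

// Hypothetical placeholder: substitute the table's real base path.
val basePath = "s3://some-bucket/some_table_name"

// Snapshot read of the Hudi table.
val snapshot = spark.read.format("hudi").load(basePath)

// Record keys that occur more than once inside one daily partition.
// Assumes some_id is meant to be unique per date.
val duplicateKeys = snapshot
  .groupBy(col("date"), col("some_id"))
  .count()
  .filter(col("count") > 1)

// Partitions whose visible rows come from more than one commit.
// Under the assumptions above, a partition that was overwritten once
// should only expose rows from the overwriting commit.
val mixedCommitPartitions = snapshot
  .groupBy(col("_hoodie_partition_path"))
  .agg(countDistinct(col("_hoodie_commit_time")).as("commit_count"))
  .filter(col("commit_count") > 1)

duplicateKeys.show(20, truncate = false)
mixedCommitPartitions.show(20, truncate = false)
```

If the second query lists only the partitions from the final batches, that would match the pattern described above, where old file groups stayed visible after the overwrite.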

Here is the anonymised code for the Hudi options used during insert overwrite. When the "force" argument is passed from the top-level application, it applies to all the batches in the same way.

Map[String, String](
  DataSourceWriteOptions.RECORDKEY_FIELD.key         -> "some_id",
  HoodieWriteConfig.TBL_NAME.key                     -> "some_table_name",
  DataSourceWriteOptions.OPERATION.key ->
    (if (overWrite) DataSourceWriteOptions.INSERT_OVERWRITE_OPERATION_OPT_VAL
     else DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL),
  DataSourceWriteOptions.TABLE_TYPE.key              -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
  DataSourceWriteOptions.PARTITIONPATH_FIELD.key     -> "date",
  DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true",
  DataSourceWriteOptions.PRECOMBINE_FIELD.key        -> "some_id",
  "hoodie.copyonwrite.record.size.estimate"          -> "512",
  "hoodie.combine.before.upsert"                     -> "false",
  "hoodie.clean.automatic"                           -> "false", // a separate Airflow job runs HoodieCleaner
  "hoodie.metadata.enable"                           -> arguments.isHudiMetadataEnabled.toString
)

And the Spark write code:

requests.write
  .format("org.apache.hudi")
  .options(hudiReplayOptions(overWrite = overWrite, arguments = arguments) ++ additionalHudiOptions)
  .mode(SaveMode.Append)
  .save(writePath)

These are the additionalHudiOptions:

Map[String, String](
  "hoodie.write.concurrency.mode"       -> "optimistic_concurrency_control",
  "hoodie.write.lock.provider"          -> "org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider",
  "hoodie.write.lock.filesystem.expire" -> arguments.hudiLockExpiryDuration.toString
)

We are in the process of replacing the Hudi lock mechanism to move away from S3-based locking, since Hudi support later announced that it should not be relied on. Another concurrent incremental Hudi job was running during the partition fix job, although it is highly unlikely that both processes tried to acquire the lock at the same moment. And even if this were a reason for suspicion, remember that once the problem started for one batch it happened for all the following batches as well. It is impossible for this to happen so many times in a row. Also, no Hudi metadata-related failure or corruption occurred during the operation.
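For reference, a replacement for the filesystem-based lock could look like the options below. This is only a sketch under assumptions: the post does not say which lock provider will be used, so the ZooKeeper provider is shown as one documented option, and the host, port, lock key and base path are hypothetical placeholders.

```scala
// Sketch only: the ZooKeeper provider is one possible replacement,
// not the one confirmed in this issue. Host, port, lock key and
// base path below are hypothetical placeholders.
val zookeeperLockOptions: Map[String, String] = Map(
  "hoodie.write.concurrency.mode"         -> "optimistic_concurrency_control",
  "hoodie.cleaner.policy.failed.writes"   -> "LAZY",
  "hoodie.write.lock.provider"            -> "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider",
  "hoodie.write.lock.zookeeper.url"       -> "zk-host.example.internal",
  "hoodie.write.lock.zookeeper.port"      -> "2181",
  "hoodie.write.lock.zookeeper.lock_key"  -> "some_table_name",
  "hoodie.write.lock.zookeeper.base_path" -> "/hudi/locks"
)
```

Every writer on the table, including the concurrent incremental job, would need the same lock settings so that all of them contend for the same lock.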

To Reproduce

Steps to reproduce the behavior:

  1. Unfortunately we could not reproduce the same outcome with the existing production code! And we did not run the same process again with the same data, since it is very costly to recreate the situation.

Expected behavior

We expect INSERT_OVERWRITE to unreference the old parquet files of every overwritten partition in all the sequential batches within the same Spark application.

Environment Description

Stacktrace

No stacktrace, since there was no failure.

ergunbaris commented 1 month ago

https://apache-hudi.slack.com/archives/C4D716NPQ/p1721319672256669

KnightChess commented 1 month ago

@ergunbaris hi, was the related replacecommit instant archived before the clean service ran?

ergunbaris commented 1 month ago

@KnightChess Thanks for the question. I have looked into the production code and the hoodie directory, and the timeline is like below

So basically all the replacecommits were archived after the cleaner ran.

On the other hand, the first replacecommits to be archived would always be those for the oldest date partitions. But in our case the oldest date partitions were processed successfully, and all the date partitions from the batches towards the end of the process were duplicated!

KnightChess commented 1 month ago

@ergunbaris if the related replacecommit instant was archived after the clean service ran, then that is not the cause of this problem. Another possibility is Spark speculative task execution, which can also cause duplicates, as in #9615.
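To rule speculation in or out, it may help to check the setting on the writing application and, if it is on, disable it for the Hudi write job. A minimal sketch, assuming the session is built in application code; the application name is a hypothetical placeholder:

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: build the session with speculative execution disabled.
// "partition-fix-job" is a hypothetical application name.
val spark = SparkSession.builder()
  .appName("partition-fix-job")
  .config("spark.speculation", "false")
  .getOrCreate()

// Print the effective value ("false" is Spark's default when unset).
println(spark.conf.get("spark.speculation", "false"))
```

The same setting can also be passed at submit time with --conf spark.speculation=false.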