apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Hudi Upsert Very Slow/ Failed With No Space Left on Device #3418

Closed FeiZou closed 1 year ago

FeiZou commented 2 years ago

Describe the problem you faced

Hi there, I'm migrating a table from an S3 data lake to a Hudi data lake using Spark. The source table is around 600 GB and 8 B rows; each partition contains around 1.5 GB of data and 20 M rows. The target Hudi table is non-partitioned, with a current data size of around 260 GB. With 30 executors, 150 total cores, and 32 GB of memory per executor, it takes more than 3 hours to upsert one single partition into the Hudi table. If I reduce the executors to 15, the job fails with a No Space Left On Device error during the upsert. (We are using EC2 instances for the Spark workers, each with 300 GB of EBS.)

The Hudi and Spark config I'm currently using is as follows:

val hudiOptions = Map[String,String](
    HoodieWriteConfig.TABLE_NAME -> "hudi_recordings",
    HoodieWriteConfig.WRITE_PAYLOAD_CLASS -> "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload",
    DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
    HoodieCompactionConfig.CLEANER_COMMITS_RETAINED_PROP -> "5",
    HoodieCompactionConfig.MIN_COMMITS_TO_KEEP_PROP -> "10",
    HoodieCompactionConfig.MAX_COMMITS_TO_KEEP_PROP-> "15",
    HoodieIndexConfig.BLOOM_INDEX_FILTER_TYPE -> BloomFilterTypeCode.DYNAMIC_V0.name(),
    DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
    DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE",
    DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "sid",
    DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "date_updated")

 val df = spark.read.json(basePath).repartition(500).persist(StorageLevel.MEMORY_AND_DISK)

 df.write.format("org.apache.hudi")
              .options(hudiOptions)
              .mode(SaveMode.Append)
              .save(output_path)

I'm trying to reduce hoodie.keep.min.commits and hoodie.cleaner.commits.retained in the hope that it reduces the data size. I also reduced UPSERT_PARALLELISM from 1500 to 500.
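For reference, here is a rough sketch (string config keys only, with the values described above; not my exact code) of the settings being tuned:

    // Sketch only: retention settings plus the reduced upsert shuffle parallelism.
    val tuningOptions: Map[String, String] = Map(
      "hoodie.cleaner.commits.retained"   -> "5",
      "hoodie.keep.min.commits"           -> "10",
      "hoodie.keep.max.commits"           -> "15",
      "hoodie.upsert.shuffle.parallelism" -> "500"   // reduced from 1500
    )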

To Reproduce

Steps to reproduce the behavior:

  1. Create a new Hudi table
  2. Load 1.5 GB of data, which is around 20 million rows
  3. Use the Hudi and Spark config provided above
  4. Run the Spark job with 30 executors, 150 total cores, 32GB memory per executor

Expected behavior

The upsert takes around 3 hours

Environment Description

Additional context

Please let me know what additional spark logs would be helpful and I can provide them.

nsivabalan commented 2 years ago

Hudi has bulk_insert to assist in doing a bulk load of data into Hudi ("hoodie.datasource.write.operation" = "bulk_insert"). bulk_insert also has a row-writer path, which is more performant ("hoodie.datasource.write.row.writer.enable" = "true"). You can choose different sort modes for bulk_insert via "hoodie.bulkinsert.sort.mode": NONE, GLOBAL_SORT (the default), or PARTITION_SORT; sorting is based on {partitionpath + record key}.

Let me know how it goes. Once you are done w/ bulk load (bulk_insert), you can start using upserts for other incremental ingests.

FeiZou commented 2 years ago

@nsivabalan Sorry, I didn't make it clear. The source table we are migrating contains duplicates (one PK with several different timestamps); that's why we want to use upsert to remove the duplicates during the migration. I have another question: should I use a partitioned or non-partitioned table in this case? From reading the docs, it says that if I want to update records within a partitioned table, I need to use a global bloom index instead of a regular bloom index. I'm not sure how a partitioned table with a global bloom index will perform compared to a non-partitioned table with a regular bloom index, given the total data I have.

nsivabalan commented 2 years ago

You can still dedup with bulk_insert using the config "hoodie.combine.before.insert", but it will only dedup among the incoming records. If you wish to update an already existing record in Hudi, then yes, upsert is the way to go. But it looks like you are doing this as a one-time migration, so bulk_insert should work out. Once the initial migration is complete, you can start doing "upsert"s.

There is some overloaded terminology around partitioned and non-partitioned datasets. Let me try to explain.

Partitioned dataset: a pair of record key and partition path forms a unique record in Hudi. Here, there can be two types of indexes: regular and global. In the case of a regular index (BLOOM/SIMPLE), duplicate records can exist across partition paths: if your incoming record is (rec1, pp1, ...), only the data within pp1 is searched for updates, and the record is either updated or routed as an insert. In the case of a global index, if an incoming record is (rec1, pp1, ...), all partitions are searched for updates and the record is updated accordingly.

And then there is the non-partitioned dataset, where all records go into a single folder (NonPartitionedKeyGen). This is self-explanatory: it is equivalent to the partitioned dataset above with just one partition.
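For illustration, a minimal sketch of the option that controls the lookup scope (the key is the string form of HoodieIndexConfig.INDEX_TYPE_PROP; everything else stays as in your config):

    // Sketch only: regular vs. global index scope.
    // "BLOOM"        -> updates are matched only within the incoming record's partition path
    // "GLOBAL_BLOOM" -> the whole table is searched, so a record key is unique table-wide
    val indexOption: Map[String, String] = Map(
      "hoodie.index.type" -> "BLOOM"   // or "GLOBAL_BLOOM"
    )
    // e.g. df.write.format("org.apache.hudi").options(hudiOptions ++ indexOption)...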

Let me know if you are looking for more info.

FeiZou commented 2 years ago

@nsivabalan Many thanks for your patient explanation! I'm running the job with bulk_insert now. I'm keeping it as a non-partitioned table for now, since from your explanation the write performance should be similar between partitioned and non-partitioned tables. I will update here once the job finishes.

FeiZou commented 2 years ago

Hey @nsivabalan, the bulk_insert job finished successfully, and the efficiency is really amazing. However, there are still duplicates when I check, even though I enabled hoodie.combine.before.insert. The full config I used is below:

val hudiOptions = Map[String,String](
            HoodieWriteConfig.TABLE_NAME -> "recordings_hudi",
            HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP -> "true",
            DataSourceWriteOptions.ENABLE_ROW_WRITER_OPT_KEY -> "true",
            DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL,
            DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
            DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE",
            DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "sid",
            DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "date_updated")

There are duplicates with the same sid and different date_updated values. Any insight on this?

nsivabalan commented 2 years ago

Sorry, if you use NonPartitionedKeyGen, Hudi assumes an empty string ("") as the partition path for all records. Hence you will find duplicates if you are checking for the pair of (record key, partition path). I would recommend using SimpleKeyGen (it's the default key generator, so you don't need to set one). Let me know how it goes.

Here are some thoughts on deciding the partition path. Many companies have a lot of queries around a date string (select a, b, c from table where datestr >= x and datestr <= y). When such a query hits Hudi, if there are 2000 partitions but the datestr range passed in the query covers only the past 7 days, Hudi looks into only the past 7 days. Also, during upsert, Hudi runs an indexing step to determine whether a record is an update or a new insert. With a partitioned dataset, the search space for a record (record key, partition path) is bounded; if not, for every record key Hudi has to search the entire dataset, and hence you could see higher latencies as well (if you go with non-partitioned).
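As a rough read-side illustration (assuming a table partitioned by a datestr-like column, and a placeholder basePath; not your actual schema), partition pruning would look something like:

    // Sketch only: the filter on the partition column lets the query scan just the
    // matching partition folders instead of the whole table.
    val recent = spark.read.format("org.apache.hudi")
      .load(basePath)   // older Hudi versions may need a glob such as basePath + "/*/*"
      .filter("datestr >= '2021-08-10' AND datestr <= '2021-08-17'")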

Hope this gives you an idea of how partitioning helps keep both your write and read latencies low.

FeiZou commented 2 years ago

Hey @nsivabalan, I was able to load the data successfully using SimpleKeyGen, and I can see the data has been partitioned well. Now I'm trying to figure out how to sync the partition info to the Hive metastore properly. Our current cluster setup does not support automatic Hudi Hive sync, though. Is there any straightforward manual way to do it?

liujinhui1994 commented 2 years ago

org.apache.hudi.hive.HiveSyncTool — @FeiZou please take a look at this tool; it seems to meet your requirements.

nsivabalan commented 2 years ago

Let us know if the HiveSyncTool solves your use-case. We can close out the issue.

FeiZou commented 2 years ago

Thanks @nsivabalan and @liujinhui1994, HiveSyncTool did help! Based on your advice, bulk_insert helped a lot with migrating the table. Now that the migration is done, I'm trying to use the upsert operation to load data incrementally. I'm observing that upserting a single day's data takes 3 hours, while bulk_insert of the whole historical data takes only 2 hours with the same resources. Do you have any advice on improving the performance of upsert? I'm using the following config for upsert:

val hudiOptions = Map[String,String](
            HoodieWriteConfig.TABLE_NAME -> "hudi_table",
            HoodieIndexConfig.INDEX_TYPE_PROP -> "GLOBAL_BLOOM",
            DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY  -> "true",
            DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
            DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "data_load_date",
            DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.SimpleKeyGenerator",
            DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE",
            DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "sid",
            DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "date_updated")

liujinhui1994 commented 2 years ago

If there are a lot of updates, you can try a MOR (Merge-On-Read) table.
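For reference, a minimal sketch of the only change that would be needed (reusing the hudiOptions map from the upsert config above; a sketch, not a tested config):

    // Sketch only: switch to Merge-On-Read so updates land in delta log files and are
    // compacted later, instead of rewriting whole parquet files on every upsert.
    val morOptions = hudiOptions + (DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "MERGE_ON_READ")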

FeiZou commented 2 years ago

Hey, from my understanding, a MOR table will have a higher read cost than COW. Especially in our case, we have duplicated rows in the delta commits which need to be handled at read time, and read time is more of a concern for us. Please correct me if my understanding is wrong, and please let me know if you have any ideas on improving the upsert Spark job's performance, from either the Spark side or the Hudi side.

And I actually have another concern: I found that there are still duplicates after I finished the bulk_insert table migration using SimpleKeyGen with the CopyOnWrite table type. The number of duplicates is the same as for the table using NonPartitionedKeyGen. Any thoughts on that? @nsivabalan

nsivabalan commented 2 years ago

Regarding the duplicates: did you use SimpleKeyGen and the same config values for the record key and partition path for both bulk_insert and upsert?

FeiZou commented 2 years ago

Hey @nsivabalan, as I checked, the duplicates actually have nothing to do with upsert; they happened during the bulk_insert. As proof, 2 duplicate rows with the same key but different date_updated values exist in the same partition. They have the same hoodie_commit_time 20210817130414 and the same hoodie_record_key RE2dd866834bea72ac37612d3f57afe58c. Shouldn't these 2 duplicated rows have been combined before being written into the data lake?

Adding the config for bulk_insert for your reference:

val hudiOptions = Map[String,String](
            HoodieWriteConfig.TABLE_NAME -> "recordings_hudi",
            HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP -> "true",
            HoodieIndexConfig.INDEX_TYPE_PROP -> "GLOBAL_BLOOM",
            DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY  -> "true",
            DataSourceWriteOptions.ENABLE_ROW_WRITER_OPT_KEY -> "true",
            DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL,
            DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "data_load_date",
            DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.SimpleKeyGenerator",
            DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE",
            DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "sid",
            DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "date_updated")

nsivabalan commented 2 years ago

If you wish to dedup with bulk_insert, you also need to set "hoodie.combine.before.insert" to true. Just to clarify, bulk_insert does not look at any records already in storage, so setting this config only ensures that the incoming batch is deduped before being written to Hudi. In other words, if you do 2 bulk_inserts, one after another, each batch will write unique records to Hudi, but if records overlap between batch 1 and batch 2, bulk_insert will not update them.

Hope that clarifies.
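If the built-in combine still leaves duplicates, one workaround (a sketch only, using the sid/date_updated column names from your config; `df`, `hudiOptions`, and `output_path` are placeholders) is to dedup the incoming DataFrame in Spark yourself before the bulk_insert, keeping the latest row per record key:

    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, row_number}

    // Sketch only: keep the newest row per sid (by date_updated) before writing.
    val latestPerKey = df
      .withColumn("rn", row_number().over(Window.partitionBy("sid").orderBy(col("date_updated").desc)))
      .filter(col("rn") === 1)
      .drop("rn")

    latestPerKey.write.format("org.apache.hudi")
      .options(hudiOptions)
      .mode(SaveMode.Append)
      .save(output_path)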

FeiZou commented 2 years ago

Hey @nsivabalan, I did set hoodie.combine.before.insert to true; you can see HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP -> "true" in the config I provided above. And I'm doing bulk_insert as a one-time load (a single batch), so there are no previously stored records.

FeiZou commented 2 years ago

Hey @nsivabalan @vinothchandar, sorry to trouble you again, but is there anything you can suggest here?

nsivabalan commented 2 years ago

@FeiZou: if you are still having the issue, can you provide us with steps to reproduce it?

nsivabalan commented 2 years ago

This is not reproducible locally. The only difference I see is GLOBAL_BLOOM; I will try to test it out and let you know. In the meantime, it would be good if you can provide us with reproducible steps. I assume your case is: with just one round of bulk_insert, an input df that has duplicates, and the appropriate configs set, you still see dups after writing to the Hudi table.
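Something along these lines is what I would try locally (a sketch only; the tiny dataset and paths are hypothetical, and the hudiOptions map refers to your bulk_insert config above):

    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.functions.count
    import spark.implicits._

    // Sketch only: one bulk_insert of a DataFrame with duplicate record keys,
    // then a read-back to check whether any sid still appears more than once.
    val input = Seq(
      ("RE001", "2021-08-01", "2021-08-17 10:00:00", "v1"),
      ("RE001", "2021-08-01", "2021-08-17 11:00:00", "v2"),   // same sid, later date_updated
      ("RE002", "2021-08-01", "2021-08-17 10:00:00", "v1")
    ).toDF("sid", "data_load_date", "date_updated", "payload")

    input.write.format("org.apache.hudi")
      .options(hudiOptions)
      .mode(SaveMode.Overwrite)
      .save("/tmp/hudi_dedup_repro")

    spark.read.format("org.apache.hudi")
      .load("/tmp/hudi_dedup_repro")   // older versions may need a glob like "/tmp/hudi_dedup_repro/*/*"
      .groupBy("sid").agg(count("*").as("cnt"))
      .filter("cnt > 1")
      .show()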

FeiZou commented 2 years ago

Hey @nsivabalan, thanks for checking! Really appreciate it! Yes, your assumption is exactly right. Please let me know if you need any other information to reproduce the issue.

nsivabalan commented 1 year ago

We have done a lot of improvements around performance in Hudi: https://hudi.apache.org/blog/2022/06/29/Apache-Hudi-vs-Delta-Lake-transparent-tpc-ds-lakehouse-performance-benchmarks

Can you try out 0.12? Closing this out due to long inactivity.

Feel free to open new issue if you need assistance.