apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

UPSERTs are taking time #9976

Open darlatrade opened 10 months ago

darlatrade commented 10 months ago

Upsert is very slow, taking 10 to 15 minutes to load 678.0 KB into a Hudi COW table. I am not sure where the time is being spent. Can someone please help me find the issue?

Environment Description

Here is the Spark UI screenshot (hudi_upsert_issue):

(screenshot: s3_hoodie)

Configs used:

"className": "org.apache.hudi", "hoodie.table.name": tgt_tbl, "hoodie.datasource.write.recordkey.field": "id", "hoodie.datasource.write.precombine.field": "evnt_cent_tz", "hoodie.datasource.write.operation": "upsert", "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator", "hoodie.datasource.write.partitionpath.field": "year,month",
"hoodie.datasource.hive_sync.support_timestamp": "true", "hoodie.datasource.hive_sync.enable": "true", "hoodie.datasource.hive_sync.assume_date_partitioning": "false", "hoodie.datasource.hive_sync.table": tgt_tbl, "hoodie.datasource.hive_sync.use_jdbc": "false", "hoodie.datasource.hive_sync.mode":"hms", "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.write.hive_style_partitioning": "true",
"hoodie.upsert.shuffle.parallelism": 50, "hoodie.delete.shuffle.parallelism": 50, "hoodie.bulkinsert.sort.mode": "GLOBAL_SORT", "hoodie.index.type": "BLOOM", "hoodie.metadata.enable": "true", "hoodie.metadata.index.bloom.filter.enable": "true", "hoodie.metadata.index.bloom.filter.parallelism": 100, "hoodie.metadata.index.bloom.filter.column.list": "id", "hoodie.bloom.index.use.metadata": "true", "hoodie.metadata.index.column.stats.enable": "true", "hoodie.metadata.index.column.stats.column.list": "col1,col2,col3",
"hoodie.enable.data.skipping": "true"

ad1happy2go commented 10 months ago

@darlatrade As I see it, the time is being spent in the "Doing partition and writing data" stage, which probably means your incremental batch is touching a lot of file groups, so Hudi has to rewrite a lot of parquet files since it is a COW table. Can you check how much data got written in this stage on the Spark UI?

darlatrade commented 10 months ago

@ad1happy2go Thanks for the reply

Here is the stage detail. I am not sure where to find the exact size.

image

I see in .hoodie that it's inserting 700 KB.

ad1happy2go commented 10 months ago

You can try opening this commit file and see how many file groups are being updated as part of this commit. How many partitions do you have in your table?

darlatrade commented 10 months ago

The commit file has 16745 lines. The table has month-level partitions, and the last commit touched almost a full year (12) of partitions. We maintain 3 years of data (36 partitions, 12 per year). It looks like 700 file groups were updated (found in the "fileIdAndRelativePaths" section).
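
For anyone following along, here is a rough sketch of how such a count could be pulled out of a commit file. The commit filename below is a placeholder, and the exact JSON layout beyond the "fileIdAndRelativePaths" field quoted above is an assumption.

    # Hypothetical sketch: count file groups touched by a commit by reading
    # the commit JSON from the .hoodie folder (filename is a placeholder).
    import json

    with open(".hoodie/20231101123045678.commit") as f:
        commit = json.load(f)

    # Assumed structure: "fileIdAndRelativePaths" maps file group id -> parquet path.
    file_groups = commit.get("fileIdAndRelativePaths", {})
    print(f"file groups rewritten in this commit: {len(file_groups)}")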

vinothchandar commented 10 months ago

@darlatrade Just to rule anything out: is it easy for you to try this table on the 0.14 version in a test/staging environment?

Do you have a Spark Stages UI screenshot? From there we can see how the input amplifies across the different stages.

darlatrade commented 10 months ago

Thanks for the quick reply @vinothchandar. We completed most of our testing on 0.10.1 and may not be able to upgrade soon, but I can at least try 0.14 for this table if that helps.

Here is the stages screenshot.

image

nsivabalan commented 10 months ago

Hey @darlatrade: can you help with some more info?

  1. What's the size of the table?
  2. I assume it's a COW table.
  3. Based on your stats, it looks like there are at least 60 file groups per partition and you are updating 12 partitions, which comes to 720 file groups. So this involves rewriting 720 parquet files, for which Hudi might spin up ~720 tasks. With a COW table, it is known that updating a very small percentage of data spread across a lot of file groups can result in significant overhead.

If this matches your workload and you prefer faster write times, maybe you can try a MOR table.

  4. Also, I see you are on 0.10.1. Some of the configs you have shared may not be applicable in 0.10, in case you want to remove them:
    "hoodie.metadata.index.bloom.filter.enable": "true",
    "hoodie.metadata.index.bloom.filter.parallelism": 100,
    "hoodie.metadata.index.bloom.filter.column.list": "id",
    "hoodie.bloom.index.use.metadata": "true",
    "hoodie.metadata.index.column.stats.enable": "true",
    "hoodie.metadata.index.column.stats.column.list": "col1,col2,col3",
    "hoodie.enable.data.skipping": "true"

nsivabalan commented 10 months ago

If my understanding of your pipeline/workload is wrong, let's sync up in the Hudi OSS workspace and we can see what's going on.

darlatrade commented 10 months ago

@nsivabalan

  1. Size of the table and the number of file objects in the root folder of the table:
image
  2. Yes, it's a COW table.

  3. Yes, your calculation on file groups is correct. We can consider MOR for a future upgrade; we may not be able to switch right now.

  4. Sure, I will remove those configs and rerun.

nsivabalan commented 10 months ago

Got it. May I know what your record key comprises? I see it is "id", but is it a random id or does it refer to some timestamp-based key? If it is timestamp-based, we could trigger clustering on the record key, so chances are your updates would be confined to fewer file groups per partition (but a larger percentage of records within each file group) instead of updating a very small percentage of records across a large number of file groups.
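
As a hedged illustration of the clustering idea above (the exact values are not prescribed in this thread), inline clustering sorted on the record key could look roughly like this:

    # Hypothetical sketch: inline clustering sorted by the record key, so that
    # time-prefixed keys land near each other and updates touch fewer file groups.
    clustering_options = {
        "hoodie.clustering.inline": "true",
        "hoodie.clustering.inline.max.commits": "4",           # cluster every few commits
        "hoodie.clustering.plan.strategy.sort.columns": "id",  # sort by the record key
    }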

darlatrade commented 10 months ago

Here is how "id" is derived.

df.withColumn("id", concat("evnt_centtz",lit(""),md5(concat("key_col1","key_col2","key_col3","evnt_cent_tz"))))

Sample values from table:

image

darlatrade commented 10 months ago

@nsivabalan Any inputs on this?

soumilshah1995 commented 10 months ago

Can you use the new RLI (record-level index) with the 0.14 version of Hudi?

https://www.linkedin.com/pulse/upsert-performance-evaluation-hudi-014-spark-341-record-soumil-shah-oupre%3FtrackingId=PeKhUkGNTkuSD1VRqoI3rw%253D%253D/?trackingId=PeKhUkGNTkuSD1VRqoI3rw%3D%3D

nsivabalan commented 10 months ago

Yeah, as I suggested before, you may want to try a MOR table, and also try using the SIMPLE index. In 0.10.1, Hudi uses the Bloom index, and for random keys it might incur some unnecessary overhead.
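
A minimal sketch of that suggestion as write options (values assumed for illustration); note that the table type is fixed when the table is created, so an existing COW table would need to be rebuilt or migrated rather than simply reconfigured.

    # Hypothetical sketch: MOR table type plus SIMPLE index.
    mor_options = {
        "hoodie.datasource.write.table.type": "MERGE_ON_READ",
        "hoodie.index.type": "SIMPLE",
    }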

nsivabalan commented 10 months ago

And yes, after upgrading to 0.14.0 you can leverage RLI, which should definitely improve your index and write latencies.

darlatrade commented 10 months ago

What Hadoop configs should be considered to load 500 GB of data in monthly partitions with RLI and 0.14?

soumilshah1995 commented 10 months ago

Well, with Apache Hudi 0.14 you can leverage RLI for faster upserts:

    'hoodie.metadata.record.index.enable': 'true',
    'hoodie.index.type': 'RECORD_INDEX'

Sample code can be found at https://soumilshah1995.blogspot.com/2023/10/apache-hudi-014-announces.html

darlatrade commented 10 months ago

I am trying to initialize a new table with RLI. I need to load the history first, which has 3210407531 records and 520 GB of data.

The Spark context is shutting down when loading this much data. Also, the number of objects is huge, as shown in the screenshot below.

image

Hoodie config:

    "className": "org.apache.hudi",
    "hoodie.table.name": tgt_tbl,
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.precombine.field": "eff_fm_cent_tz",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
    "hoodie.datasource.write.partitionpath.field": "year,month",
    "hoodie.datasource.hive_sync.support_timestamp": "true",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.assume_date_partitioning": "false",
    "hoodie.datasource.hive_sync.table": tgt_tbl,
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.bulkinsert.shuffle.parallelism": 700,
    "hoodie.index.type": "RECORD_INDEX",
    "hoodie.metadata.record.index.enable": "true",
    "hoodie.metadata.enable": "true"

Also using hudi_operation = "bulk_insert" and hudi_write_mode = "overwrite".

Error:

image

Can you suggest what parameters should be used to load this data? I need to load the history before starting the deltas.

ad1happy2go commented 10 months ago

@darlatrade You need to increase hoodie.metadata.record.index.min.filegroup.count to a higher number depending upon the amount of data you have. Let us know if it helps. Thanks.
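
A rough, hedged sizing sketch of that suggestion; the records-per-file-group target below is an assumption for illustration, not a documented recommendation.

    # Hypothetical sizing sketch: derive the record-index file group count
    # from the total number of record keys in the history load.
    total_records = 3_210_407_531        # history size mentioned above
    records_per_filegroup = 5_000_000    # assumed target, tune as needed

    min_filegroup_count = total_records // records_per_filegroup  # ~642

    rli_options = {
        "hoodie.metadata.record.index.enable": "true",
        "hoodie.index.type": "RECORD_INDEX",
        "hoodie.metadata.record.index.min.filegroup.count": str(min_filegroup_count),
    }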

ad1happy2go commented 8 months ago

@darlatrade Did the suggestion work? Do you need any other help here?

darlatrade commented 8 months ago

RECORD_INDEX is not working with bulk_insert. How do we handle loading the initial history? It is taking forever to load. Any recommendations for using bulk_insert to load 500 GB for the initial load?

soumilshah1995 commented 7 months ago

Certainly! Your suggestion is indeed a great point. Considering the volume of historical data, implementing an asynchronous method for indexing could significantly improve efficiency and reduce the time required. What are your thoughts on incorporating asynchronous indexing to streamline the process?

Maybe I should open a separate thread to discuss this.
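
For reference, a hedged sketch of what asynchronous metadata indexing could look like as a writer config, so the initial bulk load is not blocked by index building; whether this fits the pipeline here is not confirmed in this thread.

    # Hypothetical sketch: defer metadata index building to an async process
    # instead of building it synchronously during the initial history load.
    async_index_options = {
        "hoodie.metadata.index.async": "true",
    }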

darlatrade commented 7 months ago

Yes, that works.