apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

Partitioning data into two keys is taking more time (10x) than partitioning into one key. #10456

Open maheshguptags opened 9 months ago

maheshguptags commented 9 months ago

I am trying to add a second level of partitioning to my table instead of a single level, but in my Hudi Flink job it takes 10x as long as with single-level partitioning.

I ingested 1.8M records into a table with one level of partitioning, and it took around 12-15 minutes to ingest all the data. Then, with the same configuration and the same data payload, I added a second partition key, and the job took around 1 hour 45 minutes to complete.

To Reproduce

Steps to reproduce the behavior: below is the configuration I am using for the table. Add these properties to the table creation statement.

PARTITIONED BY (`client_id`,`hashed_server_id`)
WITH ('connector' = 'hudi','path' = '${table_location}',
'table.type' = 'COPY_ON_WRITE',
'hoodie.datasource.write.recordkey.field' = 'a,b',
'payload.class'='x.y.PartialUpdate',
'precombine.field'='ts',
'hoodie.clean.async'='true',
'hoodie.cleaner.policy' = 'KEEP_LATEST_COMMITS',
'hoodie.clean.automatic' = 'true',
'hoodie.clean.max.commits'='5',
'hoodie.clean.trigger.strategy'='NUM_COMMITS',
'hoodie.cleaner.parallelism'='100',
'hoodie.cleaner.commits.retained'='4',
'hoodie.index.type'= 'BUCKET',
'hoodie.index.bucket.engine' = 'SIMPLE',
'hoodie.bucket.index.num.buckets'='16',
'hoodie.bucket.index.hash.field'='a',
'hoodie.parquet.small.file.limit'='104857600',
'hoodie.parquet.compression.codec'='snappy')

Expected behavior: As this only adds another partition level to the storage layout, it should not impact performance much (I could understand it taking 5-7 minutes longer, since complex key generation is a bit slower than simple key generation).

Environment Description

Additional context

My table's write operation is upsert. I have tested the functionality and it works fine, and I cannot change the table type.

I also discussed this with @ad1happy2go, and he also suggested that it should not have much impact, as it is just another level of partitioning.

CC : @ad1happy2go @codope @danny0405 @yo

xicm commented 9 months ago

Adding a partition field means more tasks. And since the index is BUCKET, the number of tasks could be bucket_num * partitions in some cases.

maheshguptags commented 9 months ago

@xicm Yes, I agree, but it should not make it 10 times slower.

Let's say I have 100 partitions, each partition has 10 sub-partitions, and there are 16 buckets. Then the total number of tasks would be 100*10*16 at most, whereas with a single partition level it is 100*16, right?
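The arithmetic in this comment can be written out as a short sketch. The function name is hypothetical, and the bound assumes the worst case in which every partition receives data for every bucket.

```python
def max_file_groups(partitions: int, buckets_per_partition: int) -> int:
    """Upper bound on file groups: every partition holds every bucket."""
    return partitions * buckets_per_partition


# Numbers taken from the comment above: 100 partitions, 10 sub-partitions
# under each of them, and 16 buckets per (sub-)partition.
one_level = max_file_groups(100, 16)
two_level = max_file_groups(100 * 10, 16)

print(one_level)   # upper bound with one partition level
print(two_level)   # upper bound with two partition levels
print(two_level // one_level)  # growth factor between the two layouts
```

Under these assumptions the upper bound grows by the number of sub-partitions per partition, so how scattered the data is across sub-partitions determines how close a real commit gets to that bound.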

I understand that it will take 5-7 minutes longer than with a single partition level, but not 10 times as long.

Let me know your thoughts.

xicm commented 9 months ago

Not sure if this is the cause. Can you check the number of file groups after the partition field changed, and reduce the bucket number to see how the time cost changes?

maheshguptags commented 9 months ago

@xicm Let me reduce the number of buckets and test with the same number of records to check the processing time. Can you tell me how to check the number of file groups?

maheshguptags commented 9 months ago

@xicm I reduced the number of buckets (it makes sense to reduce the bucket count now that we have a second partition level), but the job still takes 45-50 minutes, which is 5 times as long as with one partition level.

xicm commented 9 months ago

> can you tell me how to check number of filegroup?

Use the CLI or Spark SQL: run show_commits and pay attention to total_files_added and total_files_updated.

> it is still taking 45-50 min to execute which 5 times as compare to 1 level partition.

Reduce the bucket number and increase write.tasks, and test a few times to find a better-performing combination.

maheshguptags commented 9 months ago

Yes, I am testing different combinations of bucket numbers.

maheshguptags commented 9 months ago

Hi @xicm, I tried the combinations below with the same number of records.

image

Please find below the details related to the file groups.

image

After testing several times, I noticed that 8 or 4 buckets look good for a data size of <100M.

As we know, once the number of buckets is set, we cannot change it.

So I have a question related to that.

Suppose I choose 8 buckets and the streaming data keeps growing (100 million per ID). Will it affect performance (considering that the job is streaming)?

Thanks Mahesh Gupta

xicm commented 9 months ago

A small bucket number will not fit the growing data. Generally, we estimate the data size to determine the number of buckets.
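One way to sketch that kind of estimate is shown below. This is an illustrative rule of thumb rather than a formula from Hudi: the function name, the expected per-partition data size, and the 128 MiB target file size are all assumptions.

```python
import math

MIB = 1024 * 1024


def estimate_buckets(partition_bytes: int, target_file_bytes: int) -> int:
    """Rule-of-thumb bucket count: expected partition size / target file size.

    Hypothetical helper, not a Hudi API. Always returns at least one bucket.
    """
    return max(1, math.ceil(partition_bytes / target_file_bytes))


# A partition expected to grow to about 2 GiB, with files of about 128 MiB.
large_partition = estimate_buckets(2 * 1024 * MIB, 128 * MIB)

# A partition expected to stay around 50 MiB fits in a single bucket.
small_partition = estimate_buckets(50 * MIB, 128 * MIB)

print(large_partition, small_partition)
```

Because the bucket number cannot be changed once it is set, the input to such an estimate would be the size the partition is expected to reach, not its size today.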

I think your problem is that the data is too scattered. There are many tasks, so you need a bigger write.tasks.

maheshguptags commented 9 months ago

I already have 20 tasks writing the data; please check the screenshot below. Do you want me to increase it further?

image

xicm commented 9 months ago

Can you check the SubTasks of bucket_assigner in the Flink UI? This tells us how many tasks there are in a write operation.

maheshguptags commented 9 months ago

Yes, it is 20. They start at 0 and end at 19.

image

xicm commented 9 months ago

Sorry for my wrong understanding of SubTasks. Hudi splits the input data by partition+fileGroup and then writes the resulting splits with a parallelism of write.tasks. The job writes 2000+ files in a commit, so a parallelism of 20 is too small.
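As a rough illustration of why 20 writers is small for 2000+ files, the sketch below divides the file groups in a commit across the writer tasks. It assumes the file groups are spread evenly over the tasks, which real data may not do, and the function name is hypothetical.

```python
import math


def file_groups_per_task(file_groups_in_commit: int, write_tasks: int) -> int:
    """File groups each writer handles, assuming an even spread across tasks."""
    return math.ceil(file_groups_in_commit / write_tasks)


# 2000 file groups per commit is the lower bound mentioned in the comment above.
for write_tasks in (20, 50, 100):
    per_task = file_groups_per_task(2000, write_tasks)
    print(f"write.tasks={write_tasks}: about {per_task} file groups per task")
```

With 20 writers, each task handles on the order of 100 file groups per commit under this assumption; raising write.tasks lowers that number proportionally.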

maheshguptags commented 9 months ago

@xicm Let me increase the number of write tasks, run the load again, and test the performance. Is there a way to control the number of file groups for a particular commit?

thanks

danny0405 commented 9 months ago

hoodie.bucket.index.num.buckets controls the number of buckets under one partition, and by default it is 4 in Flink.

maheshguptags commented 9 months ago

@danny0405 I am asking about the number of file groups added in a particular commit. I am already using the bucket index. The number of file groups is more than 2000 for a single commit.

maheshguptags commented 9 months ago

Hi @xicm and @danny0405, I increased the parallelism as @xicm suggested, but the job now tries to consume all the data in a single commit, i.e. it accumulates the data into one commit, which causes a heap OOM.

image

Commit size from .hoodie folder

The second commit is trying to consume the entire data in one commit, i.e., creating a 41MB .commit file.

image

Can we reduce or control the commit file size? Can we hop on a call to resolve this issue?

Let me know your thoughts. Thanks

xicm commented 9 months ago

Can you redesign the partitions? There is only 1-2 GB of data, but there are so many partitions.

maheshguptags commented 9 months ago

@xicm The dataset is huge, around 100M. However, for performance evaluation, I have only ingested 1.78M. cc : @pratyakshsharma

danny0405 commented 9 months ago

Yeah, try to reduce the number of file groups per commit, because for each file group we keep an in-memory buffer before flushing to disk.

maheshguptags commented 9 months ago

@danny0405 Can you please share the config to reduce the number of file groups per commit?

ad1happy2go commented 9 months ago

Discussed with @maheshguptags. Advised him to explore the Flink Kafka stream configs to control the number of records/bytes in one micro-batch. https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/

cc @danny0405

maheshguptags commented 9 months ago

@ad1happy2go I tried the below configuration for Kafka, but it didn't help.

source.kafka.max.poll.records=300
source.kafka.max.poll.interval.ms=300000

I tried several different values for these settings. cc : @danny0405

maheshguptags commented 8 months ago

@danny0405 Can you help with this, please?

ad1happy2go commented 8 months ago

@maheshguptags Let's get on a call to discuss this further.

ad1happy2go commented 8 months ago

@xicm @danny0405 Had a discussion with @maheshguptags. Let me try to summarise his issue.

He has around 5000 partitions in total and is using the bucket index. When he uses a parallelism (write.tasks) of 20, the job takes 1 hour 45 minutes, and when it is 100, it takes 35 minutes.

But with the increase in parallelism, the number of file groups explodes, as expected. This results in a lot of small file groups with very few records each (~20), which ultimately causes OOM due to 400MB commit files.
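The figures in this summary can be cross-checked with a small calculation. It assumes the 16 buckets per partition from the original table definition (the bucket number was varied in later tests), the roughly 1.8M records from the first post, and that every partition receives data for every bucket.

```python
# Inputs quoted in this thread; the bucket count is an assumption taken
# from the original table definition and was changed in later experiments.
partitions = 5000
buckets_per_partition = 16
records = 1_800_000

# Worst case: every partition holds every bucket.
potential_file_groups = partitions * buckets_per_partition
records_per_file_group = records / potential_file_groups

# Runtimes from the summary: 1 hour 45 minutes at 20 tasks, 35 minutes at 100.
speedup = 105 / 35

print(potential_file_groups)
print(records_per_file_group)
print(speedup)
```

Under these assumptions the average comes out at a couple of dozen records per file group, the same order of magnitude as the ~20 quoted above, while a 5x increase in write.tasks gave a 3x reduction in runtime.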

maheshguptags commented 8 months ago

Hi @ad1happy2go, there is a small correction to the commit file size.

> which ultimately causing OOM due to 400MB commit files.

It's a 41 MB commit file, @ad1happy2go.

maheshguptags commented 7 months ago

@danny0405 Still waiting for your response. Can you please take a look at this?