Closed Ambarish-Giri closed 1 year ago
Hi Team, following up on the ticket to check if there is any update.
Hi @nsivabalan, I was looking for some assistance on this. I have followed all the optimizations provided in https://cwiki.apache.org/confluence/plugins/servlet/mobile?contentId=115510763#content/view/115510763, but even so, a Hudi insert of a 53 GB gzip file on a fairly large EMR cluster (1 Master (m5.xlarge) node, 2 (r5a.24xlarge) core nodes and 6 (r5a.24xlarge) task nodes) is taking almost 2 hrs.
I have given all the details above.
You can try Hudi on Flink instead; it performs very well.
Hi @danny0405, as mentioned, my use case is purely batch. Is Flink Hudi meant for streaming or for batch? Moreover, my core application is on Spark, hence I wanted to stay with Spark only.
Yeah, Spark is good for the batch case, but the Bloom index is not very stable when your updates are spread more or less randomly across the target partitions. If the Bloom filter returns a false positive, Hudi has to scan the whole Parquet file, which is why it is slow.
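To make the false-positive point concrete, here is a minimal, self-contained Python sketch of a Bloom filter. It is not Hudi's implementation: the class name, the bit-array sizes, and the salted-SHA-256 hashing are all assumptions chosen for illustration. It only shows the general behaviour: a Bloom filter never misses a key that was added, but it can answer "maybe present" for a key that was never added, and each such answer forces a file read to confirm.

```python
import hashlib
import uuid


class TinyBloom:
    """Toy Bloom filter for illustration; real filters differ in sizing and hashing."""

    def __init__(self, num_bits: int, num_hashes: int) -> None:
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits)  # one byte per bit, for readability

    def _positions(self, key: str):
        # Derive num_hashes bit positions by salting a single strong hash.
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key: str) -> None:
        for pos in self._positions(key):
            self.bits[pos] = 1

    def might_contain(self, key: str) -> bool:
        # True means "maybe present"; False means "definitely absent".
        return all(self.bits[pos] for pos in self._positions(key))


def false_positive_rate(stored: int, probes: int, num_bits: int, num_hashes: int) -> float:
    """Fill a filter with random keys, then probe it with fresh random keys."""
    bloom = TinyBloom(num_bits, num_hashes)
    for _ in range(stored):
        bloom.add(uuid.uuid4().hex)  # uuid4 without dashes, as in this thread
    hits = sum(bloom.might_contain(uuid.uuid4().hex) for _ in range(probes))
    return hits / probes


if __name__ == "__main__":
    # An undersized filter answers "maybe" far more often than a roomy one.
    print("undersized:", false_positive_rate(2000, 2000, 4000, 3))
    print("roomy:     ", false_positive_rate(2000, 2000, 200000, 3))
```

With purely random keys such as uuid4 there is also no ordering between keys and files to exploit, so the filter is the main thing narrowing down which files to read.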
Hi @danny0405 can you explain a bit more on "if the BloomFilter got false positive"?
In my case the record key is concat(uuid4,segmentId). SegmentId is an integer value, i.e. it can be the same for multiple records, and uuid4 is a standard unique random value (note: the "-" characters are removed from the uuid4 values). The combination of both identifies a record uniquely, and the partition key is again segmentId, as it has low cardinality.
Hi @Ambarish-Giri: for the initial bulk load of data into Hudi, you can try the "bulk_insert" operation. It is expected to be faster than the regular operations. Make sure you set the right value for the average record size config. For subsequent operations, Hudi infers the record size from older commits, but for the first commit (bulk import/bulk_insert) Hudi relies on this config to pack records into right-sized files.
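A back-of-the-envelope Python sketch of why that estimate matters: with no commit history to learn from, the number of records planned per file is roughly the target file size divided by the estimated record size. The 120 MB target and the helper name are assumptions for illustration, not values taken from this thread.

```python
def records_per_file(target_file_bytes: int, record_size_estimate_bytes: int) -> int:
    """Approximate number of records planned for one file on the first commit."""
    if record_size_estimate_bytes <= 0:
        raise ValueError("record size estimate must be positive")
    return target_file_bytes // record_size_estimate_bytes


TARGET_FILE_BYTES = 120 * 1024 * 1024  # assumed target file size

# Suppose records really average 1 KB but the estimate says 100 bytes:
# the writer plans about 10x too many records per file.
planned = records_per_file(TARGET_FILE_BYTES, 100)
actual_file_bytes = planned * 1024  # what those records would really occupy

if __name__ == "__main__":
    print("records planned per file:", planned)
    print("resulting size vs target:", actual_file_bytes / TARGET_FILE_BYTES)
```

An estimate that is too small yields oversized files, and one that is too large yields many small files, so it is worth measuring the real average record size before the first load.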
Couple of questions before we dive into perf in detail:
Hi @nsivabalan ,
Sure, I will try bulk_insert once and update. Also, regarding the "right value for avg record size config": it is specific to Copy On Write (hoodie.copyonwrite.record.size.estimate). Is there no such config for Merge On Read?
1# Upserts can be spread across partitions or confined to specific ones, depending on the data received for that day, and a load can also contain just appends.
2# No, the record key doesn't have any timestamp affinity. As mentioned, the record key is concat(segmentId,uuid4). SegmentId is an integer value, i.e. it can be the same for multiple records, and uuid4 is a standard unique random value (note: the "-" characters are removed from the uuid4 values). The combination of both identifies a record uniquely, and the partition key is again segmentId, as it has low cardinality.
Hi @nsivabalan,
userSegDf.write
  .format("hudi")
  .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, keyGenClass)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, key)
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, partitionKey)
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, combineKey)
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.SIMPLE.toString())
  .option(HoodieIndexConfig.SIMPLE_INDEX_PARALLELISM_PROP, 200)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.ENABLE_ROW_WRITER_OPT_KEY, true)
  .option(HoodieWriteConfig.UPSERT_PARALLELISM, customNumPartitions)
  .option(HoodieWriteConfig.COMBINE_BEFORE_UPSERT_PROP, false)
  .option(HoodieWriteConfig.WRITE_BUFFER_LIMIT_BYTES, 41943040)
  .option(HoodieCompactionConfig.COPY_ON_WRITE_TABLE_RECORD_SIZE_ESTIMATE, 100)
  .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, true)
  .mode(SaveMode.Append)
  .save(s"$basePath/$tableName/")
userSegDf.write
  .format("hudi")
  .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, keyGenClass)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, key)
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, partitionKey)
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, combineKey)
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.SIMPLE.toString())
  .option(HoodieIndexConfig.SIMPLE_INDEX_PARALLELISM_PROP, 200)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.ENABLE_ROW_WRITER_OPT_KEY, true)
  .option(HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP, false)
  .option(HoodieWriteConfig.WRITE_BUFFER_LIMIT_BYTES, 41943040)
  .option(HoodieCompactionConfig.COPY_ON_WRITE_TABLE_RECORD_SIZE_ESTIMATE, 100)
  .option(HoodieWriteConfig.BULKINSERT_SORT_MODE, BulkInsertSortMode.NONE.toString())
  .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, true)
  .mode(SaveMode.Overwrite)
  .save(s"$basePath/$tableName/")
Using the SIMPLE index helped a bit, but the stage below has now been running for more than 2 hrs. It is progressing, but very slowly:
Let me know in case any more details are required.
Hi @nsivabalan ,
We have been trying to optimize the upsert, but a 44 GB upsert over a 54 GB bulk insert on a fairly big cluster still takes more than 3 hrs. Below are the EMR cluster configuration and the upsert config:
userSegDf.write
  .format("hudi")
  .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, keyGenClass)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, key)
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, partitionKey)
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, combineKey)
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.SIMPLE.toString())
  .option(HoodieIndexConfig.SIMPLE_INDEX_PARALLELISM_PROP, 50)
  .option(HoodieMetadataConfig.METADATA_ENABLE_PROP, true)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.ENABLE_ROW_WRITER_OPT_KEY, true)
  .option(HoodieWriteConfig.UPSERT_PARALLELISM, 200)
  .option(HoodieWriteConfig.COMBINE_BEFORE_UPSERT_PROP, false)
  .option(HoodieWriteConfig.WRITE_BUFFER_LIMIT_BYTES, 41943040)
  .option(HoodieCompactionConfig.COPY_ON_WRITE_TABLE_RECORD_SIZE_ESTIMATE, 100)
  .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, true)
  .mode(SaveMode.Append)
  .save(s"$basePath/$tableName/")
Cluster config: Static EMR cluster: 1 Master (m5.xlarge) node and 8 * (r5d.24xlarge) core nodes
Spark-Submit Command:
spark-submit --master yarn --deploy-mode client \
--num-executors 192 --driver-memory 4G --executor-memory 20G \
--conf spark.yarn.executor.memoryOverhead=4096 \
--conf spark.yarn.driver.memoryOverhead=2048 \
--conf spark.yarn.max.executor.failures=100 \
--conf spark.task.cpus=1 \
--conf spark.rdd.compress=true \
--conf spark.kryoserializer.buffer.max=512m \
--conf spark.yarn.maxAppAttempts=3 \
--conf spark.executor.cores=4 \
--conf spark.segment.etl.numexecutors=192 \
--conf spark.network.timeout=800 \
--conf spark.shuffle.service.enabled=true \
--conf spark.sql.hive.convertMetastoreParquet=false \
--conf spark.task.maxFailures=4 \
--conf spark.shuffle.minNumPartitionsToHighlyCompress=32 \
--conf spark.segment.processor.partition.count=1536 \
--conf spark.segment.processor.output-shard.count=60 \
--conf spark.segment.processor.binseg.partition.threshold.bytes=500000000000 \
--conf spark.driver.maxResultSize=0 \
--conf spark.hadoop.fs.s3.maxRetries=20 \
--conf spark.kryoserializer.buffer.max=512m \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.sql.shuffle.partitions=3000 \
--class
Hi @nsivabalan @danny0405 any updates on the above issue??
Got it. Would you mind sharing screenshots of the Spark stages? That will give us an idea of where most of the time is spent.
By the way, an orthogonal point: I see your record key is {segmentId,uuid} and your partition path is segmentId. I am not sure you need to prefix segmentId to your record keys if you are using them solely to identify unique records and apply updates within Hudi. If there is no external-facing requirement for record keys to be a pair of {segmentId,uuid}, you can just use uuid.
If the cardinality of your partition field is low, we can try partitioning on a different field with higher cardinality. We can leverage more parallel processing depending on the number of partitions. Within each partition we can't do much parallel processing, so we are limited. I mean, Hudi does assign one file group to each executor, but I am talking about indexing here.
Hi @nsivabalan ,
1# Correct. I was using {segmentId,uuid} with a ComplexKey as the record key, since the combined key uniquely identifies records. Since partitioning is done on segmentId, it makes sense to have just uuid as the record key. I have taken care of the orthogonal issue you pointed out.
2# Partitioning the data by segmentId seems appropriate, and its cardinality is not that low. For example, 50 GB of data will have nearly 3000 unique segments, and consecutive upserts will add to that number, probably 1000 more for an upsert of equivalent data size.
3# I am using the MOR write strategy.
4# Below is my cluster configuration: 1 r5.2xlarge master node and 100 r5.4xlarge core nodes
5# spark-submit command:
spark-submit --master yarn --deploy-mode client --num-executors 100 --driver-memory 12G --executor-memory 48G \
--conf spark.yarn.executor.memoryOverhead=8192 \
--conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
--conf spark.shuffle.io.numConnectionsPerPeer=3 \
--conf spark.shuffle.file.buffer=512k \
--conf spark.memory.fraction=0.7 \
--conf spark.memory.storageFraction=0.5 \
--conf spark.kryo.unsafe=true \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.hadoop.fs.s3a.connection.maximum=2000 \
--conf spark.hadoop.fs.s3a.fast.upload=true \
--conf spark.hadoop.fs.s3a.connection.establish.timeout=500 \
--conf spark.hadoop.fs.s3a.connection.timeout=5000 \
--conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2 \
--conf spark.hadoop.com.amazonaws.services.s3.enableV4=true \
--conf spark.hadoop.com.amazonaws.services.s3.enforceV4=true \
--conf spark.yarn.nodemanager.pmem-check-enabled=true \
--conf spark.yarn.nodemanager.vmem-check-enabled=true \
--conf spark.driver.cores=4 \
--conf spark.executor.cores=3 \
--conf spark.yarn.driver.memoryOverhead=4096 \
--conf spark.yarn.max.executor.failures=100 \
--conf spark.task.cpus=1 \
--conf spark.rdd.compress=true \
--conf spark.yarn.maxAppAttempts=3 \
--conf spark.segment.etl.numexecutors=100 \
--conf spark.network.timeout=800 \
--conf spark.shuffle.service.enabled=true \
--conf spark.sql.hive.convertMetastoreParquet=false \
--conf spark.task.maxFailures=4 \
--conf spark.shuffle.minNumPartitionsToHighlyCompress=32 \
--conf spark.segment.processor.partition.count=1536 \
--conf spark.segment.processor.output-shard.count=60 \
--conf spark.segment.processor.binseg.partition.threshold.bytes=500000000000 \
--conf spark.driver.maxResultSize=2g \
--conf spark.hadoop.fs.s3.maxRetries=2 \
--conf spark.kryoserializer.buffer.max=512m \
--conf spark.kryo.registrationRequired=false \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.sql.shuffle.partitions=1536 \
--class
Below are the Hudi Spark stages which are consuming maximum time: BulkInsert (MoR):
Upsert (MoR):
Hi @nsivabalan let me know in case you need any further details?
Hi @nsivabalan, I analysed the Hudi code as well to check whether there is any room for improvement, but couldn't find much. Let me know if there are any updates from your end.
Sorry, what shuffle parallelism are you setting for these writes? In your original description I see you are setting it to 2; that would definitely give you bad perf. Try something in the range of 100 to 1000, depending on your data size, and see how it pans out. We have different configs for different operations, so make sure you set the right one.
Also, can you post your Spark stages UI so that we can see some metrics on data skew and how much parallelism we are hitting?
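One rough way to pick a starting value, sketched in Python: divide the input size by a target amount of data per shuffle partition, then clamp the result to the 100 to 1000 range suggested above. The 500 MB per-partition target and the helper name are assumptions for illustration; the result is only a first guess to tune against the Spark UI.

```python
import math

GB = 1024 ** 3
MB = 1024 ** 2


def suggest_shuffle_parallelism(
    input_bytes: int,
    bytes_per_partition: int = 500 * MB,  # assumed per-partition target
    floor: int = 100,
    ceiling: int = 1000,
) -> int:
    """Partitions needed for the input, clamped to the [floor, ceiling] range."""
    partitions = math.ceil(input_bytes / bytes_per_partition)
    return max(floor, min(ceiling, partitions))


if __name__ == "__main__":
    for size_gb in (7, 53, 2000):
        print(size_gb, "GB ->", suggest_shuffle_parallelism(size_gb * GB))
```

Note that the 53 GB input in this thread is gzip-compressed, so the uncompressed size, and therefore a sensible parallelism, is likely larger. The value also has to go under the key for the operation actually in use: hoodie.insert.shuffle.parallelism, hoodie.upsert.shuffle.parallelism, or hoodie.bulkinsert.shuffle.parallelism.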
I went over your latest messages. I guess you interchanged the upsert and bulk_insert commands while posting above. Never mind.
Let me comment on each command.
I have tried to trim a few configs. Let's keep things minimal for now so that, once we get a run with good perf, we can add these configs back and see which one is causing the perf hit.
spark-submit --master yarn --deploy-mode client --num-executors 100 --driver-memory 12G --executor-memory 48G \
--conf spark.yarn.executor.memoryOverhead=8192 \
--conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
--conf spark.shuffle.io.numConnectionsPerPeer=3 \
--conf spark.shuffle.file.buffer=512k \
--conf spark.memory.fraction=0.7 \
--conf spark.memory.storageFraction=0.5 \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.hadoop.fs.s3a.connection.maximum=2000 \
--conf spark.hadoop.fs.s3a.fast.upload=true \
--conf spark.hadoop.fs.s3a.connection.establish.timeout=500 \
--conf spark.hadoop.fs.s3a.connection.timeout=5000 \
--conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2 \
--conf spark.hadoop.com.amazonaws.services.s3.enableV4=true \
--conf spark.hadoop.com.amazonaws.services.s3.enforceV4=true \
--conf spark.driver.cores=4 \
--conf spark.executor.cores=3 \
--conf spark.yarn.driver.memoryOverhead=8192 \
--conf spark.yarn.max.executor.failures=100 \
--conf spark.rdd.compress=true \
--conf spark.yarn.maxAppAttempts=3 \
--conf spark.network.timeout=800 \
--conf spark.shuffle.service.enabled=true \
--conf spark.task.maxFailures=4 \
--conf spark.driver.maxResultSize=2g \
--conf spark.hadoop.fs.s3.maxRetries=2 \
--conf spark.kryoserializer.buffer.max=1024m \
--conf spark.kryo.registrationRequired=false \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.sql.shuffle.partitions=1536 \
--class <class-name> \
--jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar \
<jar-file-name>.jar
For example, when I did bulk_insert benchmarking, I used the command below with spark-shell:
./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:3.0.1 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.kryoserializer.buffer.max=1024m' \
--driver-memory 8g --executor-memory 9g \
--master yarn --deploy-mode client \
--num-executors 15 --executor-cores 8 \
--conf spark.rdd.compress=true \
--conf spark.driver.userClassPathFirst=true \
--conf spark.executor.userClassPathFirst=true \
--conf spark.ui.proxyBase="" \
--conf "spark.memory.storageFraction=0.8" \
--conf "spark.driver.extraJavaOptions=-XX:NewSize=1g -XX:SurvivorRatio=2 -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:CMSInitiatingOccupancyFraction=70" \
--conf "spark.executor.extraJavaOptions=-XX:NewSize=1g -XX:SurvivorRatio=2 -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:CMSInitiatingOccupancyFraction=70" \
--conf 'spark.executor.memoryOverhead=2000m'
Nothing fancy, just set the appropriate memory, cores and some GC tuning configs and things worked for me.
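As a sanity check on "appropriate memory and cores", here is a small Python sketch that compares what a spark-submit asks for with what the cluster offers. The request values come from the 100-node spark-submit earlier in this thread; the node specs (16 vCPUs and 128 GiB for an r5.4xlarge) are AWS's published instance sizes. The class and function names are hypothetical, and YARN reserves part of each node, so real usable capacity is lower than the raw figures used here.

```python
from dataclasses import dataclass


@dataclass
class Request:
    num_executors: int
    executor_cores: int
    executor_memory_gib: float
    memory_overhead_gib: float


def utilization(req: Request, nodes: int, vcpus_per_node: int, mem_gib_per_node: float) -> dict:
    """Fraction of raw cluster cores and memory that the request would occupy."""
    cores_asked = req.num_executors * req.executor_cores
    mem_asked = req.num_executors * (req.executor_memory_gib + req.memory_overhead_gib)
    return {
        "core_fraction": cores_asked / (nodes * vcpus_per_node),
        "memory_fraction": mem_asked / (nodes * mem_gib_per_node),
    }


# --num-executors 100, spark.executor.cores=3, --executor-memory 48G,
# spark.yarn.executor.memoryOverhead=8192 (8 GiB), on 100 r5.4xlarge core nodes.
req = Request(num_executors=100, executor_cores=3, executor_memory_gib=48, memory_overhead_gib=8)
usage = utilization(req, nodes=100, vcpus_per_node=16, mem_gib_per_node=128)

if __name__ == "__main__":
    print(usage)
```

If the core fraction comes out low, the job may be leaving most of the cluster idle regardless of any Hudi-side tuning, which is worth ruling out first.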
bulk_insert configs: let's increase the index parallelism to 1000 and remove the storage-level configs. I mean, let's try to get a baseline first, and then we can iteratively add back more configs. I see you are setting the Parquet max file size in your upsert command; we probably need to set it here too.
upsert configs: again, let's set the index parallelism to 1000 and remove the storage-level configs.
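A minimal sketch of what such a trimmed baseline could look like, written as plain Python dictionaries of Hudi option keys (the string forms of the constants used in the Scala snippets in this thread). Only the index parallelism of 1000 comes from the suggestion above; the shuffle parallelism values are placeholders, and the dictionary names are hypothetical.

```python
# Settings shared by both operations; index parallelism raised to 1000.
BASE_OPTS = {
    "hoodie.index.type": "SIMPLE",
    "hoodie.simple.index.parallelism": "1000",
}

# Baseline for the initial load.
BULK_INSERT_OPTS = {
    **BASE_OPTS,
    "hoodie.datasource.write.operation": "bulk_insert",
    "hoodie.bulkinsert.shuffle.parallelism": "1000",  # placeholder value
}

# Baseline for the subsequent upserts.
UPSERT_OPTS = {
    **BASE_OPTS,
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.upsert.shuffle.parallelism": "1000",  # placeholder value
}

if __name__ == "__main__":
    for name, opts in (("bulk_insert", BULK_INSERT_OPTS), ("upsert", UPSERT_OPTS)):
        print(name, opts)
```

Each parallelism setting sits next to the operation it applies to, which avoids setting the upsert key on an insert. The table name, record key, partition path, and precombine field still have to be supplied as in the original commands.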
We have made a lot of perf improvements in Hudi: https://hudi.apache.org/blog/2022/06/29/Apache-Hudi-vs-Delta-Lake-transparent-tpc-ds-lakehouse-performance-benchmarks
Can you try out 0.12? Closing this out due to long inactivity.
Hi Team, I was testing Hudi for doing inserts/updates/deletes on data in S3. Below are benchmark metrics captured so far on varied data sizes:
Run 1 - Fresh Insert
Total Data size = 7 GB
COW = 22 mins MOR = 25 mins
Run 2 - Upsert
Total Data Size=6.7 GB
COW = 61 mins MOR = 64 mins
Run 3 - Upsert
Total Data size: 2.5 GB
COW = 39 mins MOR = 53 mins
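To put the three runs above on a common scale, here is a quick Python sketch that converts each one to MB per second. It treats 1 GB as 1024 MB and takes the reported sizes at face value; the helper name is just for illustration.

```python
def throughput_mb_per_s(size_gb: float, minutes: float) -> float:
    """Average write throughput in MB/s for a run of the given size and duration."""
    return size_gb * 1024 / (minutes * 60)


# (label, data size in GB, COW minutes, MOR minutes) as reported above.
RUNS = [
    ("Run 1 - Fresh Insert", 7.0, 22, 25),
    ("Run 2 - Upsert", 6.7, 61, 64),
    ("Run 3 - Upsert", 2.5, 39, 53),
]

if __name__ == "__main__":
    for label, size_gb, cow_min, mor_min in RUNS:
        cow = throughput_mb_per_s(size_gb, cow_min)
        mor = throughput_mb_per_s(size_gb, mor_min)
        print(f"{label}: COW {cow:.2f} MB/s, MOR {mor:.2f} MB/s")
```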
Below are the cluster configurations used:
EMR version: 5.33.0
Hudi: 0.7.0
Spark: 2.4.7
Scala: 2.11.12
Static cluster with 1 Master (m5.xlarge), 4 (m5.2xlarge) core and 4 (m5.2xlarge) task nodes
To Reproduce
Steps to reproduce the behavior:
Expected behavior
I did not expect Hudi to take this long to write to the Hudi store. My expectation was that it should take 15-20 mins at most for data of this size (7-8 GB), for both inserts and upserts. Also, even for writes, the CoW write strategy performed better than MoR, which I thought would be the other way round.
Environment Description
Hudi version : 0.7.0
Spark version : 2.4.7
Hive version : 2.3.7
Hadoop version :
Storage (HDFS/S3/GCS..) : S3
Running on Docker? (yes/no) : No
Additional context
This is a purely batch job: we receive daily loads, and upserts are supposed to be performed over existing Hudi tables.
Static EMR cluster: 1 Master (m5.xlarge) node, 4 (m5.2xlarge) core nodes and 4 (m5.2xlarge) task nodes
Spark submit command:
spark-submit --master yarn --num-executors 8 --driver-memory 4G --executor-memory 20G \
--conf spark.yarn.executor.memoryOverhead=4096 \
--conf spark.yarn.maxAppAttempts=3 \
--conf spark.executor.cores=5 \
--conf spark.segment.etl.numexecutors=8 \
--conf spark.network.timeout=800 \
--conf spark.shuffle.minNumPartitionsToHighlyCompress=32 \
--conf spark.segment.processor.partition.count=500 \
--conf spark.segment.processor.output-shard.count=60 \
--conf spark.segment.processor.binseg.partition.threshold.bytes=500000000000 \
--conf spark.driver.maxResultSize=0 \
--conf spark.hadoop.fs.s3.maxRetries=20 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.sql.shuffle.partitions=500 \
--conf spark.kryo.registrationRequired=false \
--class \
--jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar \
s3://
Hudi insert and upsert parameters:
userSegDf.write
  .format("hudi")
  .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,
    if (hudiWriteStrg == "MOR") DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL
    else DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, keyGenClass)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, key)
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, partitionKey)
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, combineKey)
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
  .option("hoodie.upsert.shuffle.parallelism", "2")
  .mode(SaveMode.Overwrite)
  .save(s"$basePath/$tableName/")
userSegDf.write
  .format("hudi")
  .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,
    if (hudiWriteStrg == "MOR") DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL
    else DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, keyGenClass)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, key)
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, partitionKey)
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, combineKey)
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .mode(SaveMode.Append)
  .save(s"$basePath/$tableName/")
I ran a full production load of 53 GB on the production cluster, with the cluster configuration and spark-submit command below, for a Hudi insert using the COW write strategy. I observed that it takes more than 2 hrs just for the insert, and it is quite evident from the earlier runs that it will take even more time for the upsert operation.
Total data size: 53 GB
Cluster size: 1 Master (m5.xlarge) node, 2 (r5a.24xlarge) core nodes and 6 (r5a.24xlarge) task nodes
Spark submit command:
spark-submit --master yarn --num-executors 192 --driver-memory 4G --executor-memory 20G \
--conf spark.yarn.executor.memoryOverhead=4096 \
--conf spark.yarn.maxAppAttempts=3 \
--conf spark.executor.cores=4 \
--conf spark.segment.etl.numexecutors=192 \
--conf spark.network.timeout=800 \
--conf spark.shuffle.minNumPartitionsToHighlyCompress=32 \
--conf spark.segment.processor.partition.count=1536 \
--conf spark.segment.processor.output-shard.count=60 \
--conf spark.segment.processor.binseg.partition.threshold.bytes=500000000000 \
--conf spark.driver.maxResultSize=0 \
--conf spark.hadoop.fs.s3.maxRetries=20 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.sql.shuffle.partitions=1536 \
--conf spark.kryo.registrationRequired=false \
--class \
--jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar \
s3://
The Hudi insert and upsert parameters are the same as above.