apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

hoodie.bulkinsert.shuffle.parallelism Not activated #10418

Closed zhangjw123321 closed 8 months ago

zhangjw123321 commented 9 months ago

Describe the problem you faced

1. The source table (ods.ods_company) has 10,000 (1w) files.
2. hoodie.bulkinsert.shuffle.parallelism=100 is set, but it does not take effect.
3. After inserting into the Hudi table, the Hudi table also has 10,000 files; hoodie.bulkinsert.shuffle.parallelism is not applied. The expected result is 100 files, not 10,000.

To Reproduce

Steps to reproduce the behavior:

1. Start spark-sql:

   /opt/software/spark-3.2.1/bin/spark-sql \
     --master yarn \
     --conf spark.ui.port=4049 \
     --conf spark.ui.showConsoleProgress=true \
     --conf spark.hadoop.hive.cli.print.header=true \
     --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
     --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
     --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
     --queue root.hdfs \
     --driver-memory 5g \
     --executor-memory 20g \
     --executor-cores 10 \
     --num-executors 20

2. Create the Hudi table:

   CREATE TABLE IF NOT EXISTS hudi_ods.ods_company (
     id bigint,
     ...
   ) using hudi tblproperties (
     type = 'cow',
     primaryKey = 'id',
     preCombineField = 'dt'
   );

3. Set the bulk insert configs:

   set hoodie.spark.sql.insert.into.operation=bulk_insert;
   set hoodie.bulkinsert.shuffle.parallelism=100;

4. Insert from the source table:

   insert into table hudi_ods.ods_company select * from ods.ods_company where dt='2023-12-15';

Expected behavior

A clear and concise description of what you expected to happen.

Environment Description

Additional context

Add any other context about the problem here.

Stacktrace

Add the stacktrace of the error.

ad1happy2go commented 9 months ago

@zhangjw123321 The number of input partitions/Spark tasks is derived from the input dataset. How many tasks are being created, and what is the nature of the source dataset?

zhangjw123321 commented 9 months ago

This dataset was imported from MySQL into the Hive table using Sqoop with -m 10000.

zhangjw123321 commented 9 months ago

First of all, thank you very much for your reply. This parameter is listed under the advanced configurations here, but it has no effect after I configure it. My understanding is that, when the parameter is configured correctly, the number of files in the Hudi table should match the parallelism set by hoodie.bulkinsert.shuffle.parallelism. Why does this parameter not take effect? [screenshot]

ad1happy2go commented 9 months ago

@zhangjw123321 This controls the number of partitions after the shuffle stage (mainly used when a sort mode or custom partitioning is applied, as mentioned in the docs). Can you share the Spark DAG? How many stages is it creating?
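
For context, a minimal sketch of that point using the Scala DataFrame writer (option keys as of Hudi 0.14; the table name and base path are assumptions based on the repro): the parallelism only governs the post-shuffle partition count, so it only has something to control when a sort mode or custom partitioner forces a shuffle.

```scala
// Hedged sketch, not the reporter's exact spark-sql workflow; verify the option
// keys against your Hudi version.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("bulk-insert-parallelism-sketch").enableHiveSupport().getOrCreate()
val df = spark.table("ods.ods_company").where("dt = '2023-12-15'")

df.write.format("hudi")
  .option("hoodie.table.name", "ods_company")
  .option("hoodie.datasource.write.operation", "bulk_insert")
  .option("hoodie.datasource.write.recordkey.field", "id")
  .option("hoodie.datasource.write.precombine.field", "dt")
  .option("hoodie.bulkinsert.sort.mode", "GLOBAL_SORT")    // a sort mode introduces the shuffle
  .option("hoodie.bulkinsert.shuffle.parallelism", "100")  // partitions (and thus files) after that shuffle
  .mode("append")
  .save("/warehouse/hudi_ods/ods_company")                 // hypothetical base path
```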

zhangjw123321 commented 9 months ago

[screenshots of the Spark DAG]

zhangjw123321 commented 9 months ago

[screenshot of the Spark DAG]

ad1happy2go commented 9 months ago

@zhangjw123321 It's going into deduping records. Bulk insert doesn't dedup with the default configs. Are you setting any other configs?
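
For reference, a hedged sketch of the session settings that commonly govern pre-write dedup (key names taken from the Hudi 0.14 configuration docs; whether the SQL layer enables any of them when a preCombineField is defined is worth verifying against your build):

```scala
// Hedged sketch only -- in the spark-sql CLI these are plain `set key=value;` statements.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
spark.sql("set hoodie.combine.before.insert=false")                   // combine records on the precombine field before insert
spark.sql("set hoodie.datasource.write.insert.drop.duplicates=false") // drop duplicate record keys on insert
```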

zhangjw123321 commented 9 months ago

Which stage is removing duplicate records? Other than the configuration above, no other configuration is set manually. I downloaded https://dlcdn.apache.org/hudi/0.14.0/hudi-0.14.0.src.tgz and built hudi-spark3.2-bundle_2.12-0.14.0.jar with Maven.

KnightChess commented 9 months ago

@zhangjw123321 It looks like hoodie.bulkinsert.shuffle.parallelism cannot take effect on a non-partitioned table in the code. In the Spark UI, it may be that you have not set spark.default.parallelism, so reduceByKey uses the parent RDD's partition count. Can you try set spark.default.parallelism=100;? I think it will reduce the parallelism of stage 10 to 100.
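
A small spark-shell sketch of the plain-Spark fallback being described here (not Hudi code): when no partition count is passed, reduceByKey uses spark.default.parallelism if it is set, otherwise the largest parent RDD's partition count.

```scala
// Run in spark-shell, where `sc` is the provided SparkContext.
val pairs = sc.parallelize(1 to 1000, 10000).map(i => (i % 7, 1))

// No explicit numPartitions: uses spark.default.parallelism if set,
// otherwise inherits the parent RDD's 10000 partitions.
println(pairs.reduceByKey(_ + _).getNumPartitions)

// An explicit numPartitions always wins.
println(pairs.reduceByKey(_ + _, 100).getNumPartitions)   // 100
```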

gtzysh commented 9 months ago

I think hoodie.bulkinsert.sort.mode is helpful

zhangjw123321 commented 9 months ago

[screenshot]

set hoodie.spark.sql.insert.into.operation=bulk_insert;
set hoodie.bulkinsert.shuffle.parallelism=100;
set spark.default.parallelism=100;
set spark.sql.shuffle.partitions=100;

After setting these parameters, the Hudi table on HDFS still has 10,000 files.


KnightChess commented 9 months ago

@zhangjw123321 I tested locally: spark.default.parallelism does not seem to take effect when set via SQL SET. Can you set it when submitting the Spark job, e.g. with --conf? Before trying that: how many executor cores does this job have in total, 10,000? Or does ods.ods_company where dt='2023-12-15' have 10,000 files?

zhangjw123321 commented 9 months ago

1. Used --executor-cores 10 --num-executors 20, and also tried --executor-cores 10 --num-executors 10.
2. Yes, 10,000 files.

KnightChess commented 9 months ago

@zhangjw123321 You can try setting it when submitting the Spark job with --conf, or in code via sparkConf.set('xxx', 'yyy'); then it will take the other branch and use spark.default.parallelism instead of the parent RDD's partition count. [screenshot of the code]
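
A hedged sketch of the programmatic route; the submit-time route is simply adding --conf spark.default.parallelism=100 to the spark-sql command from the repro steps. The extra configs below are copied from that command and are not part of the fix itself.

```scala
// spark.default.parallelism has to be in place before the SparkContext is created;
// a SQL-level SET comes too late to affect RDD default parallelism.
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
  .set("spark.default.parallelism", "100")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")

val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()

spark.sql("set hoodie.spark.sql.insert.into.operation=bulk_insert")
spark.sql("set hoodie.bulkinsert.shuffle.parallelism=100")
spark.sql("insert into hudi_ods.ods_company select * from ods.ods_company where dt = '2023-12-15'")
```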

zhangjw123321 commented 9 months ago

I tried setting it that way, and the number of files generated is now correct. Thank you very much. @KnightChess @ad1happy2go

KnightChess commented 9 months ago

@zhangjw123321 I created an issue to track it: https://issues.apache.org/jira/browse/HUDI-7277

KnightChess commented 8 months ago

#10532 fixes hoodie.bulkinsert.shuffle.parallelism not taking effect when the source is deduped.