thomasg19930417 closed this issue 1 year ago
I think it is caused by too many connections being created when the number of partitions is too large; I also found many connection-creation entries in the log, as follows.
Reducing spark.sql.shuffle.partitions works around the problem, but complex workloads contain both large and small tasks, so setting it too small is not suitable for the large ones.
Does increasing celeborn.network.timeout help? The default value is 240s.
https://celeborn.apache.org/docs/0.2.1-incubating/configuration/#network
Note: you should set it in the Spark configuration with the additional spark. prefix, so it becomes spark.celeborn.network.timeout.
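For example, in spark-defaults.conf (a minimal sketch; the 600s value is only illustrative, not a recommendation from this thread):

spark.celeborn.network.timeout 600s

or per job via spark-submit:

--conf spark.celeborn.network.timeout=600s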
Is it caused by not disabling spark.sql.adaptive.localShuffleReader.enabled?
@wForget Right. @thomasg19930417 could you turn off spark.sql.adaptive.localShuffleReader.enabled and test again?
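For reference, it can be disabled per session in Spark SQL:

set spark.sql.adaptive.localShuffleReader.enabled=false;

or cluster-wide in spark-defaults.conf:

spark.sql.adaptive.localShuffleReader.enabled false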
After testing, it is indeed caused by this parameter. Thank you for your replies @pan3793 @wForget @waitinfuture
We did add a comment about this config; maybe we should highlight it more :)
This parameter was commented out by mistake on my side; it may have been accidentally commented out when the shell script was copied.
Env:
Spark 3.2.2, Celeborn 0.2.1
Question description:
When spark.sql.adaptive.autoBroadcastJoinThreshold is enabled in Spark 3, I found that when an automatic broadcast join is triggered and spark.sql.shuffle.partitions is set relatively large, the broadcast phase takes noticeably longer.
Spark config:
Dynamic resource
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.minExecutors 1
spark.dynamicAllocation.maxExecutors 500
spark.dynamicAllocation.initialExecutors 2
spark.dynamicAllocation.executorIdleTimeout 120
Adaptive
spark.shuffle.minNumPartitionsToHighlyCompress 5000
spark.sql.adaptive.enabled true
spark.sql.adaptive.coalescePartitions.enabled true
spark.sql.adaptive.skewedJoin.enabled true
spark.sql.adaptive.advisoryPartitionSizeInBytes 64M
spark.sql.adaptive.coalescePartitions.minPartitionSize 10M
spark.sql.adaptive.skewJoin.skewedPartitionFactor 2
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 128M
spark.sql.optimizer.dynamicPartitionPruning.enabled false
Celeborn
spark.shuffle.service.enabled false
celeborn.shuffle.rangeReadFilter.enabled true
celeborn.shuffle.partitionSplit.mode hard
spark.shuffle.manager org.apache.spark.shuffle.celeborn.RssShuffleManager
spark.celeborn.master.endpoints xxxxxx
spark.rss.push.data.replicate false
spark.rss.shuffle.writer.mode hash
Kyuubi
spark.sql.optimizer.insertRepartitionBeforeWrite.enabled true
spark.sql.optimizer.finalStageConfigIsolation.enabled true
spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes 256M
spark.sql.finalStage.adaptive.coalescePartitions.minPartitionSize 50M
spark.sql.extensions org.apache.kyuubi.sql.KyuubiSparkSQLExtension,org.apache.kyuubi.plugin.spark.authz.ranger.RangerSparkExtension
Reproduce SQL
set spark.sql.adaptive.autoBroadcastJoinThreshold=10M;
set spark.sql.autoBroadcastJoinThreshold=-1;
set spark.sql.shuffle.partitions=5000;
select a.id from ( select id from range(1,10000) ) a inner join (select id from range(1,100) ) b on a.id=b.id
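For completeness, given the resolution above, here is the same repro with the local shuffle reader disabled (a sketch; only the first set statement is new relative to the original repro):

set spark.sql.adaptive.localShuffleReader.enabled=false;
set spark.sql.adaptive.autoBroadcastJoinThreshold=10M;
set spark.sql.autoBroadcastJoinThreshold=-1;
set spark.sql.shuffle.partitions=5000;
select a.id from ( select id from range(1,10000) ) a inner join (select id from range(1,100) ) b on a.id=b.id;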
I found that this is affected by the spark.sql.shuffle.partitions parameter: the larger the value, the longer the broadcast takes.
Test case: spark.sql.shuffle.partitions set to 10, 1000, and 5000 in turn (timing results omitted here; broadcast time grew with the partition count).
I did not see this problem when using Spark 3's native built-in shuffle.