apache / datafusion-comet

Apache DataFusion Comet Spark Accelerator
https://datafusion.apache.org/comet
Apache License 2.0
823 stars 163 forks source link

Add option to replace SortMergeJoin with ShuffleHashJoin #1006

Closed andygrove closed 1 month ago

andygrove commented 1 month ago

What is the problem the feature request solves?

Other Spark accelerators, such as Spark RAPIDS and Apache Gluten, replace SortMergeJoin with ShuffleHashJoin for improved performance. We should evaluate this approach for Comet.

Spark RAPIDS

  val ENABLE_REPLACE_SORTMERGEJOIN = conf("spark.rapids.sql.replaceSortMergeJoin.enabled")
    .doc("Allow replacing sortMergeJoin with HashJoin")
    .booleanConf
    .createWithDefault(true)

Apache Gluten

  val COLUMNAR_FPRCE_SHUFFLED_HASH_JOIN_ENABLED =
    buildConf("spark.gluten.sql.columnar.forceShuffledHashJoin")
      .internal()
      .booleanConf
      .createWithDefault(true)
/**
 * If force ShuffledHashJoin, convert [[SortMergeJoinExec]] to [[ShuffledHashJoinExec]]. There is no
 * need to select a smaller table as buildSide here, it will be reselected when offloading.
 */
object RewriteJoin extends RewriteSingleNode with JoinSelectionHelper {

Describe the potential solution

No response

Additional context

No response

viirya commented 1 month ago

It sounds reasonable. The vectorized implementation of SMJ looks inefficient in DataFusion. I'm not sure if there is any optimized algorithm for SMJ in vectorized execution. If not, using SHJ to replace SMJ will be good for performance.