microsoft / hyperspace

An open source indexing subsystem that brings index-based query acceleration to Apache Spark™ and big data workloads.
https://aka.ms/hyperspace
Apache License 2.0

Investigate use of Join Index Rule V2 when #buckets on indexes on both sides of join are different #237

Open apoorvedave1 opened 3 years ago

apoorvedave1 commented 3 years ago

Describe the issue

Problem 1

Discussion thread (thanks @imback82 for pointing this out): https://github.com/microsoft/hyperspace/pull/124#discussion_r514548371

  // Disable broadcast joins so the join is planned as a sort-merge join.
  spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
  import spark.implicits._
  spark.sql("DROP TABLE IF EXISTS t1").show
  spark.sql("DROP TABLE IF EXISTS t2").show
  val df1 = (0 until 100).map(i => (i % 5, i % 13, i.toString)).toDF("i1", "j1", "k1")
  val df2 = (0 until 100).map(i => (i % 5, i % 13, i.toString)).toDF("i2", "j2", "k2")
  df1.write.format("parquet").bucketBy(4, "i1").saveAsTable("t1")            // bucketed by i1
  df2.write.format("parquet").bucketBy(4, "i2", "j2").saveAsTable("t2")      // bucketed by (i2, j2)
  val t1 = spark.table("t1")
  val t2 = spark.table("t2")
  val joined = t1.join(t2, t1("i1") === t2("i2") && t1("i1") === t2("j2"))   // i1 = i2 and i1 = j2
  joined.explain(true)
  spark.sql("DROP TABLE t1")
  spark.sql("DROP TABLE t2")

Result:

== Optimized Logical Plan ==
Join Inner, ((i1#34 = i2#40) && (i1#34 = j2#41))
:- Filter isnotnull(i1#34)
:  +- Relation[i1#34,j1#35,k1#36] parquet
+- Filter (((i2#40 = j2#41) && isnotnull(i2#40)) && isnotnull(j2#41))
   +- Relation[i2#40,j2#41,k2#42] parquet

== Physical Plan ==
*(5) SortMergeJoin [i1#34, i1#34], [i2#40, j2#41], Inner
:- *(2) Sort [i1#34 ASC NULLS FIRST, i1#34 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(i1#34, i1#34, 200)
:     +- *(1) Project [i1#34, j1#35, k1#36]
:        +- *(1) Filter isnotnull(i1#34)
:           +- *(1) FileScan parquet default.t1[i1#34,j1#35,k1#36] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/apdave/github/hyperspace-1/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [IsNotNull(i1)], ReadSchema: struct<i1:int,j1:int,k1:string>, SelectedBucketsCount: 4 out of 4
+- *(4) Sort [i2#40 ASC NULLS FIRST, j2#41 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(i2#40, j2#41, 200)
      +- *(3) Project [i2#40, j2#41, k2#42]
         +- *(3) Filter (((i2#40 = j2#41) && isnotnull(i2#40)) && isnotnull(j2#41))
            +- *(3) FileScan parquet default.t2[i2#40,j2#41,k2#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/apdave/github/hyperspace-1/spark-warehouse/t2], PartitionFilters: [], PushedFilters: [IsNotNull(i2), IsNotNull(j2)], ReadSchema: struct<i2:int,j2:int,k2:string>, SelectedBucketsCount: 4 out of 4

Neither side is able to utilize its bucketing: both scans are followed by an Exchange, because the default spark.sql.shuffle.partitions (200) does not match the bucket count (4), and the left side's required partitioning [i1, i1] (the join key i1 appears twice) does not match t1's bucketing spec [i1] in any case.

Now, if we set spark.sql.shuffle.partitions = 4 to match the bucket count, bucketing can be used on the right side.
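For reference, this is just the same join re-planned after changing the setting; a minimal way to reproduce it (hypothetical snippet, run before the DROP TABLE statements in the repro above) is:

  spark.conf.set("spark.sql.shuffle.partitions", "4")   // match the 4 buckets on t2
  val joined2 = t1.join(t2, t1("i1") === t2("i2") && t1("i1") === t2("j2"))
  joined2.explain(true)

which yields: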

*(4) SortMergeJoin [i1#40, i1#40], [i2#46, j2#47], Inner
:- *(2) Sort [i1#40 ASC NULLS FIRST, i1#40 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(i1#40, i1#40, 4)
:     +- *(1) Project [i1#40, j1#41, k1#42]
:        +- *(1) Filter isnotnull(i1#40)
:           +- *(1) FileScan parquet default.t1[i1#40,j1#41,k1#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/apdave/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [IsNotNull(i1)], ReadSchema: struct<i1:int,j1:int,k1:string>, SelectedBucketsCount: 4 out of 4
+- *(3) Sort [i2#46 ASC NULLS FIRST, j2#47 ASC NULLS FIRST], false, 0
   +- *(3) Project [i2#46, j2#47, k2#48]
      +- *(3) Filter (((i2#46 = j2#47) && isnotnull(i2#46)) && isnotnull(j2#47))
         +- *(3) FileScan parquet default.t2[i2#46,j2#47,k2#48] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/apdave/spark-warehouse/t2], PartitionFilters: [], PushedFilters: [IsNotNull(i2), IsNotNull(j2)], ReadSchema: struct<i2:int,j2:int,k2:string>, SelectedBucketsCount: 4 out of 4

Problem

We need to decide whether or not to add the check spark.sql.shuffle.partitions == numBuckets while picking an index for the join.
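As a rough sketch only (a hypothetical helper, not the actual Hyperspace rule code), the check under discussion would look something like:

  // Hypothetical guard: a bucketed index scan can avoid its Exchange in a
  // sort-merge join only when spark.sql.shuffle.partitions equals its bucket count.
  def shufflePartitionsMatchBuckets(spark: org.apache.spark.sql.SparkSession, numBuckets: Int): Boolean =
    spark.conf.get("spark.sql.shuffle.partitions").toInt == numBuckets

When the indexes on the two sides of the join have different bucket counts, at most one side can pass such a check, so the rule would also have to decide which side (if any) keeps its bucketing.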

Problem 2

https://github.com/microsoft/hyperspace/pull/124#discussion_r515026784

Do we decide to NOT use an index for binary nodes (e.g., joins) where it doesn't make sense?

To Reproduce

Expected behavior

Environment

apoorvedave1 commented 3 years ago

Question by @imback82 https://github.com/microsoft/hyperspace/pull/124#discussion_r514830755