apache / sedona

A cluster computing framework for processing large-scale geospatial data
https://sedona.apache.org/
Apache License 2.0

Dynamic Index Build Side is Statically Chosen #854

Open dfischercodethoughts opened 1 year ago

dfischercodethoughts commented 1 year ago

Expected behavior

During dynamic join queries on commutative conditions, the local index should be built on whichever side is smaller, with the other side streamed against it. This likely requires a linear scan of both sides before the join runs.

Actual behavior

We must specify the side on which to build the index for all partitions a priori when using JoinQueries.SpatialJoinQueryFlat.

Steps to reproduce the problem

N/A

Settings

N/A

dfischercodethoughts commented 1 year ago

I can take a stab at fixing this one this weekend if you don't want to get around to it. The approach would probably be to add an option to the query that does some extra processing to determine the index build side automatically. To determine it, maybe just iterate while the iterator still has a next element to count both sides?
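A minimal sketch of the counting idea, in Python for brevity rather than Sedona's Java. The function name `choose_build_side` is hypothetical and is not part of Sedona. One thing the sketch makes visible: counting an iterator consumes it, so both sides have to be buffered before the join can use them.

```python
from typing import Iterable, List, Tuple, TypeVar

T = TypeVar("T")


def choose_build_side(left: Iterable[T], right: Iterable[T]) -> Tuple[str, List[T], List[T]]:
    """Count both sides of one partition and pick the smaller one as build side.

    Hypothetical helper, not Sedona code. The inputs are buffered into lists
    because a plain iterator cannot be replayed after it has been counted.
    """
    left_items = list(left)    # linear scan of the left side
    right_items = list(right)  # linear scan of the right side
    build_side = "LEFT" if len(left_items) <= len(right_items) else "RIGHT"
    return build_side, left_items, right_items


side, left_buf, right_buf = choose_build_side(iter(["a", "b", "c"]), iter(["x"]))
print(side)
```

The buffering cost is the trade-off here: it is the price of not knowing the sizes up front.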

jiayuasu commented 1 year ago

@dfischercodethoughts

This is a very good proposal and will significantly improve the user experience of our SQL API.

There are 3 important params involved in a spatial join:

  1. Spatial partitioning dominant side: sedona.join.spatitionside. Default: LEFT
  2. Spatial index build side: sedona.join.indexbuildside. Default: LEFT. See https://github.com/apache/sedona/blob/master/core/src/main/java/org/apache/sedona/core/joinJudgement/DynamicIndexLookupJudgement.java#L91
  3. Number of partitions of both RDDs used in a join. Defaults to the number of partitions of the sedona.join.spatitionside side, but is reduced to a reasonable number if the data is much smaller than the number of partitions.

The best practice is that the spatial partitioning dominant side and the spatial index build side should always be the larger dataset (not the smaller one). To find out which one is larger, you can use the count of both RDDs. Note that the SpatialRDD.analyze() function already computes the count. You can leverage that to automatically determine the dominant side: https://github.com/apache/sedona/blob/master/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/TraitJoinQueryExec.scala#L59

You can add the automation to the Sedona SQL API and leave sedona.join.spatitionside and sedona.join.indexbuildside as optional. In other words, our optimizer will automatically determine the two sides unless the user explicitly specifies the parameters.

Note that in the Sedona RDD API, this should remain a manual setting.
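The "automatic unless explicitly specified" rule above could look roughly like the following. This is an illustrative Python sketch, not the Sedona optimizer: `resolve_join_sides` and its parameters are hypothetical names, and the counts are assumed to come from something like SpatialRDD.analyze().

```python
from typing import Optional, Tuple

VALID_SIDES = ("LEFT", "RIGHT")


def resolve_join_sides(
    left_count: int,
    right_count: int,
    partition_side: Optional[str] = None,    # stands in for sedona.join.spatitionside
    index_build_side: Optional[str] = None,  # stands in for sedona.join.indexbuildside
) -> Tuple[str, str]:
    """Return (partitioning side, index build side).

    Hypothetical helper: any side the user set explicitly wins; otherwise the
    larger dataset is used, following the best practice stated in this comment.
    """
    for side in (partition_side, index_build_side):
        if side is not None and side not in VALID_SIDES:
            raise ValueError(f"side must be one of {VALID_SIDES}, got {side!r}")
    larger = "LEFT" if left_count >= right_count else "RIGHT"
    return (partition_side or larger, index_build_side or larger)


# No explicit settings: both sides follow the larger dataset.
print(resolve_join_sides(10, 1000))
# An explicit index build side overrides the automatic choice.
print(resolve_join_sides(10, 1000, index_build_side="LEFT"))
```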

I will leave the implementation to you. But if you feel this is too hard, please let me know.

@Kontinuation Kristin, please chime in if you have anything to add.

Kontinuation commented 1 year ago

I propose a different (more complicated) approach:

  1. Make the analyze() method of SpatialRDD take a sample of the RDD. We can integrate the logic of the Poisson sampler into the StatCalculator and calculate the boundary, count, and samples in one pass.
  2. When running the spatial join physical plan, we analyze() both sides. Now that we know the boundary and count of both sides, we can apply some heuristics to determine which is the partitioning side (for example, take the one with more records as the partitioning side).
  3. Build a spatial partitioning grid using the samples we collected in analyze(). Since we also have samples of the other side, we can estimate how many geometries of both sides will fall into each grid.
  4. When running the DynamicIndexLookupJudgement, we can determine which side to build and which side to stream on a per-grid basis.
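
A rough Python sketch of items 3 and 4, under simplifying assumptions: samples are plain (x, y) points, each point falls into exactly one grid, and `grid_of` is a hypothetical function that maps a point to a grid id. None of these names come from Sedona. Whether to build on the smaller or the larger side is left as a flag, since both options come up in this thread.

```python
from collections import Counter
from typing import Callable, Dict, Hashable, Iterable, Tuple

Point = Tuple[float, float]


def estimate_per_grid(
    samples: Iterable[Point], total_count: int, grid_of: Callable[[Point], Hashable]
) -> Dict[Hashable, float]:
    """Scale sample hits per grid up to the full record count of one side."""
    sample_list = list(samples)
    if not sample_list:
        return {}
    scale = total_count / len(sample_list)
    hits = Counter(grid_of(p) for p in sample_list)
    return {grid: n * scale for grid, n in hits.items()}


def build_side_per_grid(
    left_est: Dict[Hashable, float],
    right_est: Dict[Hashable, float],
    build_on_smaller: bool = True,
) -> Dict[Hashable, str]:
    """Pick the index build side for every grid that appears on either side."""
    sides = {}
    for grid in set(left_est) | set(right_est):
        left_is_smaller = left_est.get(grid, 0.0) <= right_est.get(grid, 0.0)
        sides[grid] = "LEFT" if left_is_smaller == build_on_smaller else "RIGHT"
    return sides


# Toy grid: unit-wide vertical strips, identified by the integer part of x.
strip = lambda p: int(p[0])
left = estimate_per_grid([(0.5, 0.5), (0.6, 0.4), (1.5, 0.5)], 300, strip)
right = estimate_per_grid([(0.1, 0.1), (1.2, 0.2), (1.7, 0.3), (1.9, 0.9)], 600, strip)
print(build_side_per_grid(left, right))
```

In the toy data the left side is estimated to be larger in strip 0 and smaller in strip 1, so the two grids end up with different build sides, which is the point of deciding per grid.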

jiayuasu commented 1 year ago

@Kontinuation I like this idea.

Let's break this proposal into 3 standalone PRs. I believe they can be implemented separately without relying on each other.

Step 1: Move the sampling logic to analyze()

  1. Update the analyze() function of SpatialRDD to include the Poisson sampler.
  2. Build a spatial partitioning grid using the samples we collected in analyze().
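
To illustrate what "one pass" means for Step 1, here is a small Python sketch that computes the count, the boundary, and a Poisson sample in a single scan. It is not the StatCalculator: `Stats`, `analyze`, and `_poisson` are hypothetical names, geometries are reduced to envelopes, and the Poisson draw uses Knuth's simple method, which is only suitable for small sampling fractions.

```python
import math
import random
from dataclasses import dataclass, field
from typing import Iterable, List, Optional, Tuple

Envelope = Tuple[float, float, float, float]  # (min_x, min_y, max_x, max_y)


@dataclass
class Stats:
    count: int = 0
    boundary: Optional[Envelope] = None
    samples: List[Envelope] = field(default_factory=list)


def _poisson(rng: random.Random, lam: float) -> int:
    """Draw from Poisson(lam) with Knuth's multiplication method."""
    threshold = math.exp(-lam)
    k, p = 0, rng.random()
    while p > threshold:
        k += 1
        p *= rng.random()
    return k


def analyze(envelopes: Iterable[Envelope], fraction: float, seed: int = 0) -> Stats:
    """Single scan: update count and boundary, and emit each record Poisson(fraction) times."""
    rng = random.Random(seed)
    stats = Stats()
    for env in envelopes:
        stats.count += 1
        if stats.boundary is None:
            stats.boundary = env
        else:
            b = stats.boundary
            stats.boundary = (min(b[0], env[0]), min(b[1], env[1]),
                              max(b[2], env[2]), max(b[3], env[3]))
        stats.samples.extend([env] * _poisson(rng, fraction))
    return stats


result = analyze([(0, 0, 1, 1), (2, -1, 3, 0.5), (-4, 2, -3, 5)], fraction=0.5, seed=42)
print(result.count, result.boundary, len(result.samples))
```

In a distributed setting, each partition would produce its own `Stats`, and the results would be merged by summing the counts, taking the union of the boundaries, and concatenating the samples.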

Step 2: Add heuristics to determine the join side in TraitJoinQueryExec.scala (https://github.com/apache/sedona/blob/master/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/TraitJoinQueryExec.scala#L59)

Step 3: DynamicIndexLookupJudgement automatically determines the stream side on a per-grid basis.

@dfischercodethoughts What you are proposing is Step 2. You can take a stab at it if you want. Since v1.4.1 will be released soon, I expect the entire proposal to be completed in v1.5.0.