locationtech / geotrellis

GeoTrellis is a geographic data processing engine for high performance applications.
http://geotrellis.io

Spatial Partitioning #1206

Closed echeipesh closed 8 years ago

echeipesh commented 8 years ago

Spatial Partitioning

In GeoTrellis we are working with tiled rasters, which are represented as RDD[(K,V)] where V is some representation of an image, perhaps Tile or MultiBandTile. In the spatial case K is SpatialKey(col, row), where col and row are the column and row in some TileLayout. One can easily consider each tile to be a point in a regularly gridded space, k ∈ N².

In the spatiotemporal case K is SpaceTimeKey(col, row, dateTime) and represents a tiled raster stack. If we define some temporal resolution with which to bin dateTime, we can again consider each key as a point, k ∈ N³.

Since we are applying operations that make use of the spatial relation between the keys, we can improve their performance by partitioning RDD[(K,V)] into non-overlapping blocks that cover the indexable space. Each block represents a contiguous multi-dimensional region, or sub-space.

Operations that will benefit from this region partitioning:

  1. Focal/Neighborhood Operations: These operations require access to spatially neighboring records. Increasing the likelihood that neighboring records are on the same partition (spatial locality) reduces the network load during focal operations.
  2. Region Selection: If partitions have a defined spatial region, we can avoid inspecting any partition that does not intersect the query region when performing a spatial filter operation.
  3. Spatial Join: If all partitions conform to a global region grid, we can avoid a shuffle during a join and match all possible overlapping keys by partition, using a OneToOneDependency between source partitions and target partitions.

Partitioning Scheme

In practice by placing our data on a grid we have created some maximum indexable space. By defining block partitioning we have a coarse grid covering the original space. It's important to note that these blocks are non-intersecting and cover all of the indexable space.

There are a number of ways to implement a block partitioning scheme. The simplest is to perform modulo division on the vector k ∈ N³. A more interesting option is to use a coarse space-filling curve, where each cell of the curve covers a region in the original space.

Knowing the individual record size, we are able to select a near-optimal region size that will both limit the maximum partition size, to provide good parallelism, and, assuming non-sparse data, minimize skinny partitions.
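The two options above can be sketched in a few lines. This is only an illustration, not the GeoTrellis implementation: `Key3`, `BlockSize`, `blockOf` and `zIndex` are hypothetical names, and the "division" is read here as integer division of each key component by the block extent, which is an assumption about what the proposal intends.

```scala
object BlockGrid {
  // Hypothetical stand-in for a binned space-time key, k ∈ N³ (non-negative components).
  final case class Key3(col: Int, row: Int, time: Int)

  // Hypothetical block extent along each axis, measured in keys.
  final case class BlockSize(cols: Int, rows: Int, times: Int)

  // Integer division maps every key to the coordinates of the block that contains it.
  def blockOf(k: Key3, size: BlockSize): Key3 =
    Key3(k.col / size.cols, k.row / size.rows, k.time / size.times)

  // Alternative: a coarse Z-order (Morton) index over 2D block coordinates,
  // built by interleaving the low `bits` bits of col and row.
  def zIndex(col: Int, row: Int, bits: Int): Long = {
    var z = 0L
    var i = 0
    while (i < bits) {
      z |= ((col >> i) & 1).toLong << (2 * i)
      z |= ((row >> i) & 1).toLong << (2 * i + 1)
      i += 1
    }
    z
  }
}
```

With a block of 4 x 4 tiles and 12 time bins, `BlockGrid.blockOf(Key3(5, 7, 13), BlockSize(4, 4, 12))` lands in block `Key3(1, 1, 1)`. Either function yields a region identifier that is the same for every key in a block, which is all the partitioning scheme needs.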
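As a rough illustration of that sizing step, the sketch below picks the edge of a square block from a per-tile size and a target partition size. The names `tileBytes` and `targetPartitionBytes`, and the choice of a square 2D block, are assumptions made for the example.

```scala
object RegionSizing {
  // Largest square block edge (in tiles) whose fully populated block stays
  // at or under targetPartitionBytes. Parameter names are hypothetical.
  def blockEdge(tileBytes: Long, targetPartitionBytes: Long): Int = {
    require(tileBytes > 0 && targetPartitionBytes >= tileBytes)
    val tilesPerPartition = targetPartitionBytes / tileBytes
    math.max(1, math.floor(math.sqrt(tilesPerPartition.toDouble)).toInt)
  }
}
```

For example, a 256 x 256 tile of 8-byte cells is 524,288 bytes; with a 128 MiB target that allows 256 tiles per partition, so the block edge is 16 tiles. If the data is sparse, the actual partitions will be smaller than this estimate.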

SpacePartitioner

In order to comply with the Spark Partitioner interface we must produce partitions numbered from 0 to n-1. Furthermore, these partitions should not be empty, as Spark will examine every partition on task execution. This means that we cannot simply provide every possible region as an actual partition.

However, if we know the bounding box of our data, bounds: KeyBounds[K], we can produce a sorted list of regions, Pr, that intersect bounds, and produce only those regions as partitions for our RDD[(K, V)].
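A minimal sketch of this idea follows, kept free of Spark so it runs on its own. It mirrors the two members of Spark's Partitioner (`numPartitions` and `getPartition`) without extending it; `Key`, `Bounds` and `blockEdge` are hypothetical stand-ins for SpatialKey, KeyBounds[K] and the region size, and returning `Option[Int]` for out-of-bounds keys is a choice made for the example.

```scala
object SpacePartitionerSketch {
  final case class Key(col: Int, row: Int)

  // Inclusive bounds; hypothetical stand-in for KeyBounds[K].
  final case class Bounds(min: Key, max: Key) {
    def contains(k: Key): Boolean =
      k.col >= min.col && k.col <= max.col && k.row >= min.row && k.row <= max.row
  }

  final class SpacePartitioner(val bounds: Bounds, blockEdge: Int) {
    private def regionOf(k: Key): (Int, Int) = (k.col / blockEdge, k.row / blockEdge)

    // Pr: only the regions that intersect bounds, in sorted order.
    val regions: Vector[(Int, Int)] = {
      val (c0, r0) = regionOf(bounds.min)
      val (c1, r1) = regionOf(bounds.max)
      (for (r <- r0 to r1; c <- c0 to c1) yield (c, r)).toVector.sorted
    }

    // Position in the sorted list is the partition number, 0 to n-1.
    private val indexOf: Map[(Int, Int), Int] = regions.zipWithIndex.toMap

    def numPartitions: Int = regions.length

    // Keys outside bounds have no partition in this sketch.
    def getPartition(k: Key): Option[Int] =
      if (bounds.contains(k)) indexOf.get(regionOf(k)) else None
  }
}
```

With bounds from (3, 3) to (9, 5) and a block edge of 4, only six regions intersect the data, so the partitioner has six partitions regardless of how large the global grid is.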

Partitioner Contract

Implied in the Spark Partitioner contract is the promise that it can map any instance of K into one of the n partitions that it represents. This is the reason that it is so difficult to control partition size in Spark: there is no information available at partitioner creation time on the number of records that one may see.

SpacePartitioner only intends to partition space within certain bounds. Knowing the bounds of the RDD before it is materialized allows us to create an appropriate number of partitions for the amount of data that we expect to see. Therefore larger RDDs will have more partitions and smaller RDDs will have fewer.

While you cannot always know the bounds of your data before RDD materialization, it may be reasonable to expect to know them if the data was sourced using a query which limits selection by spatial bounds and your data is reasonably non-sparse. Once the bounds are assumed, they can be propagated through filter and join operations and be available for subsequent calculations.

Region Selection

A region query Q can be expressed as KeyBounds[K]. We compute which regions intersect the query bounding box, the set Qr ⊆ (set of all regions), and only inspect partitions corresponding to Qr ∩ Pr. Note that we still need to check each record in the resulting partitions for membership in Q as a refinement step.
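The two steps, partition pruning and record refinement, might look like the sketch below. It assumes a 2D grid with square regions of edge `blockEdge`; `Region`, `Key`, `Bounds`, `partitionsToInspect` and `refine` are hypothetical names, not GeoTrellis API.

```scala
object RegionQuery {
  type Region = (Int, Int)

  final case class Key(col: Int, row: Int)

  // Inclusive bounds; hypothetical stand-in for KeyBounds[K].
  final case class Bounds(min: Key, max: Key) {
    def contains(k: Key): Boolean =
      k.col >= min.col && k.col <= max.col && k.row >= min.row && k.row <= max.row
  }

  // Qr: every region that the query bounding box touches.
  def regionsOf(b: Bounds, blockEdge: Int): Set[Region] =
    (for {
      c <- (b.min.col / blockEdge) to (b.max.col / blockEdge)
      r <- (b.min.row / blockEdge) to (b.max.row / blockEdge)
    } yield (c, r)).toSet

  // Qr ∩ Pr, returned as the indices of the partitions that must be inspected.
  def partitionsToInspect(pr: Vector[Region], query: Bounds, blockEdge: Int): Vector[Int] = {
    val qr = regionsOf(query, blockEdge)
    pr.zipWithIndex.collect { case (region, i) if qr(region) => i }
  }

  // Refinement: a selected partition may still hold keys outside Q.
  def refine[V](records: Seq[(Key, V)], query: Bounds): Seq[(Key, V)] =
    records.filter { case (k, _) => query.contains(k) }
}
```

Pruning is cheap because it only touches the region list, never the records; the per-record check then runs only inside the partitions that survive.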

SpatialJoin

A Spark join/cogroup will require a shuffle, expressed by ShuffleDependency, unless two members of the cogroup share a partitioner, in which case the shuffle will be avoided and OneToOneDependency instances will be generated.

Although all keys in a SpaceRDD are partitioned by the same global region grid, two partitions of index i in two different partitioners may not refer to the same region of space.

Before we can ask Spark to cogroup two SpaceRDDs we must construct an appropriate target partitioner, containing all partitions required to hold the result of the calculation, and re-order both of the inputs to match the partition structure of the target partitioner. We implement ReorderedSpaceRDD to perform this task.

Once the source RDD partition re-ordering has happened, we are able to use Spark's CoGroupedRDD to perform the actual record join.
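The re-ordering step amounts to a lookup from each target partition to the source partition covering the same region. The sketch below shows only that index mapping, not ReorderedSpaceRDD itself; `Region` and `reorder` are hypothetical names, and representing a missing source partition as `None` (an empty partition in the re-ordered RDD) is an assumption.

```scala
object Reorder {
  type Region = (Int, Int)

  // For each target partition, the index of the source partition that covers
  // the same region, or None when the source has no data for that region.
  def reorder(source: Vector[Region], target: Vector[Region]): Vector[Option[Int]] = {
    val sourceIndex: Map[Region, Int] = source.zipWithIndex.toMap
    target.map(region => sourceIndex.get(region))
  }
}
```

For instance, a source with regions `(1,0), (2,0)` re-ordered against a target of `(0,0), (1,0), (2,0)` gives `None, Some(0), Some(1)`: target partition i now lines up with the same region in every input, so each output partition depends on at most one partition per input.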

Outer Join

Outer join between rdd1: RDD[(K,V)] and rdd2: RDD[(K,V)]

Left join between rdd1: RDD[(K,V)] and rdd2: RDD[(K,V)]

Inner join between rdd1: RDD[(K,V)] and rdd2: RDD[(K,V)]
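One plausible reading of how the target partitioner differs between the three join types is sketched below. This is an assumption, not something the proposal states: it takes the union of both region lists for an outer join, the left side's regions for a left join, and the intersection for an inner join. `Region` and the function names are hypothetical.

```scala
object JoinTargets {
  type Region = (Int, Int)

  // Outer join: any region present on either side can produce output.
  def outer(l: Vector[Region], r: Vector[Region]): Vector[Region] =
    (l ++ r).distinct.sorted

  // Left join: only regions present on the left side can produce output.
  def left(l: Vector[Region], r: Vector[Region]): Vector[Region] =
    l.distinct.sorted

  // Inner join: only regions present on both sides can produce output.
  def inner(l: Vector[Region], r: Vector[Region]): Vector[Region] =
    l.distinct.intersect(r.distinct).sorted
}
```

Under this reading an inner join between inputs that share few regions yields a small target partitioner, so partitions that cannot contribute are never scheduled.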

Creation of a SpaceRDD requires a filter and a shuffle in order to ensure that all the records in the RDD are within the KeyBounds[K] described in the partitioner and are assigned to their respective partitions.

Joins on mixed resolution data

So far we have assumed that our data maps to some dense multi-dimensional grid. This is certainly the case with 2D imagery; few maps have regular holes in them. However, when dealing with spatiotemporal imagery it is easy to imagine wanting to join an annual dataset with a monthly one.

If we pick the highest, monthly, resolution for our partitioning then annual partitions will never reach full index density. All annual partitions will be 1/12th the size of the monthly partitions. This will probably cause drift into sub-optimal partition sizing.

How to combine mixed resolution datasets is an open question. The solution probably involves maintaining enough information in SpacePartitioner[K] to convert each region to its spatial bounding box and performing more intricate partition overlap logic. Fortunately, NarrowDependency allows each partition to depend on a list of other partitions (for the case where we are joining a high resolution RDD into a low resolution RDD). It is currently not clear what the practical limit is for the number of partitions to depend on in a NarrowDependency. It seems reasonable to say that we should always adopt the partition structure of the higher resolution RDD.

echeipesh commented 8 years ago

I have done some benchmarking on the prototype implementation and can comment on two findings:

Left Join

Performing spatial partitioning before a left join on the two RDDs has the same performance as performing the left join with HashPartitioner. This is somewhat surprising, as the initial spatial partitioning requires a filter and a shuffle operation for each RDD.

Focal Operations

Focal operations on spatially partitioned RDDs are 30-40% faster than on hash partitioned RDDs. This part is not surprising.

lossyrob commented 8 years ago

Fixed by #1230