opengeospatial / geoparquet

Specification for storing geospatial vector data (point, line, polygon) in Parquet
https://geoparquet.org
Apache License 2.0
831 stars 57 forks

Add a spatial index in geoparquet format as optional #13

Open alasarr opened 2 years ago

alasarr commented 2 years ago

Some geospatial formats, such as flatgeobuf, have started to include a spatial index. I think it would be great to include one as an optional feature of GeoParquet.

I consider it quite interesting because it provides a way to get spatial features (based on a bbox, for example) from a Parquet file without having to scan the whole file. It unblocks lots of use cases.

I'm not an expert in Parquet, and it's not clear to me how to implement a geospatial index there. It looks like Parquet includes something for indexes in its metadata.

In the Thrift definition, it looks like the IndexPageHeader is under a TODO comment.

Just opening this to start the discussion on how to properly implement this.

bo-lu commented 2 years ago

A particularly good use case is with OGC API - Records, which contains a bounding box but most of the payload is columnar metadata. Having an index on just the bounding box portion would enable super fast queries, like COG for features.

jorisvandenbossche commented 2 years ago

For the discussion, it might be useful to distinguish between different "levels" of spatial indices: a spatial index for a single file (like flatgeobuf has) vs spatial indexing based on a multi-file partitioning.

For a partitioned dataset, having a bbox per file (as being added in https://github.com/opengeospatial/geoparquet/pull/21) can already serve as a rudimentary spatial index. That is also how we are using this in dask-geopandas to selectively load only those partitions that are needed for the operation or visualization. But for this, we probably want to look for a better solution long-term (one that doesn't rely on extracting a piece of metadata out of each individual file), with some additional manifest or spatial index file that gathers this information. And it might be interesting to see if this could integrate with existing data lake solutions (like Iceberg or Delta Lake). It might be best to open a separate issue for this topic.

When focusing on the single file level here, the Parquet format has some different "indexing" related features:

(sidenote: the C++ Parquet implementation (which lives in the Arrow project) currently only supports the ColumnChunk statistics for filtering data with predicate/filter pushdown)

So both indexing-related features (aside from the Bloom filter) that are built into the Parquet format are based on min/max values. This makes them hard to use for spatial filtering, I think.

If we wanted to include a packed R-tree index in the file itself, as flatgeobuf does, the only option (apart from extending the Parquet format) is putting it in the file metadata. That in addition means it would need to be e.g. base64-encoded to be stored as a valid string. This doesn't feel like a very appealing option either.

(personally, I think focusing on a good story for spatial indexing partitioned datasets (up to the file level) might be the most valuable)

pomadchin commented 2 years ago

Hm, so if I read this correctly, having huge Parquet files with a lot of geometries, plus the ability to quickly read the correct subset of rows out of them, is never a use case; and as a workaround the suggestion is to have some mechanism to split such large Parquet files into spatially collocated groups stored in separate files, which will always be of a size that is 'good enough' (depending on the use case) to meet performance requirements?

@jorisvandenbossche

echeipesh commented 2 years ago

Sorry for the word dump, but this is the setup for our thinking and the experiments that @pomadchin has been running. It seems to do something good.

TL;DR: it's certainly possible to make querying a Parquet file a lot faster by encoding the MBR into records and filtering on those first, and it maps fairly neatly onto what an R-Tree does.

Cribbing this image from Wikipedia we can logically map:

Red box = Page
Blue box = ColumnChunk
Black box = Parquet File

(I'm pretty sure that the Parquet reader is smart enough to apply filtering per Page, where there can be multiple pages for each ColumnChunk, but I'm not 100% on this.)

...

Spatial indexing is the ability to minimize the amount of data scanned and read based on a spatial query. While the user's actual query may be some complex geometry, it's typical for such systems to perform primary filtering by reducing the query to a Minimum Bounding Rectangle (MBR) for the data-scanning stage, and subsequently perform a refinement check for the actual intersection later. The base case is that every file and every record is read and evaluated in the refinement step.
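The primary-filter step reduces to a cheap rectangle-overlap test. A minimal sketch (the `(xmin, ymin, xmax, ymax)` tuple layout is just an assumption for illustration):

```python
def mbr_intersects(a, b):
    """Primary filter: do two (xmin, ymin, xmax, ymax) rectangles overlap?
    A hit here still needs the refinement check on the exact geometry."""
    return (a[0] <= b[2] and a[2] >= b[0] and
            a[1] <= b[3] and a[3] >= b[1])

query = (0.0, 0.0, 5.0, 5.0)
assert mbr_intersects(query, (4.0, 4.0, 6.0, 6.0))      # overlapping corner
assert not mbr_intersects(query, (6.0, 0.0, 7.0, 1.0))  # disjoint in x
```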

All this implies that, in order to be actually spatially indexed, records that are spatially co-located should also be co-located within the Parquet file structure. This allows the maximum number of records to be ignored and the minimum number of records to be read for a given spatial query. I guess we're not talking about how that happens here; that's some spatial-sort process.

Encoding Strategy

Broadly speaking there are two approaches to implementing a spatial index:

  1. Encoding it using an R-Tree. Here chunks of spatial data that are spatially close are grouped into rectangular regions which themselves can be grouped, forming a tree. These regions may overlap.

  2. Encoding it by intersection over a discrete grid. This can be a grid like H3 or a rectilinear grid, usually mapping each grid cell to the index of a space-filling curve. The cells never overlap and fully cover the surface.

I assert that only R-Tree encoding is applicable for GeoParquet. This is because the primary purpose of GeoParquet is to be a common, cloud-friendly vector distribution format, so it should be readable by clients of various levels of sophistication.

Indexing lines or polygons by intersection over a discrete grid implies that a single record could intersect multiple cells and thus map to multiple indices. Because a query could hit only one of the candidate cells, this requires storing the geometry with every cell it intersects. Systems that use this approach must deal with possible data duplication when evaluating any query.
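A toy illustration of the duplication problem, using a rectilinear grid as a stand-in for H3/S2 (the cell size and `grid_cells` helper are assumptions, not part of any spec):

```python
import math

def grid_cells(mbr, cell_size=1.0):
    """All cells of a rectilinear grid that an MBR touches."""
    xmin, ymin, xmax, ymax = mbr
    return {
        (i, j)
        for i in range(math.floor(xmin / cell_size), math.floor(xmax / cell_size) + 1)
        for j in range(math.floor(ymin / cell_size), math.floor(ymax / cell_size) + 1)
    }

# A geometry spanning two cells must be stored once per cell, so a query
# hitting both cells sees the record twice and has to deduplicate by id.
cells = grid_cells((0.5, 0.5, 1.5, 0.9))
assert cells == {(0, 0), (1, 0)}
```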

Because Bloom filters are basically set-membership queries, and I've argued that set membership via intersection with the grid is not an acceptable approach here, I don't really think that they help us. I'd love to be wrong though.

If we require no record duplication, a primitive reader could ignore the spatial-indexing feature and read every record, using them as needed without any additional concerns. This is highly desirable for the GeoParquet format, and leaves us with the question of how to implement an R-Tree in the Parquet format.

Parquet Structure

The basic idea is to represent geometries as a Minimum Bounding Rectangle (MBR) and filter on those values at various levels of aggregation, thus creating an R-Tree. This is not a novel idea and is explored in:

  1. Yu, J., Zhang, Z. & Sarwat, M. Spatial data management in apache spark: the GeoSpark perspective and beyond. Geoinformatica 23, 37–78 (2019)

  2. Eldawy, Ahmed et al. “Sphinx: Empowering Impala for Efficient Execution of SQL Queries on Big Spatial Data.” SSTD (2017).

Parquet structure offers several levels at which we can perform filtering:

Row

Each row corresponds to a geometry and its attributes. We need a basic way to filter each row by intersection with a query BBOX. Because right now we're talking about storing geometries in formats like WKB, which are not understood by native Parquet reader predicates, we need something else.

One way we can get around this problem is to require that the writer of a GeoParquet file with support for spatial indexing adds an MBR column that contains (xmin, xmax) (ymin, ymax) of the indexed geometry. This will allow us to actually use existing push-down filters for the first pass at intersections.

ColumnChunk Stats

Multiple rows are aggregated into Row Groups, where column values are grouped into Column Chunks, which in turn can consist of multiple pages. A ColumnChunk maps to a leaf region of the R-Tree. The Parquet format is able to store min/max stats of primitive-valued columns per Page and per ColumnChunk. These stats on the MBR columns give us the MBR of the leaf region. This again allows existing Parquet readers to skip reading chunks where the filter predicate can't be true based on the recorded min/max values.

File Footer

Larger datasets will typically span multiple files, each typically 32MB to 64MB in size (this is configurable on write). Thus a whole file can be skipped if none of its records intersect the query BBOX. Each file will store overall min/max values for the MBR. The Parquet reader can potentially discard the whole file after inspecting the footer.

We could also add the BBOX to the file metadata, but it is not actually needed if we're encoding the MBR as a column. There is room for debate here. Either way, we would need a specialized mechanism to make use of the metadata information.

File Index

Opening a file just to inspect the footer can still be avoided if there is a secondary index of file MBRs that can be consulted. This could potentially be stored as a side-car file, maybe a GeoJSON FeatureCollection for harmony with STAC efforts, or perhaps some other live index like PostGIS. If GeoParquet gets loaded into Hive or Databricks Delta Lake, this type of index gets built up "for free". I suspect that systems handling large datasets will already have something like a secondary index in place as a rule. But having a static index file that allows any reader to locate the right Parquet files in a dataset from blob storage alone is clearly the more cloud-friendly way, so it's probably very much worth it.

alasarr commented 2 years ago

Great explanation @echeipesh! Really excited about having at least something that works!

Just to share with the others: @pomadchin, @echeipesh and I have been working during the last few weeks on how to speed up tile intersections with the CARTO analytics toolbox for Databricks. We've been able to reduce intersections from 10 minutes to 2 seconds using this approach of storing the MBR in a separate column and clustering the data spatially. More info here

All this implies that in order to be actually spatially indexed that records that are spatially co-located should be co-located with-in the Parquet file structure.

Some warehouses such as BigQuery implement it that way: if you cluster a table in BigQuery by geom and you use a spatial filter, you only read that region of the world. If you don't, you full-scan the whole table. I think Snowflake is also working on something similar.

One way we can get around this problem is to require that the writer of a GeoParquet file with support for spatial indexing adds an MBR column

It's not the perfect solution for me, but it's true that it's the best we have so far. My biggest concern is that users will have an extra column that could have several issues (for example, name collisions with other user columns).

The only solution I can find right now is to modify the encoding of the geometry to add the Minimum Bounding Rectangle (MBR), not very exciting 😄 . @jorisvandenbossche any suggestion at this point?

kylebarron commented 2 years ago

One way we can get around this problem is to require that the writer of a GeoParquet file with support for spatial indexing adds an MBR column that contains (xmin, xmax) (ymin, ymax) of the indexed geometry. This will allow us to actually use existing push-down filters for the first pass at intersections.

I really like this idea because it leverages what Parquet supports out of the box and limits the custom implementation we need. I'm curious how much extra space this would add to a file on average. If spatially correlated data is co-located, then maybe this column would compress very well 🤷‍♂️ .

(I'm pretty sure that Parquet reader is smart enough to apply filtering per Page where they can be multiple pages for each ColumnChunk but I'm not 100% on this.)

It looks like the format allows this (see here and here plus this doc) with the caveat that page statistics are stored in the page header, which is not (I think) in the file footer. Whereas column chunk statistics are part of ColumnMetaData which is part of ColumnChunk which is part of the RowGroup metadata in the file footer.

The implication here is column chunks/row groups can be skipped by reading the file metadata, but to skip pages you need to additionally load the page metadata. This is probably a good thing so that the file footer doesn't become too large.

The parquet reader can potentially discard the whole file after inspecting the footer.

Parquet datasets that are partitioned into multiple files often already have a top-level metadata sidecar file. This doesn't look to be enforced by the spec, but seems to be just a popular implementation detail?

So in effect we could have a cascade:

My biggest concern is that users will have an extra column that could have several issues (for example name collision with other user columns).

I don't see name collision as an issue because the writer is able to see all the names in the dataset when writing. So if its first choice of name "_mbr1" is already used in the dataset, then it can go on to "_mbr2" and so on until it finds a non-conflicting column name. And then it can store in the geo metadata the name of the column that contains the MBR data.
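That retry scheme is only a few lines. A sketch, reusing the `_mbr` base name from the comment above (the helper name is hypothetical):

```python
def pick_mbr_name(existing_columns, base="_mbr"):
    """Find the first MBR column name that doesn't collide with user columns.
    The chosen name would then be recorded in the "geo" metadata so readers
    know where to look."""
    i = 1
    while f"{base}{i}" in existing_columns:
        i += 1
    return f"{base}{i}"

assert pick_mbr_name({"id", "geometry"}) == "_mbr1"
assert pick_mbr_name({"id", "_mbr1", "_mbr2"}) == "_mbr3"
```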

paleolimbot commented 2 years ago

This is really exciting! I'm just catching up on this discussion, so forgive me if I'm missing bits. Just a few thoughts:

I really like the idea of a struct column encoding each feature's envelope (the term 'envelope' is from the GeoPackage standard). This is analogous to how GeoPackage and flatgeobuf do it, and if file size is a concern the values could always be encoded as float or omitted. Maybe its name could be specified in the geoparquet metadata field? I imagine that writers would give it a sensible default name like {column}_bounds.

For "edges": "spherical" it probably makes sense to have a sphere-friendly bounds option like an S2 covering. To at least allow this to be the case, I suppose a 'bounds_encoding' column would be needed, and I can see how that's opening up a can of worms (arguably, though, so is pretending that the concept of a minimum bounding rectangle is useful for lat/lon coordinates).

I like the idea of using the unit of the "RowGroup" as the basis for an index, which could be at the file level or above the file level. This puts the responsibility on the writer to build the index in advance and gives some flexibility over the granularity of the preselection. It also forces a tradeoff...should the file be optimized for fast access to possibly intersecting features (smaller row groups), or should it be optimized for reading the whole thing quickly (bigger row groups)? The RowGroup is a nice unit because it fits in well with the existing Arrow Dataset infrastructure (although I'm not an expert in the 'Fragment' concept, which I think is where this fits in).

pomadchin commented 2 years ago

@echeipesh we need to convert this issue into a best practices doc that explains how we can reshape data and form the proper GeoParquet to make use of Parquet filtering on reads!

Explicitly describing the desired Parquet structure and explaining why it works also highlights the value of https://github.com/geopandas/geo-arrow-spec/issues/4

UPD after the Zoom meeting: the plan is to come up with a best practices doc that captures recommendations and approaches for effectively reading geometries out of Parquet files.

kylebarron commented 2 years ago

I had to drop off the call early yesterday so I missed any discussion about spatial indexing. I think it would be helpful for the spec to describe (either optional or as an extension) the metadata format for spatial indexing so that readers and writers have a contract for how to store this metadata.

But I would agree that recommendations for how to actually partition the data would be best suited to a best practices doc, so that the method of partitioning is flexible for different use cases.

Explicitly describe the desired parquet structure and explanations why it works, it also highlights the value of geopandas/geo-arrow-spec#4

Can you explain your thoughts behind this a little more? I created an example parquet file with geoarrow geometry encoding via gdal (docker run -v $(pwd):/data -it osgeo/gdal:latest ogr2ogr /data/rivers.parquet /data/rivers.geojson -lco GEOMETRY_ENCODING=GEOARROW) and it doesn't seem to have the statistics metadata we want out of the box.

The column metadata for the geometry column for a row group:

{'file_offset': 22214692,
 'file_path': '',
 'physical_type': 'DOUBLE',
 'num_values': 2405304,
 'path_in_schema': 'geometry.list.item.list.xy',
 'is_stats_set': True,
 'statistics': {'has_min_max': True,
  'min': -108.985001,
  'max': 40.996119,
  'null_count': 0,
  'distinct_count': 0,
  'num_values': 2405304,
  'physical_type': 'DOUBLE'},
 'compression': 'SNAPPY',
 'encodings': ('PLAIN_DICTIONARY', 'PLAIN', 'RLE', 'PLAIN'),
 'has_dictionary_page': True,
 'dictionary_page_offset': 2604657,
 'data_page_offset': 3654378,
 'total_compressed_size': 19610035,
 'total_uncompressed_size': 20081644}

You can see that the statistics object for native arrow geometries stores min and max but this is the min of both x and y and the max of both x and y, so it's not the full bbox.

I would guess that a struct encoding of arrow-native geometries actually would work out of the box, guessing that the native parquet storage of that would be geometry.struct.x and geometry.struct.y as two different parquet column paths which would each get their own statistics.

kylebarron commented 2 years ago

Just a quick note that my hunch was correct. If you create a dummy dataset with a struct column from Arrow, it does get saved as two columns in Parquet, each with their own statistics.

Creating a test dataset:

import pyarrow as pa
import numpy as np
import pyarrow.parquet as pq

x = pa.array(np.array([1, 2, 3, 4], dtype=np.float64))
y = pa.array(np.array([5, 6, 7, 8], dtype=np.float64))
points = pa.StructArray.from_arrays([x, y], names=['x', 'y'])
table = pa.Table.from_arrays([points], names=['geometry'])
pq.write_table(table, 'test.parquet')

Then checking its metadata:

import pyarrow.parquet as pq
meta = pq.read_metadata('test.parquet')
rg = meta.row_group(0)
rg.to_dict()

You can see that this one StructArray is stored in Parquet as two logical columns, and therefore the bounding box of the row group is implicitly stored automatically. This is definitely something for us to keep in mind as we discuss an arrow-native geometry encoding.

{'num_columns': 2,
 'num_rows': 4,
 'total_byte_size': 234,
 'columns': [{'file_offset': 121,
   'file_path': '',
   'physical_type': 'DOUBLE',
   'num_values': 4,
   'path_in_schema': 'geometry.x',
   'is_stats_set': True,
   'statistics': {'has_min_max': True,
    'min': 1.0,
    'max': 4.0,
    'null_count': 0,
    'distinct_count': 0,
    'num_values': 4,
    'physical_type': 'DOUBLE'},
   'compression': 'SNAPPY',
   'encodings': ('PLAIN_DICTIONARY', 'PLAIN', 'RLE'),
   'has_dictionary_page': True,
   'dictionary_page_offset': 4,
   'data_page_offset': 48,
   'total_compressed_size': 117,
   'total_uncompressed_size': 117},
  {'file_offset': 334,
   'file_path': '',
   'physical_type': 'DOUBLE',
   'num_values': 4,
   'path_in_schema': 'geometry.y',
   'is_stats_set': True,
   'statistics': {'has_min_max': True,
    'min': 5.0,
    'max': 8.0,
    'null_count': 0,
    'distinct_count': 0,
    'num_values': 4,
    'physical_type': 'DOUBLE'},
   'compression': 'SNAPPY',
   'encodings': ('PLAIN_DICTIONARY', 'PLAIN', 'RLE'),
   'has_dictionary_page': True,
   'dictionary_page_offset': 221,
   'data_page_offset': 261,
   'total_compressed_size': 113,
   'total_uncompressed_size': 117}]}

paleolimbot commented 2 years ago

@jorisvandenbossche and I were chatting about this today...I recently nixed my prototype implementation of the struct encoding, but the example files that I wrote before I nixed the implementation might be useful for experimenting: https://github.com/paleolimbot/geoarrow/tree/6b3870ba8305994590cf169a36271a6f8f204b1a/inst/example_parquet

It seems like the geoarrow struct encoding might be a nice way to avoid fighting the parquet format...I nixed the implementation because supporting more than one point added some complexity and nobody seemed to want it. I would love to see a way to include a tree-based format that isn't me inventing something...the FlatGeobuf index code is fairly small and easily portable...I wonder if that could be useful here somehow.

kylebarron commented 2 years ago

the FlatGeobuf index code is fairly small and easily portable...I wonder if that could be useful here somehow.

The FlatGeobuf index is a packed Hilbert R-tree, which it says was inspired by https://github.com/mourner/flatbush. I played around with dask-geopandas' Hilbert curve implementation a bit, but had a little difficulty getting a "good" repartitioning of a test dataset with it. Generating Hilbert values seems like the easy part; actually cutting the boundaries between partitions so that there's minimal overlap between partitions while keeping partitions roughly equal in size seems like a much harder part.

I ported flatbush to optimized Cython recently, in the hope that it might make choosing the partitions easier. Testing with the Microsoft US Buildings dataset (130 million polygons) I get a decent partitioning structure, though it could probably be improved (this is with 521 equal-sized partitions). (My hope is that the tree structure of flatbush should help with partitioning at both the file level and the row-group level together.)

A caveat about the above is that it mostly falls under best practices and not under specification. If the Parquet metadata allows for an R-tree, then it could be created by any method, not just via hilbert curves. So essentially we could take the FlatGeobuf packed-hilbert r-tree as inspiration without declaring it the only indexing method.

d3netxer commented 1 year ago

Any update on this issue? Does parquet metadata allow for an R-tree?

paleolimbot commented 1 year ago

I did some experiments using GeoArrow encoding (i.e., not WKB) and found that if you are willing to write small row groups that are vaguely clustered (I used a Hilbert sort) you can use column stats to reduce your read time (although not to the extent that a row-level spatial index would allow). I don't have a well-organized example yet but a poorly organized example can be found here: https://gist.github.com/paleolimbot/e42be0733925074ccd95849b0b24fa32 .

Does parquet metadata allow for an R-tree?

You can put whatever key/value metadata you want at the file level (e.g., where the current "geo" key lives); however, most tools (e.g., pyarrow dataset) propagate file-level metadata assuming that it is independent of the data. I believe there is also row group metadata (which is not currently accessible in pyarrow to my knowledge). I don't know that general key/value metadata is a good choice for an R-tree because some tools assume that metadata IO is cheap and pulling an RTree from S3 by accident is likely to be expensive.

I don't personally feel that hijacking file or row group metadata for index storage is a good idea...it may take an update to the Parquet format itself to allow this (or something more generic like large key/value storage).

jiayuasu commented 1 year ago

R-Tree for clustering

If we just talk about how to cluster / reorganize data based on spatial proximity, any space-filling curve or H3/S2/GeoHash will do the same thing, although it may not preserve as much spatial information as R-Tree clustering. The general process is: (1) create a space-filling-curve or H3/S2/GeoHash id for each geom; (2) sort / partition geoms by this id.
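The space-filling-curve variant of that process can be sketched with the classic iterative Hilbert mapping (the grid size, feature tuples, and coordinates below are toy assumptions):

```python
def hilbert_d(n, x, y):
    """Distance along the Hilbert curve of cell (x, y) on an n x n grid
    (n must be a power of two). Classic iterative formulation."""
    d = 0
    s = n // 2
    while s > 0:
        rx = 1 if (x & s) else 0
        ry = 1 if (y & s) else 0
        d += s * s * ((3 * rx) ^ ry)
        if ry == 0:
            if rx == 1:  # reflect
                x = n - 1 - x
                y = n - 1 - y
            x, y = y, x  # rotate quadrant
        s //= 2
    return d

# Step (1): id per geom (here, the Hilbert index of each feature's MBR
# center snapped to grid units). Step (2): sort by that id; the sorted
# order is then cut into row groups / files.
features = [("a", 3, 0), ("b", 0, 0), ("c", 1, 1), ("d", 0, 1)]
ordered = sorted(features, key=lambda f: hilbert_d(4, f[1], f[2]))
assert [f[0] for f in ordered] == ["b", "c", "d", "a"]
```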

If we want R-Tree clustering, the general process will be: (1) create an R-Tree based on a small sample of the raw data (building an R-Tree on the entire dataset is a bad idea, see [1]); (2) let each object in the dataset traverse the tree in a top-down fashion to find its exact leaf node ID; (3) sort / partition geoms by this id.

R-Tree for the metadata

I don't think it is a good idea to put an R-Tree in the column metadata for enabling row-group-level filtering. This is because an R-Tree introduces additional overhead for storage and maintenance (see [1]), considering GeoParquet might be used together with table formats such as Iceberg, Hudi and Delta Lake for supporting data updates. We @wherobots already have a GeoParquet implementation for Iceberg called Havasu (https://docs.wherobots.services/latest/references/havasu/introduction/)

[1] Indexing the Pickup and Drop-off Locations of NYC Taxi Trips in PostgreSQL – Lessons from the Road: https://jiayuasu.github.io/files/paper/hippo_sstd_2017.pdf

jiayuasu commented 1 year ago

We also conducted a similar benchmark to study the pruning power of S2/GeoHash (e.g., the overlapping area around partitions). You can see it here: https://wherobots.com/spatial-data-geoparquet-and-apache-sedona/

kylebarron commented 8 months ago

https://github.com/opengeospatial/geoparquet/pull/191 included a preliminary implementation of spatial partitioning in GeoParquet, to be released in GeoParquet 1.1.

four43 commented 1 week ago

GeoParquet 1.1 is labeled as stable - can this be closed? This is exciting!