geopandas / dask-geopandas

Parallel GeoPandas with Dask
https://dask-geopandas.readthedocs.io/
BSD 3-Clause "New" or "Revised" License

ENH: repartition based on an attribute #61

Open jorisvandenbossche opened 3 years ago

jorisvandenbossche commented 3 years ago

Assuming you have an attribute in the GeoDataFrame that describes some form of spatial extent or entity (eg the country name, the zip code, district, ..., or a calculation such as a geohash), this column could also be used to repartition the dataframe spatially.

Dask already somewhat supports this with the set_index and repartition methods. But in practice they don't necessarily do exactly what we want, and a helper function that uses those methods under the hood would probably be useful.

In my sjoin demo notebook, I repartitioned a GeoDataFrame based on a geohash column, i.e. a column with a limited set of unique values, where I wanted a new partition for each value in this column. I ended up doing something like:

# sort by the attribute so dask can compute sorted divisions
df_points_sorted = df_points.set_index("geohash1").sort_index()
# one partition per unique attribute value
n_partitions = df_points_sorted.index.nunique()
ddf_points_sorted = dask_geopandas.from_geopandas(df_points_sorted, npartitions=n_partitions)
# force the partition boundaries to be exactly the unique geohash values
# (the last value is repeated because divisions need npartitions + 1 entries)
ddf_points_sorted = ddf_points_sorted.repartition(
    divisions=('0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'b', 'c', 'd', 'e', 'f', 'g', 'g')).persist()

So dask sorts the input on the index and determines partition divisions from that in from_(geo)pandas (or, if you already start with a dask dataframe, set_index will do the same). However, that tries to create more or less equally sized partitions, so to ensure we actually get one partition per unique value of the attribute, I repartitioned the result again, passing the partition bounds explicitly to repartition(..) (these are the unique values in the "geohash1" column). This second step is relatively cheap since it only changes some start/stop slices, and doesn't actually shuffle data.

I suppose we could write a custom version (maybe based on how those methods in dask are implemented) that makes this a bit simpler (e.g. we also don't need to have the partitions sorted). I think the main question is: do we need two passes over the data, one to first determine all the unique values in the column, and a second for the actual repartitioning?

jorisvandenbossche commented 3 years ago

In dask there is also a DataFrame.shuffle method, probably something to look at: https://github.com/dask/dask/blob/8aea537d925b794a94f828d35211a5da05ad9dce/dask/dataframe/shuffle.py#L301 (https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.shuffle). That seems closer to what we want to do here than set_index/repartition.