geopandas / dask-geopandas

Parallel GeoPandas with Dask
https://dask-geopandas.readthedocs.io/
BSD 3-Clause "New" or "Revised" License

Dask GeoPandas #52

Open mrocklin opened 7 years ago

mrocklin commented 7 years ago

What would it look like to parallelize/distributed GeoPandas with Dask?

Dask has created parallel variants of NumPy arrays and Pandas Dataframes. I think it may be sensible to do the same with Geopandas. I would like to solicit thoughts from Geopandas developers and people familiar with Dask on this idea. Is there a significant need for this? How would we arrange the in-memory geopandas components? What operations would it need to support to be pragmatic? What is a minimal feature set necessary to attract early-adopters? What performance issues are we likely to come across?

I am familiar with Dask but my experience with GeoPandas is light.

Proposed structure

I suspect that one dask geopandas dataframe (dask-gdf) would be composed of many lazily computed geopandas dataframes (gdfs) with known bounding polygons, stored in another overarching geopandas dataframe (gdf). That is to say that we would partition the full dataset into localized regions, and would centrally maintain a mapping of those regions to pointers to gdfs to help guide computation. Dask-gdf operations would consult this mapping of regions before dispatching appropriate operations to the in-memory gdfs. For example a spatial join with an in-memory gdf would first join against the regions to find which regions have any interaction and then perform many in-memory joins with all of the relevant regions. A spatial join between two dask-gdfs would involve a spatial join on the region-mapping to find all intersections followed by many in-memory spatial joins on the gdfs.

I've convinced myself (perhaps foolishly) that this is sensible for spatial join operations and possible to accomplish lazily (which is important for larger-than-memory use). However I don't know if this metadata is sufficient to implement most other relevant operations for GeoPandas.
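
To make the region-dispatch idea concrete, here is a rough sketch of the first case (a partitioned dataset joined against an in-memory gdf). The names `regions` and `partitions` are hypothetical, and the `predicate` keyword assumes a recent geopandas (older releases spell it `op`):

```python
# Hypothetical sketch: `regions` is a GeoDataFrame with one bounding polygon
# per partition (indexed by partition number) and `partitions` is a list of
# the corresponding in-memory GeoDataFrames.
import geopandas as gpd
import pandas as pd

def sjoin_partitioned(regions, partitions, other):
    """Spatially join `other` (an in-memory GeoDataFrame) against a spatially
    partitioned dataset, touching only partitions whose bounding polygon
    intersects `other`."""
    # Coarse pass: which partitions can interact with `other` at all?
    hits = gpd.sjoin(regions, other, how="inner", predicate="intersects")
    relevant = hits.index.unique()
    # Fine pass: run the real join only on the relevant partitions.
    pieces = [
        gpd.sjoin(partitions[i], other, how="inner", predicate="intersects")
        for i in relevant
    ]
    return pd.concat(pieces) if pieces else gpd.GeoDataFrame()
```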

For background, here is how we design dask.array and dask.dataframe: a dask.array is many NumPy arrays (chunks) with known chunk shapes and a common dtype, and a dask.dataframe is many Pandas dataframes partitioned along the index with known partition boundaries (divisions) and known column dtypes.

Almost all dask.array/dataframe operations are closed under this metadata: they know everything they need from the metadata alone and can generate the metadata of the output without looking at the underlying values.

Performance issues

Often when parallelizing in-memory libraries we start becoming concerned with things like releasing the GIL and fast serialization. Are these an issue for GeoPandas?

An initial naive test shows that yes, serialization is an issue. We're only able to serialize GeoPandas dataframes at around 2MB/s (although this example may not be representative).

In [1]: import geopandas as gpd

In [2]: cities = gpd.read_file(gpd.datasets.get_path('naturalearth_cities'))

In [3]: import pickle

In [4]: %time len(pickle.dumps(cities))
CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 5.59 ms
Out[4]: 11583

In [5]: 11583 / 0.0059 / 1e6  # MB/s
Out[5]: 1.9632203389830507

I haven't tested GIL-releasing friendliness yet.

Explicit questions

  1. Is this sensible?
  2. What operations or classes of operations are relevant to support?
  3. Are there better ways to lay out the metadata than the geopandas-of-regions approach described above?
  4. Are there better ways to serialize geopandas dataframes?
  5. Do GeoPandas dataframe operations typically release the GIL? If not then is this doable?
  6. How actively maintained is this project? Commits recently seem to be sparse.

cc @kjordahl @jorisvandenbossche @r-shekhar @kuanb @jalessio

mrocklin commented 7 years ago

I notice that this issue has gotten a few +1's, which is great to see. For those who added them: do you have any comments on the questions above? Even if you're not able to answer questions about serialization, the GIL, etc., you may still be able to provide context for questions like "what operations are relevant to support?" and "how actively maintained is this project?"

For fun I wrote up an example on parallelizing spatial joins. This is my first time using geopandas. It was quite fun. The built-in plotting is very helpful.

https://gist.github.com/mrocklin/9d4a158afd74d551cd0a4477953af548

mrocklin commented 7 years ago

Alternatively, it could be that people don't have large geospatial datasets and that the right thing for geopandas to focus on is good in-memory performance.

jorisvandenbossche commented 7 years ago

I think a lot of this certainly makes sense.

I personally don't currently have a use case with geospatial datasets that are larger than memory, but even for in-memory use cases, better parallelization is certainly possible (e.g. on my research group's machine with a lot of cores). As the blog post of @kuanb also nicely showed, many typical operations (applying a calculation function to each row / each geometry) are embarrassingly parallel.
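
As a small illustration of that embarrassingly parallel case, something along these lines already works today with plain dask.delayed (a sketch only; the file path, chunk size, and buffer distance are made up):

```python
# A sketch only: the file path, chunk size, and buffer distance are made up.
import dask
import geopandas as gpd

gdf = gpd.read_file("parcels.shp")
chunk_size = 100_000
chunks = [gdf.iloc[i:i + chunk_size] for i in range(0, len(gdf), chunk_size)]

@dask.delayed
def buffer_chunk(chunk, distance):
    # Pure per-geometry work; each chunk becomes one task.
    return chunk.geometry.buffer(distance)

# Until the vectorized GEOS work lands, a process-based scheduler avoids the GIL.
results = dask.compute(*[buffer_chunk(c, 10.0) for c in chunks],
                       scheduler="processes")
```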

Some first thoughts about part of the issues you raised:

Performance - GIL: as I also mentioned in the dask issue, I think GeoPandas could easily release the GIL on geo-operations once https://github.com/geopandas/geopandas/issues/430 is tackled (= a vectorized GEOS wrapper). Up to that point, for many of the operations (e.g. GeoSeries.distance(..) or other geoseries ops) there is not much gain in releasing the GIL in GEOS/shapely, as each of those single operations is still wrapped in a Python object / called in a Python for loop in GeoPandas. But I am convinced that tackling https://github.com/geopandas/geopandas/issues/430 is actually quite straightforward; see my notebooks in that issue (which implement a basic version for one operation).

Apart from releasing the GIL, the vectorization will also give a big performance gain on its own, which might reduce some of the urgent need to parallelize (though for scaling up it will still be needed).

Serialization performance: currently, serializing a GeoSeries I think falls back to serializing each shapely object inside the series. Again, if more performance is needed, we might need a vectorized Cython solution. How does e.g. the pickling of a numpy array work?
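
One possible direction for the serialization question (just an illustration, not an existing geopandas code path) is to ship the geometry column as WKB bytes and the remaining columns as an ordinary pandas DataFrame:

```python
# An illustration only, not an existing geopandas code path.
import pickle
import geopandas as gpd
import shapely.wkb

def dumps_gdf(gdf):
    wkb = [geom.wkb for geom in gdf.geometry]           # one bytes blob per geometry
    attrs = gdf.drop(columns=gdf.geometry.name)         # plain pandas data
    return pickle.dumps((wkb, attrs, gdf.geometry.name, gdf.crs))

def loads_gdf(payload):
    wkb, attrs, geom_name, crs = pickle.loads(payload)
    geoms = [shapely.wkb.loads(b) for b in wkb]
    return gpd.GeoDataFrame(
        attrs,
        geometry=gpd.GeoSeries(geoms, index=attrs.index, name=geom_name),
        crs=crs,
    )
```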

Proposed structure: I think structuring the geodataframe parts spatially makes a lot of sense from an algorithmic point of view. This aspect can already be exploited to some extent using the spatial index (based on rtree), so we should certainly look at that, or at the interplay with it.
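
For reference, the rtree-backed index is already reachable through `.sindex`; a toy query, assuming a geopandas version whose `sindex.query` accepts a `predicate` keyword:

```python
# Toy example; assumes a geopandas version whose sindex.query accepts `predicate`.
import geopandas as gpd
from shapely.geometry import Point, box

points = gpd.GeoDataFrame(geometry=[Point(i, i) for i in range(1_000)])
area = box(100, 100, 200, 200)

# Positional indices of candidate rows found through the spatial index,
# instead of scanning every geometry.
candidates = points.sindex.query(area, predicate="intersects")
subset = points.iloc[candidates]
```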

Regarding the activity of the project: it's true it is sometimes low, but I don't think there is a lack of interest, more a lack of people with time to do (or review) the work (at least, that's how it is for me: many ideas to improve geopandas, but currently not much time I can set aside for it).

kjordahl commented 7 years ago

@mrocklin I agree with @jorisvandenbossche that there is some low-hanging fruit for optimization. The initial implementation of GeoPandas focused on the API, and I believe that is fairly stable now.

  1. Proposed structure: It does seem worth keeping track of the bounds of the regions, and this will have some advantages, but we could likely do better than that. Depending on the size of the dataset, it may be possible to have local copies of the spatial index of the entire dask-gdf and use that to optimize operations elementwise rather than just splitting them regionally. This path sounds pretty complex to me, though.

Another route could be to use PostGIS as a compute engine, and abstract the GeoPandas API via dask. This has some real advantages in that the hard work has already been done by PostgreSQL and PostGIS.

  2. Serialization: I don't have any specific ideas here, but I would be surprised if performance couldn't be improved considerably. Currently GeoPandas uses shapely objects as its native geometry store. Perhaps @sgillies has thoughts on this; some related trials are in https://github.com/Toblerity/Fiona/issues/271 and https://gist.github.com/sgillies/bfeb6b479bfd2e6a0a38.

kuanb commented 7 years ago

@kjordahl I just wanted to make sure I understood your second suggestion: would each Dask worker in that situation load its geospatial data into its own PostgreSQL database and run spatial operations through PostGIS, later returning the final results to the user as a Pandas dataframe extracted from the PostGIS results?

kjordahl commented 7 years ago

@kuanb I am thinking of a dask GeoDataFrame backed by PostGIS, with something like dask.dataframe.read_sql_table to get a deferred implementation. That seems doable if the data are already in PostGIS. Some more architecture would be needed to be able to create PostGIS tables (at minimum implementing geopandas/geopandas#189).
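
Roughly what that could look like (a sketch only: the table name, column name, and connection string are made up, and the geometry is assumed to be exposed as WKT text, e.g. via a view using ST_AsText, so that plain SQL reflection works):

```python
# Sketch only: table, column, and connection string are hypothetical, and the
# geometry is assumed to be exposed as WKT text (e.g. via ST_AsText in a view).
import dask
import dask.dataframe as dd
import geopandas as gpd
from shapely import wkt

ddf = dd.read_sql_table("parcels_wkt", "postgresql://user:pass@host/db",
                        index_col="id", npartitions=16)

@dask.delayed
def to_gdf(pdf):
    geoms = pdf["geom_wkt"].map(wkt.loads)
    return gpd.GeoDataFrame(pdf.drop(columns=["geom_wkt"]), geometry=geoms)

# One lazily built GeoDataFrame per database chunk.
lazy_parts = [to_gdf(part) for part in ddf.to_delayed()]
```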

jdmcbr commented 7 years ago

I don't currently have larger-than-memory datasets I'm working with, but can envision such datasets in the not too distant future. In my mind, the big feature to support is spatial joins (geopandas.tools.sjoin). I'd guess that alone would be enough to attract interest.

r-shekhar commented 7 years ago

I can definitely come up with a use case for a GeoPandas-based Dask, and I would probably use it quite extensively on large spatiotemporal datasets. I think the proposal that @mrocklin outlined is very sensible, and there isn't much I would do differently, other than perhaps allowing a Dask dataframe to be chopped up into GeoPandas dataframes by a time variable instead, even if that's not optimal for spatial joins.

However, I have an issue with requiring Postgres/PostGIS. That's a really heavy dependency, not installable through conda, that doesn't scale as a Dask cluster scales. For example, I currently launch 20 node Dask clusters on EC2, and if the spatial join was occurring on some Postgres server, that server would bottleneck and the cluster would idle. There are ways to get around this, but it would certainly be a lot easier if everything scaled as Dask does. I would prefer a smaller set of functionality than a heavy dependency like Postgres.

mrocklin commented 7 years ago

@r-shekhar thanks for the feedback. Which operations are particularly relevant for you? Would you be willing to produce a small notebook with fake data that represents what you would like to accomplish?

mrocklin commented 7 years ago

Also, would your use case still be relevant if geopandas itself were 100x faster without Dask?

r-shekhar commented 7 years ago

For the most part, my use case is working with cell phone location data that has both a timestamp and a spatial location. I use the Geopandas spatial joins (within, intersects, contains) for both point-in-polygon and polygon-polygon problems. My data is typically on the order of a few TB. I don't know if I can produce a synthetic notebook that's useful, but here's my description.

So I created a function that I call using Dask's map_partitions, which does the following (a rough sketch follows the list):

  1. Casts each partition's Pandas DataFrame to a Geopandas dataframe
  2. Creates a geometry column using WKT or latitude/longitude
  3. Loads the shapefile as a separate Geopandas dataframe
  4. Performs a spatial join using Geopandas
  5. Returns the join results as a pandas Series that can be appended to the original Dask dataframe.
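
A condensed sketch of those five steps (the column names lon/lat, the file paths, and the zone_id output column are placeholders; `predicate` assumes a recent geopandas, older versions spell it `op`):

```python
# Hypothetical names throughout: lon/lat columns, zones.shp, zone_id, points.parquet.
import dask.dataframe as dd
import geopandas as gpd
from shapely.geometry import Point

def label_partition(pdf):
    # 1-2. cast the pandas partition to a GeoDataFrame with point geometry
    geometry = [Point(xy) for xy in zip(pdf["lon"], pdf["lat"])]
    # 3. load the small polygon layer ("shape_df") -- repeated for every partition
    zones = gpd.read_file("zones.shp")
    gdf = gpd.GeoDataFrame(pdf, geometry=geometry, crs=zones.crs)
    # 4. spatial join; the polygon layer's spatial index is rebuilt on each call
    joined = gpd.sjoin(gdf, zones, how="left", predicate="within")
    # 5. hand back a plain Series keyed by the partition's index
    return joined["zone_id"]

big_dask_df = dd.read_parquet("points.parquet")     # TB-scale point data
labels = big_dask_df.map_partitions(label_partition, meta=("zone_id", "object"))
```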

This works, but is a bit clunky, and burns a fair bit of CPU building the spatial indices for each partition independently and doing all the casting. Despite all of this, I find it to be within a factor of 2 of PostGIS, and much more scalable across big clusters.

So my use case involves one big, TB-scale Dask dataframe (call this big_dask_df) and a small GeoPandas dataframe created from a shapefile (tens of MB; call this shape_df). If big_dask_df were composed of many smaller GeoPandas dataframes, calling sjoin (point-polygon or polygon-polygon) on each partition of big_dask_df with shape_df would solve my problem well. At the TB scale I need Dask and cannot work in GeoPandas alone, even if it were 100x faster. I also usually keep the data sorted by timestamp, so it would be nice if performing a spatial join did not require two expensive shuffles (to spatial order and back to temporal order), but this isn't crucial.

At this time, I don't need to do spatial merges of two large dask dataframes. I doubt I will ever need this particular feature.

P.S.: I don't mean to discourage PostGIS too much, there are some really neat things available, like ST_3D functions, and the DBSCAN clustering, which I might want to use in the future. But in my case, those features are nice to have, and not required.

Casyfill commented 7 years ago

Hey @r-shekhar, nice! I had the same approach, except that I made a branch of geopandas which accepted a spatial index as an argument to sjoin; this way I shared one spatial index across all the partitions.

I had a large dask.dataframe of points and a fairly small geodataframe of polygons. Not sure what speed gains I got there, though.

r-shekhar commented 7 years ago

@Casyfill That's an excellent idea. I also thought about that, but didn't end up implementing because I couldn't spare the time to add the feature.

My motivation wasn't particularly strong because I found building the spatial index on what I refer to as shape_df above to be fairly cheap, under 5 seconds on one core, compared to the ~5 minutes it takes to perform steps 1-5 above on a single dask dataframe partition.

kuanb commented 7 years ago

@r-shekhar and @Casyfill I'd be curious to hear what size datasets you two are using. I've got datasets on the order of 1 billion+ rows that I would like to work with; do the situations you describe above fall in a similar category? I've had issues running spatial operations on datasets over ~25 million rows, even when using map_partitions or performing discrete row-wise operations, and even when aggressively parallelizing with small partition sizes and many workers.

Casyfill commented 7 years ago

My dataset was ~40 million records, but I think this approach should work in your case as well; you might also benefit from using HDFS or something similar.

That said, I switched to Luigi in the end, passing a chunk of data to each worker. While this wasn't computationally optimal, it guaranteed partial completion: if only a few chunks had been geocoded when something went wrong, you can safely rerun the same script, and it will just skip those chunks and work on all the others. As my task was to backfill the database and then update it as I get new files, I essentially ran the same script once for the backfill and have been running it unchanged (I use glob to find the files) ever since.

r-shekhar commented 7 years ago

@kuanb So I have a proprietary dataset I can't share, but consists of mobile location data, approximately 10 billion rows per month of real world time. Uncompressed and compressed (Parquet format) sizes are about 2.6TB and 400GB respectively.

As a demo project, I've worked with the NYC taxi data: 1.5 billion rows, for which I used GeoPandas / Dask to do a point-in-polygon spatial join (git repository). Dask handles this kind of thing well, so I think a map_partitions approach should handle your 1+ billion rows just fine.

If you want to benchmark performance, the Parquet compressed and processed NYC taxi data (35GB) is available as a torrent here. To avoid clogging up the thread, please email me ravi dot shekhar at gmail if you have questions about this.

jwilson8767 commented 6 years ago

I don't know anything about Dask, but I have already implemented a version of sjoin that runs in parallel using concurrent.futures (which I will share if there is interest) and really think something like it should make its way back into geopandas.

I think that reading and writing shapefiles is also very slow, but not sure where to optimize/if it's even possible.

As far as methods for parallelization, we can either: 1) partition randomly (which I've done previously by just slicing a range of indexes and handing chunks to futures), or 2) partition spatially based on some grid (or maybe using a spatial index?).

I like futures as they support using multiple processes and thereby side-step the GIL, but they are fairly expensive as far as spin-up/spin-down/collection goes. Does Dask do something for this?

edit: Here's the gist of the modified sjoin function using concurrent futures: https://gist.github.com/jwilson8767/62151d486a8c6dbf899854ddaff94468
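
For context, the general shape of that approach looks something like the following (this is not the gist itself, just a bare-bones illustration; `predicate` assumes a recent geopandas):

```python
# Bare-bones illustration of a chunked sjoin with concurrent.futures; this is
# not the linked gist.
from concurrent.futures import ProcessPoolExecutor
import geopandas as gpd
import pandas as pd

def _sjoin_chunk(args):
    left_chunk, right, how, predicate = args
    return gpd.sjoin(left_chunk, right, how=how, predicate=predicate)

def parallel_sjoin(left, right, how="inner", predicate="intersects", n_workers=4):
    # Option 1 above: random (positional) partitioning of the left frame.
    step = -(-len(left) // n_workers)                 # ceiling division
    chunks = [left.iloc[i:i + step] for i in range(0, len(left), step)]
    with ProcessPoolExecutor(max_workers=n_workers) as pool:
        parts = pool.map(_sjoin_chunk, [(c, right, how, predicate) for c in chunks])
        return pd.concat(list(parts))
```

Each worker process gets its own pickled copy of the right frame, which is acceptable when that frame is small.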

mrocklin commented 6 years ago

When using the Cython branch (experimental, seemingly stalled for now) the GIL is not an issue.

Dask is happy to use threads or processes.

dlhvelasco commented 5 years ago

Hey @jwilson8767, would you mind sharing your code for the faster sjoin? I've been using sjoin to count points in a polygon grid, and as it is, it isn't as fast as I expected. Thanks in advance!

jwilson8767 commented 5 years ago

@dlhvelasco, I'd love to! Here's a slimmed down version as a gist: https://gist.github.com/jwilson8767/62151d486a8c6dbf899854ddaff94468

You may also want to grab geopandas-cython.

Cheers

dlhvelasco commented 5 years ago

Thanks again, @jwilson8767. I'm fairly new to this and I'm having problems integrating your code 🤔 Usage would be something like this, right?

import sjoin        # the module containing the parallel sjoin from the gist above
import geopandas
import pandas
...
sjoined = sjoin.sjoin(points, polygons, how='left', op='within')

Also, I think the cython version has some issues right now.

jwilson8767 commented 5 years ago

@dlhvelasco, I put an example usage at the bottom of the gist. If you have further questions just comment directly on that gist to avoid swamping this thread.

martinfleis commented 3 years ago

I have now transferred this old issue to the dask-geopandas repository for reference. A lot of this is now implemented in some form, and not everything is relevant anymore, but I wanted to have everything in the same place.