geopandas / dask-geopandas

Parallel GeoPandas with Dask
https://dask-geopandas.readthedocs.io/
BSD 3-Clause "New" or "Revised" License

ENH: minimal support for dask.dataframe query planning (dask-expr) #285

Closed jorisvandenbossche closed 4 months ago

jorisvandenbossche commented 5 months ago

Tackling https://github.com/geopandas/dask-geopandas/issues/284, for now in a minimal way: keeping as much as possible of the current code based on "legacy" dask, and then turning the result into an expression.

For example, for sjoin/clip/file IO, we manually build up the graph with HighLevelGraph to create a collection. For now, this graph is simply turned into an expression with from_graph. Long term, we can turn each of those into proper expression classes.
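To illustrate the idea of wrapping a hand-built graph in a single opaque node, here is a minimal toy sketch. It uses no dask code at all: `FromGraphSketch`, `execute` and `clip_partition` are hypothetical names invented for this illustration, and the plain dict of `key -> (function, *args)` tasks only mimics the general shape of a task graph. It is not how `from_graph` is implemented.

```python
from dataclasses import dataclass


def clip_partition(values, upper):
    """Toy stand-in for a per-partition operation such as clip."""
    return [v for v in values if v <= upper]


def execute(graph, key):
    """Evaluate one key of a dict-style task graph, resolving dependencies recursively."""
    task = graph[key]
    if isinstance(task, tuple) and task and callable(task[0]):
        func, *args = task
        # Arguments that are themselves keys of the graph are computed first;
        # everything else is passed through as a literal.
        resolved = [
            execute(graph, arg) if isinstance(arg, tuple) and arg in graph else arg
            for arg in args
        ]
        return func(*resolved)
    return task  # a literal value, e.g. an already-loaded partition


@dataclass
class FromGraphSketch:
    """Hypothetical opaque node: it only stores a prebuilt graph and its output keys."""

    graph: dict
    keys: list

    def compute(self):
        return [execute(self.graph, key) for key in self.keys]


# Manually built graph with two partitions: a "read" layer and a "clip" layer.
graph = {
    ("read", 0): [1, 5, 9],
    ("read", 1): [2, 6, 10],
    ("clip", 0): (clip_partition, ("read", 0), 5),
    ("clip", 1): (clip_partition, ("read", 1), 5),
}
clipped = FromGraphSketch(graph, keys=[("clip", 0), ("clip", 1)])
print(clipped.compute())  # [[1, 5], [2]]
```

Because the node treats the graph as a black box, nothing can be optimized across it, which is the trade-off a dedicated expression class per operation would avoid.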

I am not yet fully sure how best to organize the code. For now, the new file expr.py essentially duplicates a lot of core.py. I organized the commits such that the first one just makes this copy, so you can look at the second commit to see the actual changes that were needed to core.py (in expr.py) to make it work with dask-expr: https://github.com/geopandas/dask-geopandas/commit/d28521f30ac0f37b2c0e9c6315eb3921a4896a76

The problem is that I can't make these changes directly in core.py, because we want to keep supporting legacy dask for a while as well (our code actually still uses it for intermediate results).
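As a rough sketch of what keeping both implementations side by side could look like, the toy example below picks one of two namespaces based on a boolean flag. Everything here is an assumption made for illustration: the placeholder classes, the `SimpleNamespace` objects standing in for core.py and expr.py, and the `select_backend` helper are hypothetical and not taken from the package.

```python
from types import SimpleNamespace


class LegacyGeoDataFrame:
    """Placeholder for the collection built on legacy dask.dataframe."""

    backend = "legacy"


class ExprGeoDataFrame:
    """Placeholder for the collection built on dask-expr."""

    backend = "dask-expr"


# Stand-ins for the two modules that expose the same public names.
core = SimpleNamespace(GeoDataFrame=LegacyGeoDataFrame)
expr = SimpleNamespace(GeoDataFrame=ExprGeoDataFrame)


def select_backend(query_planning_on):
    """Return the namespace whose classes should be exposed to users."""
    return expr if query_planning_on else core


GeoDataFrame = select_backend(query_planning_on=True).GeoDataFrame
print(GeoDataFrame.backend)  # dask-expr
```

The cost of this layout is the duplication mentioned above: both modules have to be kept in sync until the legacy path is dropped.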

Some other remarks:

Closes https://github.com/geopandas/dask-geopandas/issues/287

martinfleis commented 5 months ago

@jorisvandenbossche can you ping me once you want a review of this? I'll do some reading on dask-expr in the meantime.

jorisvandenbossche commented 5 months ago

I think this should be ready enough for a review

TomAugspurger commented 5 months ago

I spent a bit of time looking at the split-out failure, but didn't make much progress. I'll take another look later.

ReptarK commented 4 months ago

Hi, thanks for the effort on supporting dask-expr. Are there any updates on the integration?

TomAugspurger commented 4 months ago

Some discussion at https://github.com/dask/dask-expr/issues/1024.

TomAugspurger commented 4 months ago

@jorisvandenbossche I can't push to your branch, but this diff fixes the test test_from_dask_dataframe_with_dask_geoseries. That turned up an error in from_dask_dataframe with geometry=dask_series, which is also fixed in that commit (and I added a test for it directly).

```diff
diff --git a/dask_geopandas/expr.py b/dask_geopandas/expr.py
index 99d71d2..136953d 100644
--- a/dask_geopandas/expr.py
+++ b/dask_geopandas/expr.py
@@ -200,6 +200,7 @@ class _Frame(dx.FrameBase, OperatorMethodMixin):
     @classmethod
     def _bind_elemwise_operator_method(cls, name, op, original, *args, **kwargs):
         """bind operator method like GeoSeries.distance to this class"""
+
         # name must be explicitly passed for div method whose name is truediv
         def meth(self, other, *args, **kwargs):
             meta = _emulate(op, self, other)
@@ -505,7 +506,6 @@ class _Frame(dx.FrameBase, OperatorMethodMixin):
         return distances
 
     def geohash(self, as_string=True, precision=12):
-
         """
         Calculate geohash based on the middle points of the geometry bounds
         for a given precision.
@@ -842,7 +842,7 @@ def from_dask_dataframe(df, geometry=None):
     # it via a keyword-argument due to https://github.com/dask/dask/issues/8308.
     # Instead, we assign the geometry column using regular dataframe operations,
     # then refer to that column by name in `map_partitions`.
-    if isinstance(geometry, dd.core.Series):
+    if isinstance(geometry, dx.Series):
         name = geometry.name if geometry.name is not None else "geometry"
         return df.assign(**{name: geometry}).map_partitions(
             geopandas.GeoDataFrame, geometry=name
diff --git a/dask_geopandas/tests/test_core.py b/dask_geopandas/tests/test_core.py
index b28a0c7..fbde582 100644
--- a/dask_geopandas/tests/test_core.py
+++ b/dask_geopandas/tests/test_core.py
@@ -390,27 +390,44 @@ def test_rename_geometry_error(geodf_points):
         dask_obj.rename_geometry("value1")
 
 
-# TODO to_dask_dataframe is now defined on the dask-expr collection, converting
-# to an old-style dd.core.DataFrame (so doing something different as we did here)
-@pytest.mark.xfail(
-    dask_geopandas.backends.QUERY_PLANNING_ON, reason="Need to update test for expr"
-)
 def test_from_dask_dataframe_with_dask_geoseries():
     df = pd.DataFrame({"x": [0, 1, 2, 3], "y": [1, 2, 3, 4]})
     dask_obj = dd.from_pandas(df, npartitions=2)
     dask_obj = dask_geopandas.from_dask_dataframe(
         dask_obj, geometry=dask_geopandas.points_from_xy(dask_obj, "x", "y")
     )
-    # Check that the geometry isn't concatenated and embedded a second time in
-    # the high-level graph. cf. https://github.com/geopandas/dask-geopandas/issues/197
-    k = next(k for k in dask_obj.dask.dependencies if k.startswith("GeoDataFrame"))
-    deps = dask_obj.dask.dependencies[k]
-    assert len(deps) == 1
+
+    if dask_geopandas.backends.QUERY_PLANNING_ON:
+        deps = dask_obj.expr.dependencies()
+        assert len(deps) == 1
+        dep = deps[0]
+        other = list(dask_obj.dask.values())[0][3]["geometry"].dependencies()[0]
+        assert dep is other
+
+    else:
+        # Check that the geometry isn't concatenated and embedded a second time in
+        # the high-level graph. cf. https://github.com/geopandas/dask-geopandas/issues/197
+        k = next(k for k in dask_obj.dask.dependencies if k.startswith("GeoDataFrame"))
+        deps = dask_obj.dask.dependencies[k]
+        assert len(deps) == 1
 
     expected = df.set_geometry(geopandas.points_from_xy(df["x"], df["y"]))
+    dask_obj.geometry.compute()
     assert_geoseries_equal(dask_obj.geometry.compute(), expected.geometry)
 
 
+def test_set_geometry_to_dask_series():
+    df = pd.DataFrame({"x": [0, 1, 2, 3], "y": [1, 2, 3, 4]})
+
+    dask_obj = dd.from_pandas(df, npartitions=2)
+    dask_obj = dask_geopandas.from_dask_dataframe(
+        dask_obj, geometry=dask_geopandas.points_from_xy(dask_obj, "x", "y")
+    )
+    expected = geopandas.GeoDataFrame(df, geometry=geopandas.points_from_xy(df.x, df.y))
+    result = dask_obj.geometry.compute()
+    assert_geoseries_equal(result, expected.geometry)
+
+
 def test_from_dask_dataframe_with_column_name():
     df = pd.DataFrame({"x": [0, 1, 2, 3], "y": [1, 2, 3, 4]})
     df["geoms"] = geopandas.points_from_xy(df["x"], df["y"])
```

TomAugspurger commented 4 months ago

PR to your branch at https://github.com/jorisvandenbossche/dask-geopandas/pull/1 that includes that diff and a fix for a couple more issues.

The final xfail with dask-expr is from split_out. I haven't had a chance to follow up on https://github.com/dask/dask-expr/issues/1024, but it seems like it might be related to shuffling in dask-expr.

jorisvandenbossche commented 4 months ago

@TomAugspurger thanks for the fixes! (And good that you mentioned it here, because apparently you don't watch your own fork by default, so I didn't get any notification about your PR on my repo.)

jorisvandenbossche commented 4 months ago

@martinfleis unless you have time in the short term, I would suggest we merge this now to have a working main branch with the latest dask; further fixes/improvements can be done in follow-ups.

martinfleis commented 4 months ago

@jorisvandenbossche Go ahead