geopandas / dask-geopandas

Parallel GeoPandas with Dask
https://dask-geopandas.readthedocs.io/
BSD 3-Clause "New" or "Revised" License

spatial_shuffle() can result in ArrowTypeError when using pyarrow 12 #256

Open FlorisCalkoen opened 1 year ago

FlorisCalkoen commented 1 year ago

When using a Dask distributed client and pyarrow=12.*, spatial shuffle can trigger a call to pa.Schema.from_pandas(df) which results in ArrowTypeError: Did not pass numpy.dtype object.

Reproducible example:

import dask
import dask.dataframe as dd
import dask_geopandas
import geodatasets
import geopandas as gpd
import pandas as pd
import pyarrow as pa
from distributed import Client
from geopandas.array import GeometryDtype

# sample data
rivers = gpd.read_file(geodatasets.get_path("eea large_rivers")).to_crs(4326)
grid = gpd.read_file(geodatasets.get_path("geoda grid100")).to_crs(4326)

# error doesn't happen when you don't use a distributed client.
client = Client()
print(client.dashboard_link)

# make data available to all workers
[scattered_grid] = client.scatter([grid[["geometry", "POLYID"]]], broadcast=True)

# do some work
def overlay_by_grid(df, grid):
    df = gpd.overlay(
        df,
        grid,
        keep_geom_type=False,
    ).explode(column="geometry", index_parts=False)
    return df

META = gpd.GeoDataFrame(
    {
        "NAME": pd.Series([], dtype=str),
        "Shape_Leng": pd.Series([], dtype="f8"),
        "POLYID": pd.Series([], dtype="i4"),
        "geometry": gpd.GeoSeries([], dtype=GeometryDtype),
    }
)

lazy_values = []
ddf = dask_geopandas.from_geopandas(rivers, npartitions=10)
for partition in ddf.to_delayed():
    partition = dask.delayed(overlay_by_grid)(partition, scattered_grid)
    lazy_values.append(partition)
ddf = dd.from_delayed(lazy_values, meta=META)
ddf.spatial_shuffle()  #  <-- this produces the error when using pyarrow=12.* and the `dask.distributed.Client`
Click to see traceback

```python-traceback
/Users/calkoen/mambaforge/envs/testenv/lib/python3.11/site-packages/pygeos/set_operations.py:129: RuntimeWarning: invalid value encountered in intersection
  return lib.intersection(a, b, **kwargs)
(...)  # warning repeated for each partition

2023-06-15 13:34:24,661 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-transfer-6ccb0d6d714273c1b356ecef0cf2ba3d', 0)
Function:  shuffle_transfer
args:      (     NAME    Shape_Leng  ...  _partitions      _index
            0  Danube  2.770357e+06  ...            2   539448662
            1  Danube  2.770357e+06  ...            1   452134766
            (...)
            [7 rows x 6 columns], '6ccb0d6d714273c1b356ecef0cf2ba3d', 0, 10, '_partitions', {0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
kwargs:    {}
Exception: "RuntimeError('shuffle_transfer failed during shuffle 6ccb0d6d714273c1b356ecef0cf2ba3d')"
(...)  # similar Compute Failed warnings for the remaining partitions

---------------------------------------------------------------------------
ArrowTypeError                            Traceback (most recent call last)
File ~/mambaforge/envs/testenv/lib/python3.11/site-packages/distributed/shuffle/_shuffle.py:63, in shuffle_transfer()
     62 try:
---> 63     return _get_worker_extension().add_partition(
   (...)
File ~/mambaforge/envs/testenv/lib/python3.11/site-packages/distributed/shuffle/_worker_extension.py:784, in _refresh_shuffle()
    780     result = await self.worker.scheduler.shuffle_get_or_create(
    783         spec={
--> 784             "schema": pa.Schema.from_pandas(kwargs["empty"])
    785             .serialize()
    786             .to_pybytes(),
   (...)
File ~/mambaforge/envs/testenv/lib/python3.11/site-packages/pyarrow/types.pxi:2574, in pyarrow.lib.Schema.from_pandas()
File ~/mambaforge/envs/testenv/lib/python3.11/site-packages/pyarrow/pandas_compat.py:546, in dataframe_to_types()
--> 546     type_ = pa.array(empty, from_pandas=True).type
File ~/mambaforge/envs/testenv/lib/python3.11/site-packages/pyarrow/error.pxi:123, in pyarrow.lib.check_status()
--> 123     raise ArrowTypeError(message)

ArrowTypeError: Did not pass numpy.dtype object

The above exception was the direct cause of the following exception:

RuntimeError                              Traceback (most recent call last)
Cell In[6], line 1
----> 1 ddf.spatial_shuffle()
File ~/mambaforge/envs/testenv/lib/python3.11/site-packages/dask_geopandas/core.py:807, in GeoDataFrame.spatial_shuffle(self, by, level, calculate_partitions, npartitions, divisions, **kwargs)
    806 if calculate_partitions:
--> 807     sorted_ddf.calculate_spatial_partitions()
File ~/mambaforge/envs/testenv/lib/python3.11/site-packages/dask_geopandas/core.py:186, in _Frame.calculate_spatial_partitions(self)
--> 186     ).compute(),
   (...)
File ~/mambaforge/envs/testenv/lib/python3.11/site-packages/distributed/shuffle/_shuffle.py:73, in shuffle_transfer()
     72 except Exception as e:
---> 73     raise RuntimeError(f"shuffle_transfer failed during shuffle {id}") from e

RuntimeError: shuffle_transfer failed during shuffle 6ccb0d6d714273c1b356ecef0cf2ba3d
```

The error can also be reproduced like this:

pa.Schema.from_pandas(ddf.partitions[0].compute())

The same code works when using pyarrow=11, although it's slower.

Environment:

jorisvandenbossche commented 1 year ago

@FlorisCalkoen thanks for the report!

I can reproduce the error, but I am not sure it is related to the pyarrow version (I get the error with pyarrow 10 or 11 as well); it seems more likely to be related to the dask/distributed version.

My current understanding is that this error comes from the new shuffle implementation in distributed (https://blog.coiled.io/blog/shuffling-large-data-at-constant-memory.html, starting with dask 2023.2.1), which uses Arrow IPC to serialize the data and send it between workers. Converting a geopandas.GeoDataFrame to a pyarrow.Table doesn't work out of the box, because arrow doesn't know what to do with the geometry column.

I can confirm this by falling back to the older task-based shuffling:

ddf.spatial_shuffle(shuffle="tasks")

That works without error for me.

jorisvandenbossche commented 1 year ago

We should of course ensure this works with the new P2P shuffle as well, as that brings many benefits. I have to look a bit closer into it, but essentially we have to make the following work:

In [7]: rivers = gpd.read_file(geodatasets.get_path("eea large_rivers")).to_crs(4326)

In [8]: import pyarrow as pa

In [9]: pa.Table.from_pandas(rivers)
...
ArrowTypeError: ('Did not pass numpy.dtype object', 'Conversion failed for column geometry with type geometry')

This is something that could be fixed on the GeoPandas side by defining an arrow extension type (to control how the geometry column gets converted to arrow and back). However, I am not fully sure how dask/distributed would know to give us back a GeoDataFrame rather than a plain DataFrame (something to try out).

Alternatively, dask/distributed needs to give us some way to register a method that overrides this default conversion, similar to how we registered a pyarrow_schema_dispatch for dask's to_parquet (https://github.com/geopandas/dask-geopandas/commit/82da8f18b19524fcd04a567f0a7d83cfdac143c8)
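
dask's dispatch registries behave much like functools.singledispatch: the most specific handler registered for a type wins. A pure-Python toy analogue of registering a GeoDataFrame-specific schema hook (the class names and return values here are stand-ins, not the real dask API):

```python
from functools import singledispatch

class DataFrame:                 # stand-in for pandas.DataFrame
    pass

class GeoDataFrame(DataFrame):   # stand-in for geopandas.GeoDataFrame
    pass

@singledispatch
def pyarrow_schema_dispatch(df):
    # default: plain pandas -> arrow schema conversion
    return "plain schema"

@pyarrow_schema_dispatch.register(GeoDataFrame)
def _(df):
    # a geopandas backend would map geometry columns to WKB here
    return "schema with WKB-encoded geometry"
```

Because dispatch resolves along the MRO, GeoDataFrame inputs get the geometry-aware conversion while plain frames keep the default path.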

martinfleis commented 1 year ago

cc @hendrikmakait, you were interested in how P2P works with dask-geopandas. It doesn't at the moment :).

jorisvandenbossche commented 1 year ago

With the following patch to geopandas, the above example works:

--- a/geopandas/array.py
+++ b/geopandas/array.py
@@ -1257,7 +1257,10 @@ class GeometryArray(ExtensionArray):
         # GH 1413
         if isinstance(scalars, BaseGeometry):
             scalars = [scalars]
-        return from_shapely(scalars)
+        try:
+            return from_shapely(scalars)
+        except TypeError:
+            return from_wkb(scalars)

     def _values_for_factorize(self):
         # type: () -> Tuple[np.ndarray, Any]
@@ -1454,6 +1457,11 @@ class GeometryArray(ExtensionArray):
         """
         return to_shapely(self)

+    def __arrow_array__(self, type=None):
+        # convert the underlying array values to a pyarrow Array
+        import pyarrow
+        return pyarrow.array(to_wkb(self), type=type)
+
     def _binop(self, other, op):
         def convert_values(param):
             if not _is_scalar_geometry(param) and (

Explanation:

jorisvandenbossche commented 1 year ago

While the points I raise are things we should address in geopandas anyway at some point (although there are open questions about which default representation to use when converting to arrow), there are also other solutions in dask and distributed itself: dask added a dispatch method for pyarrow<->pandas conversion (https://github.com/dask/dask/pull/10312), which we can implement, and I think that should also fix this issue once the dispatch method is used in distributed (WIP PR for this is at https://github.com/dask/distributed/pull/7743)