geopandas / dask-geopandas

Parallel GeoPandas with Dask
https://dask-geopandas.readthedocs.io/
BSD 3-Clause "New" or "Revised" License

BUG: `to_parquet()` failing with `dask=2024.4.1` #287

Closed · joaomacalos closed 4 months ago

joaomacalos commented 4 months ago

Hello, the `to_parquet()` method is not working with dask version 2024.4.1. This is the error: `ArrowTypeError: Did not pass numpy.dtype object`

With dask version 2024.2.1 it works fine.

Here is a minimal reproducible example of the error:

import dask_geopandas
import geopandas as gpd

# A single-partition frame with three points is enough to trigger the failure
gdf = gpd.GeoDataFrame(geometry=gpd.points_from_xy([1, 2, 3], [3, 2, 1]))
dask_geopandas.from_geopandas(gdf, npartitions=1).to_parquet("error.parquet")
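
For contrast, writing the same frame directly with geopandas should succeed, since it does not go through dask's write path (a quick sanity check reusing `gdf` from the snippet above; the filename is only illustrative):

# Plain geopandas writes the identical frame without error, which points
# at the dask write path rather than the geometry data itself
gdf.to_parquet("geopandas_ok.parquet")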

The package versions I'm using when I hit the error:

dask = 2024.4.1
dask-geopandas = 0.3.1
pyarrow = 15.0.2

Below are the relevant sections of the traceback:

---------------------------------------------------------------------------
ArrowTypeError                            Traceback (most recent call last)
File ~/micromamba/envs/test_dask/lib/python3.12/site-packages/dask/dataframe/io/parquet/arrow.py:842, in ArrowDatasetEngine._pandas_to_arrow_table(cls, df, preserve_index, schema)
    841 try:
--> 842     return pa.Table.from_pandas(
    843         df, nthreads=1, preserve_index=preserve_index, schema=schema
    844     )
    845 except pa.ArrowException as exc:

File ~/micromamba/envs/test_dask/lib/python3.12/site-packages/pyarrow/table.pxi:3874, in pyarrow.lib.Table.from_pandas()

File ~/micromamba/envs/test_dask/lib/python3.12/site-packages/pyarrow/pandas_compat.py:611, in dataframe_to_arrays(df, schema, preserve_index, nthreads, columns, safe)
    610 if nthreads == 1:
--> 611     arrays = [convert_column(c, f)
    612               for c, f in zip(columns_to_convert, convert_fields)]
    613 else:

File ~/micromamba/envs/test_dask/lib/python3.12/site-packages/pyarrow/pandas_compat.py:598, in dataframe_to_arrays.<locals>.convert_column(col, field)
    596     e.args += ("Conversion failed for column {!s} with type {!s}"
    597                .format(col.name, col.dtype),)
--> 598     raise e
    599 if not field_nullable and result.null_count > 0:

File ~/micromamba/envs/test_dask/lib/python3.12/site-packages/pyarrow/pandas_compat.py:592, in dataframe_to_arrays.<locals>.convert_column(col, field)
    591 try:
--> 592     result = pa.array(col, type=type_, from_pandas=True, safe=safe)
    593 except (pa.ArrowInvalid,
    594         pa.ArrowNotImplementedError,
    595         pa.ArrowTypeError) as e:

File ~/micromamba/envs/test_dask/lib/python3.12/site-packages/pyarrow/array.pxi:339, in pyarrow.lib.array()

File ~/micromamba/envs/test_dask/lib/python3.12/site-packages/pyarrow/array.pxi:85, in pyarrow.lib._ndarray_to_array()

File ~/micromamba/envs/test_dask/lib/python3.12/site-packages/pyarrow/error.pxi:91, in pyarrow.lib.check_status()

ArrowTypeError: ('Input object was not a NumPy array', 'Conversion failed for column geometry with type geometry')

During handling of the above exception, another exception occurred:

ArrowTypeError                            Traceback (most recent call last)
Cell In[8], line 1
----> 1 dask_geopandas.from_geopandas(gdf, npartitions=1).to_parquet("error.parquet")

File ~/micromamba/envs/test_dask/lib/python3.12/site-packages/dask_expr/_collection.py:3150, in DataFrame.to_parquet(self, path, **kwargs)
   3147 def to_parquet(self, path, **kwargs):
   3148     from dask_expr.io.parquet import to_parquet
-> 3150     return to_parquet(self, path, **kwargs)

File ~/micromamba/envs/test_dask/lib/python3.12/site-packages/dask_expr/io/parquet.py:521, in to_parquet(df, path, compression, write_index, append, overwrite, ignore_divisions, partition_on, storage_options, custom_metadata, write_metadata_file, compute, compute_kwargs, schema, name_function, filesystem, engine, **kwargs)
    501     out = new_collection(
    502         ToParquet(
    503             df,
   (...)
    517         )
    518     )
    520 if compute:
--> 521     out = out.compute(**compute_kwargs)
    523 # Invalidate the filesystem listing cache for the output path after write.
    524 # We do this before returning, even if `compute=False`. This helps ensure
    525 # that reading files that were just written succeeds.
    526 fs.invalidate_cache(path)

File ~/micromamba/envs/test_dask/lib/python3.12/site-packages/dask_expr/_collection.py:453, in FrameBase.compute(self, fuse, **kwargs)
    451     out = out.repartition(npartitions=1)
    452 out = out.optimize(fuse=fuse)
--> 453 return DaskMethodsMixin.compute(out, **kwargs)

File ~/micromamba/envs/test_dask/lib/python3.12/site-packages/dask/base.py:375, in DaskMethodsMixin.compute(self, **kwargs)
    351 def compute(self, **kwargs):
    352     """Compute this dask collection
    353 
    354     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    373     dask.compute
    374     """
--> 375     (result,) = compute(self, traverse=False, **kwargs)
    376     return result

File ~/micromamba/envs/test_dask/lib/python3.12/site-packages/dask/base.py:661, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    658     postcomputes.append(x.__dask_postcompute__())
    660 with shorten_traceback():
--> 661     results = schedule(dsk, keys, **kwargs)
    663 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/micromamba/envs/test_dask/lib/python3.12/site-packages/dask_expr/_expr.py:3570, in Fused._execute_task(graph, name, *deps)
   3568 for i, dep in enumerate(deps):
   3569     graph["_" + str(i)] = dep
-> 3570 return dask.core.get(graph, name)

File ~/micromamba/envs/test_dask/lib/python3.12/site-packages/dask/dataframe/io/parquet/core.py:171, in ToParquetFunctionWrapper.__call__(self, df, block_index)
    164 filename = (
    165     f"part.{part_i + self.i_offset}.parquet"
    166     if self.name_function is None
    167     else self.name_function(part_i + self.i_offset)
    168 )
    170 # Write out data
--> 171 return self.engine.write_partition(
    172     df,
    173     self.path,
    174     self.fs,
    175     filename,
    176     self.partition_on,
    177     self.write_metadata_file,
    178     **(dict(self.kwargs_pass, head=True) if part_i == 0 else self.kwargs_pass),
    179 )

File ~/micromamba/envs/test_dask/lib/python3.12/site-packages/dask/dataframe/io/parquet/arrow.py:894, in ArrowDatasetEngine.write_partition(cls, df, path, fs, filename, partition_on, return_metadata, fmd, compression, index_cols, schema, head, custom_metadata, **kwargs)
    891 else:
    892     index_cols = []
--> 894 t = cls._pandas_to_arrow_table(df, preserve_index=preserve_index, schema=schema)
    895 if custom_metadata:
    896     _md = t.schema.metadata

File ~/micromamba/envs/test_dask/lib/python3.12/site-packages/dask/dataframe/io/parquet/arrow.py:848, in ArrowDatasetEngine._pandas_to_arrow_table(cls, df, preserve_index, schema)
    846 if schema is None:
    847     raise
--> 848 df_schema = pa.Schema.from_pandas(df)
    849 expected = textwrap.indent(
    850     schema.to_string(show_schema_metadata=False), "    "
    851 )
    852 actual = textwrap.indent(
    853     df_schema.to_string(show_schema_metadata=False), "    "
    854 )

File ~/micromamba/envs/test_dask/lib/python3.12/site-packages/pyarrow/types.pxi:2771, in pyarrow.lib.Schema.from_pandas()

File ~/micromamba/envs/test_dask/lib/python3.12/site-packages/pyarrow/pandas_compat.py:546, in dataframe_to_types(df, preserve_index, columns)
    543 elif _pandas_api.is_extension_array_dtype(values):
    544     empty = c.head(0) if isinstance(
    545         c, _pandas_api.pd.Series) else c[:0]
--> 546     type_ = pa.array(empty, from_pandas=True).type
    547 else:
    548     values, type_ = get_datetimetz_type(values, c.dtype, None)

File ~/micromamba/envs/test_dask/lib/python3.12/site-packages/pyarrow/array.pxi:339, in pyarrow.lib.array()

File ~/micromamba/envs/test_dask/lib/python3.12/site-packages/pyarrow/array.pxi:81, in pyarrow.lib._ndarray_to_array()

File ~/micromamba/envs/test_dask/lib/python3.12/site-packages/pyarrow/array.pxi:69, in pyarrow.lib._ndarray_to_type()

File ~/micromamba/envs/test_dask/lib/python3.12/site-packages/pyarrow/error.pxi:154, in pyarrow.lib.pyarrow_internal_check_status()

File ~/micromamba/envs/test_dask/lib/python3.12/site-packages/pyarrow/error.pxi:91, in pyarrow.lib.check_status()

ArrowTypeError: Did not pass numpy.dtype object

Many thanks 🙏

cisaacstern commented 4 months ago

I also experienced this; downgrading to 2024.2.1 as you suggested resolved the issue for me as well. I did not check every version in between 2024.2.1 and 2024.4.1. Did you, by chance?

Curious whether 2024.2.1 is the latest version that works, or whether any of the releases in between do as well:

[Screenshot: list of dask releases between 2024.2.1 and 2024.4.1]
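
For anyone bisecting releases, it is worth confirming which dask is actually active in the environment before re-running the reproducer (a trivial check):

# Print the dask version active in the current environment
import dask
print(dask.__version__)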
jorisvandenbossche commented 4 months ago

Yeah, for now you will need to use a slightly older version of dask, or disable query planning manually, because the latest dask-geopandas does not yet support dask's new query planning features (see https://github.com/geopandas/dask-geopandas/issues/284, and PR https://github.com/geopandas/dask-geopandas/pull/285, which fixes this bug).

Query planning was enabled by default starting with dask 2024.3.0 (https://docs.dask.org/en/stable/changelog.html#v2024-3-0), which is why pinning to 2024.2.1 avoids the error.
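
A minimal sketch of the second workaround (disabling query planning), assuming dask's documented `dataframe.query-planning` config option. It has to be set before `dask.dataframe` (and hence `dask_geopandas`) is first imported, and the output filename below is only illustrative; setting the environment variable DASK_DATAFRAME__QUERY_PLANNING=False before starting Python should have the same effect.

# Turn off dask's new query planning *before* dask.dataframe is imported,
# so the legacy (non-dask-expr) write path is used
import dask
dask.config.set({"dataframe.query-planning": False})

import dask_geopandas
import geopandas as gpd

# The reproducer from the top of the issue now completes without error
gdf = gpd.GeoDataFrame(geometry=gpd.points_from_xy([1, 2, 3], [3, 2, 1]))
dask_geopandas.from_geopandas(gdf, npartitions=1).to_parquet("ok.parquet")

The first workaround is simply pinning dask below 2024.3.0, e.g. to 2024.2.1 as reported above.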