geopandas / dask-geopandas

Parallel GeoPandas with Dask
https://dask-geopandas.readthedocs.io/
BSD 3-Clause "New" or "Revised" License

Create Dask GeoDataFrame from wkt string #208

Closed FlorisCalkoen closed 4 months ago

FlorisCalkoen commented 2 years ago

I'm trying to load geometries from WKT strings stored in a dask.DataFrame. I tried both native dask and dask_geopandas, but neither library seems able to deal with WKT strings. Since the dask_geopandas community is probably more familiar with this type of issue, I thought it best to raise it here.

Please find below a toy script with some things I've tried.

import dask.dataframe as dd
import dask_geopandas
import numpy as np
import pandas as pd
import shapely.geometry
from shapely import wkt

def point():
    return shapely.geometry.Point(np.random.randint(low=0, high=90, size=2))

df = pd.DataFrame([wkt.dumps(point()) for i in range(100)], columns=["geometry"])
ddf = dd.from_pandas(df, npartitions=4)
# ddf.compute().geometry.apply(wkt.loads)  # runs fine
# ddf.geometry.apply(wkt.loads)  # raises ParseException: Unknown type: 'FOO'

dask_geopandas.from_dask_dataframe(ddf, geometry="geometry")  # TypeError("Input must be valid geometry objects: {0}".format(geom))

def partition_wrapper(df):
    df.geometry = df.geometry.apply(wkt.loads)
    return df

dask_geopandas.from_dask_dataframe(ddf).map_partitions(partition_wrapper)  # TypeError("Only str is accepted.")

What would be the suggested work around? Or should a functionality to load from wkt strings become available in dask_geopandas?

jorisvandenbossche commented 2 years ago

For the case of x / y columns, we have an example in the docs like:

import dask.dataframe as dd
import dask_geopandas

ddf = dd.read_csv('...')

ddf = ddf.set_geometry(
    dask_geopandas.points_from_xy(ddf, 'longitude', 'latitude')
)
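As a small in-memory illustration of the argument order: in `geopandas.points_from_xy` the first positional argument is x and the second is y, which for geographic coordinates means longitude first, then latitude. The column names and coordinate values below are made up for the example.

```python
import geopandas as gpd
import pandas as pd

# Illustrative data only: two points given as latitude/longitude columns.
df = pd.DataFrame({"latitude": [52.37, 48.86], "longitude": [4.90, 2.35]})

# x is longitude, y is latitude.
points = gpd.points_from_xy(df["longitude"], df["latitude"], crs="EPSG:4326")
gdf = gpd.GeoDataFrame(df, geometry=points)

print(gdf)
```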

We should probably add dask_geopandas.from_wkt/from_wkb functions as well, so you can do something similar when starting from a WKT / WKB column.

For now, I think the best work-around is the map_partitions that you are already using, but simplified a bit by using geopandas' from_wkt (which will also be faster) on that column:

In [25]: ddf["geometry"].map_partitions(geopandas.GeoSeries.from_wkt, meta=geopandas.GeoSeries([]))
Out[25]: 
Dask GeoSeries Structure:
npartitions=4
0     geometry
25         ...
50         ...
75         ...
99         ...
dtype: geometry
Dask Name: from_wkt, 12 tasks

We need to pass meta in this case, because otherwise dask's inference of the output dtype fails.

That's probably also the reason why ddf.geometry.apply(wkt.loads) is failing.
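To make the role of meta more concrete, here is a minimal pandas-level sketch of what happens per partition. It assumes that dask, when no meta is given, probes the function with placeholder strings such as "foo", which would explain the `Unknown type: 'FOO'` message; the partition contents and index values are invented for illustration.

```python
import geopandas as gpd
import pandas as pd
from shapely import wkt

# One "partition" of WKT strings as a plain pandas Series (arbitrary index).
wkt_partition = pd.Series(["POINT (1 2)", "POINT (3 4)"], index=[25, 26])

# What map_partitions runs on each partition: vectorized WKT parsing.
parsed = gpd.GeoSeries.from_wkt(wkt_partition)

# The empty GeoSeries passed as `meta` only describes the output type,
# so dask does not have to guess it by running the function on dummy data.
meta = gpd.GeoSeries([])

# Assumption: a placeholder string is not valid WKT, so parsing it fails.
try:
    wkt.loads("foo")
    dummy_failed = False
except Exception:
    dummy_failed = True

print(parsed.dtype, meta.dtype, dummy_failed)
```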

FlorisCalkoen commented 2 years ago

Thanks! I think having a function dask_geopandas.from_wkt/from_wkb would be great.

Coming back to the workaround. I'm now calculating it like this:

import geopandas as gpd

gddf = dask_geopandas.from_dask_dataframe(
    ddf,
    geometry=ddf["geometry"].map_partitions(
        gpd.GeoSeries.from_wkt, meta=gpd.GeoSeries([])
    ),
)
gddf = gddf.set_crs(4326)
gdf = gddf.compute()  
gdf.sample(10).explore()  # correct

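UPDATE: This works as expected, but only if the index is written along with the data (write_index=True) when saving the partitions to disk. With write_index=False, as in the script below, compute() fails: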

import tempfile
import dask.dataframe as dd
import dask_geopandas
import geopandas as gpd
import numpy as np
import pandas as pd
import shapely.geometry
from shapely import wkt

def point():
    return shapely.geometry.Point(np.random.randint(low=0, high=90, size=2))

with tempfile.TemporaryDirectory() as tmpdir:

    df = pd.DataFrame([wkt.dumps(point()) for i in range(100)], columns=["geometry"])
    ddf = dd.from_pandas(df, npartitions=4)

    name_function = lambda x: f"points{x}.parquet"
    ddf.to_parquet(
        tmpdir,
        name_function=name_function,
        write_index=False, 
    )
    print("Done!")

    ddf = dd.read_parquet(tmpdir)

    gddf = dask_geopandas.from_dask_dataframe(
        ddf,
        geometry=ddf["geometry"].map_partitions(
            gpd.GeoSeries.from_wkt, meta=gpd.GeoSeries([])
        ),
    )
    gddf = gddf.set_crs(4326)
    gdf = gddf.compute()  # with write_index=False: ValueError: cannot reindex on an axis with duplicate labels
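The ValueError is the message pandas gives when a column has to be aligned against an index that contains repeated labels. Files written with write_index=False carry no index, so each partition read back presumably gets its own 0-based index and labels repeat across partitions. The sketch below reproduces the same class of error in plain pandas and shows a per-partition conversion that avoids cross-partition alignment. `wkt_partition_to_gdf` is a hypothetical helper, not part of dask-geopandas, and this is not a trace of what `from_dask_dataframe` does internally.

```python
import geopandas as gpd
import pandas as pd

# Two "partitions" read back without a stored index: both start at label 0.
part_a = pd.DataFrame({"geometry": ["POINT (0 0)", "POINT (1 1)"]})
part_b = pd.DataFrame({"geometry": ["POINT (2 2)", "POINT (3 3)"]})
wkt_all = pd.concat([part_a, part_b])["geometry"]  # index labels: 0, 1, 0, 1

# Assigning a duplicate-labelled Series to a frame with a different index
# forces pandas to reindex, which fails on duplicate labels.
target = pd.DataFrame(index=range(4))
try:
    target["geometry"] = wkt_all
    alignment_failed = False
except ValueError:
    alignment_failed = True


def wkt_partition_to_gdf(pdf, column="geometry", crs=None):
    """Hypothetical helper: parse one partition's WKT column.

    The parsed GeoSeries keeps the partition's own index, so nothing is
    aligned against labels from other partitions.
    """
    parsed = gpd.GeoSeries.from_wkt(pdf[column], crs=crs)
    return gpd.GeoDataFrame(pdf.assign(**{column: parsed}), geometry=column)


gdfs = [wkt_partition_to_gdf(p, crs="EPSG:4326") for p in (part_a, part_b)]
print(alignment_failed, [len(g) for g in gdfs])
```

With Dask, such a function would be handed to map_partitions on the whole DataFrame, with an empty GeoDataFrame of the expected shape as meta; that step is not exercised here.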

giorgiobasile commented 1 year ago

We should probably add dask_geopandas.from_wkt/from_wkb functions as well, so you can do something similar when starting from a WKT / WKB column.

Hi @jorisvandenbossche, I'm pretty new to dask-geopandas. A few weeks back, I had a similar requirement and I basically ended up creating my own from_wkt and from_wkb functions, which I now shared here.

I used the same approach implemented in dask_geopandas.points_from_xy to work with WKT/WKB geometries. I'm not sure whether you really find it useful to introduce these utilities; in that case, I'd be happy to send a PR :)

bkanuka commented 7 months ago

None of the examples in this ticket worked for me, and it took me way too long to figure out that it was because of dask-expr and its "query planning". To get these examples to work, you need to disable query planning:

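import dask

# Set this before importing dask.dataframe (or dask_geopandas),
# since the option is read when dask.dataframe is first imported.
dask.config.set({"dataframe.query-planning": False})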

See: https://github.com/geopandas/dask-geopandas/issues/284