geopandas / dask-geopandas

Parallel GeoPandas with Dask
https://dask-geopandas.readthedocs.io/
BSD 3-Clause "New" or "Revised" License
498 stars 44 forks source link

Issue when reprojecting multiple CSV files #29

Open bilelomrani1 opened 4 years ago

bilelomrani1 commented 4 years ago

I have multiple CSV files opened with dask as follows:

import dask.dataframe as dd
import dask_geopandas

df = dd.read_csv('csv/*_timeseries.csv')
gdf = dask_geopandas.from_dask_dataframe(df)
gdf = gdf.set_geometry(
    dask_geopandas.points_from_xy(gdf, x='Longitude', y='Latitude')
).set_crs('epsg:4326').to_crs('epsg:3395')

When invoking gdf.compute(), the following exception is raised:

------------------------------------------------------------------------
ProjError                              Traceback (most recent call last)
<ipython-input-251-f87bb3768545> in <module>
----> 1 gdf.compute()

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
    165         dask.base.compute
    166         """
--> 167         (result,) = compute(self, traverse=False, **kwargs)
    168         return result
    169 

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    450         postcomputes.append(x.__dask_postcompute__())
    451 
--> 452     results = schedule(dsk, keys, **kwargs)
    453     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    454 

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2723                     should_rejoin = False
   2724             try:
-> 2725                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2726             finally:
   2727                 for f in futures.values():

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1984             else:
   1985                 local_worker = None
-> 1986             return self.sync(
   1987                 self._gather,
   1988                 futures,

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    830             return future
    831         else:
--> 832             return sync(
    833                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    834             )

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    338     if error[0]:
    339         typ, exc, tb = error[0]
--> 340         raise exc.with_traceback(tb)
    341     else:
    342         return result[0]

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/distributed/utils.py in f()
    322             if callback_timeout is not None:
    323                 future = asyncio.wait_for(future, callback_timeout)
--> 324             result[0] = yield future
    325         except Exception as exc:
    326             error[0] = sys.exc_info()

~/.local/lib/python3.8/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1849                             exc = CancelledError(key)
   1850                         else:
-> 1851                             raise exception.with_traceback(traceback)
   1852                         raise exc
   1853                     if errors == "skip":

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/dask/optimization.py in __call__()
    959         if not len(args) == len(self.inkeys):
    960             raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 961         return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
    962 
    963     def __reduce__(self):

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/dask/core.py in get()
    149     for key in toposort(dsk):
    150         task = dsk[key]
--> 151         result = _execute_task(task, cache)
    152         cache[key] = result
    153     result = _execute_task(out, cache)

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/dask/core.py in _execute_task()
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/dask/utils.py in apply()
     27 def apply(func, args, kwargs=None):
     28     if kwargs:
---> 29         return func(*args, **kwargs)
     30     else:
     31         return func(*args)

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/dask/dataframe/core.py in apply_and_enforce()
   5296     func = kwargs.pop("_func")
   5297     meta = kwargs.pop("_meta")
-> 5298     df = func(*args, **kwargs)
   5299     if is_dataframe_like(df) or is_series_like(df) or is_index_like(df):
   5300         if not len(df):

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/dask/utils.py in __call__()
    891 
    892     def __call__(self, obj, *args, **kwargs):
--> 893         return getattr(obj, self.method)(*args, **kwargs)
    894 
    895     def __reduce__(self):

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/geopandas/geodataframe.py in to_crs()
    814         else:
    815             df = self.copy()
--> 816         geom = df.geometry.to_crs(crs=crs, epsg=epsg)
    817         df.geometry = geom
    818         df.crs = geom.crs

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/geopandas/geoseries.py in to_crs()
    541         transformer = Transformer.from_crs(self.crs, crs, always_xy=True)
    542 
--> 543         new_data = vectorized.transform(self.values.data, transformer.transform)
    544         return GeoSeries(
    545             GeometryArray(new_data), crs=crs, index=self.index, name=self.name

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/geopandas/_vectorized.py in transform()
    877     if compat.USE_PYGEOS:
    878         coords = pygeos.get_coordinates(data)
--> 879         new_coords = func(coords[:, 0], coords[:, 1])
    880         result = pygeos.set_coordinates(data.copy(), np.array(new_coords).T)
    881         return result

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/pyproj/transformer.py in transform()
    428             intime = None
    429         # call pj_transform.  inx,iny,inz buffers modified in place.
--> 430         self._transformer._transform(
    431             inx,
    432             iny,

pyproj/_transformer.pyx in pyproj._transformer._Transformer._transform()

ProjError: x, y, z, and time must be same size

The exception disappears when df is a single CSV file: df = dd.read_csv('csv/2019_timeseries.csv')

felixlapalma commented 3 years ago

Hi, I was trying to reproduce the issue with some generated data but I could not. Maybe I am missing something?

# make some data
import os
import pandas as pd
import numpy as np

make_data=True
lat_lon_size=100000
csvs_num=10

if make_data:
    #first make some data
    np.random.seed(1234)
    #
    lat=np.random.random(lat_lon_size)*(10) - 37
    lon=np.random.random(lat_lon_size)*(10) - 64
    #
    df_=pd.DataFrame({'id':1,'Latitude':lat,'Longitude':lon})
    # dump dir
    os.makedirs('./csv',exist_ok=True)

    for i,f in enumerate(np.random.random(csvs_num)):
        df_.sample(frac=f).to_csv('./csv/test_{}.csv'.format(str(i)),index=False)

### Issue code 
import dask.dataframe as dd
import dask_geopandas

# from the issue; just replace '*_timeseries.csv' with 'test_*.csv'
df = dd.read_csv('csv/test_*.csv')
gdf = dask_geopandas.from_dask_dataframe(df)
gdf = gdf.set_geometry(
    dask_geopandas.points_from_xy(gdf, x='Longitude', y='Latitude')
).set_crs('epsg:4326').to_crs('epsg:3395')
#
gdf.compute() 

which ends up with no errors.

bilelomrani1 commented 3 years ago

Sorry for the delay. The snippet you provided indeed works on my machine. Now my code works fine on the latest version of dask-geopandas. Maybe the mistake was on my side in the first place or the latest version fixed the issue. Anyway thank you very much!