holoviz / spatialpandas

Pandas extension arrays for spatial/geometric operations
BSD 2-Clause "Simplified" License

spatialpandas failure on Dask Remote Cluster #66

Closed · graphific closed this 3 years ago

graphific commented 3 years ago

The Overview notebook's spatialpandas-with-Dask example works on a LocalCluster, but errors out when run against a remote cluster:

My cluster setup:

from dask_gateway import Gateway

gateway = Gateway()  # address/auth come from the dask-gateway config
cluster = gateway.new_cluster(profile=size)  # `size` and `workers` are set earlier
print("Scaling workers")
cluster.scale(workers)
client = cluster.get_client()

Notebook code:

...

# Large spatialpandas DaskGeoDataFrame with 16 partitions
cities_large_ddf = dd.from_pandas(cities_large_df, npartitions=16).persist()

# Precompute the partition-level spatial index
cities_large_ddf.partition_sindex

len(sjoin(cities_large_ddf, world_df))
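
(cities_large_df and world_df come from earlier cells of the Overview notebook and aren't shown here. Purely for context, a rough self-contained stand-in that exercises the same code path could look like the sketch below; it assumes spatialpandas' top-level sjoin and the PointArray/PolygonArray constructors, and all data values are made up.)

import dask.dataframe as dd
from spatialpandas import GeoDataFrame, sjoin
from spatialpandas.geometry import PointArray, PolygonArray
import spatialpandas.dask  # noqa: F401 -- ensure the Dask extension types are registered

# Made-up stand-ins for the notebook's cities_large_df / world_df
cities_large_df = GeoDataFrame({
    'geometry': PointArray([[0.5, 0.5], [1.5, 1.5], [3.0, 3.0]]),
    'name': ['a', 'b', 'c'],
})
world_df = GeoDataFrame({
    'geometry': PolygonArray([[[0, 0, 2, 0, 2, 2, 0, 2, 0, 0]]]),
    'region': ['unit square'],
})

cities_large_ddf = dd.from_pandas(cities_large_df, npartitions=2).persist()
cities_large_ddf.partition_sindex              # same property access that fails above
print(len(sjoin(cities_large_ddf, world_df)))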

Error trace:

---------------------------------------------------------------------------
KilledWorker                              Traceback (most recent call last)
<ipython-input-91-44679f1f7790> in <module>
      3 
      4 # Precompute the partition-level spatial index
----> 5 cities_large_ddf.partition_sindex

~/.local/lib/python3.8/site-packages/spatialpandas/dask.py in partition_sindex(self)
    145                 geometry._partition_bounds = self._partition_bounds[geometry_name]
    146 
--> 147             self._partition_sindex[geometry.name] = geometry.partition_sindex
    148             self._partition_bounds[geometry_name] = geometry.partition_bounds
    149         return self._partition_sindex[geometry_name]

~/.local/lib/python3.8/site-packages/spatialpandas/dask.py in partition_sindex(self)
     66     def partition_sindex(self):
     67         if self._partition_sindex is None:
---> 68             self._partition_sindex = HilbertRtree(self.partition_bounds.values)
     69         return self._partition_sindex
     70 

~/.local/lib/python3.8/site-packages/spatialpandas/dask.py in partition_bounds(self)
     47     def partition_bounds(self):
     48         if self._partition_bounds is None:
---> 49             self._partition_bounds = self.map_partitions(
     50                 lambda s: pd.DataFrame(
     51                     [s.total_bounds], columns=['x0', 'y0', 'x1', 'y1']

/opt/conda/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
    277         dask.base.compute
    278         """
--> 279         (result,) = compute(self, traverse=False, **kwargs)
    280         return result
    281 

/opt/conda/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    559         postcomputes.append(x.__dask_postcompute__())
    560 
--> 561     results = schedule(dsk, keys, **kwargs)
    562     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    563 

/opt/conda/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2682                     should_rejoin = False
   2683             try:
-> 2684                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2685             finally:
   2686                 for f in futures.values():

/opt/conda/lib/python3.8/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1991             else:
   1992                 local_worker = None
-> 1993             return self.sync(
   1994                 self._gather,
   1995                 futures,

/opt/conda/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    837             return future
    838         else:
--> 839             return sync(
    840                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    841             )

/opt/conda/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    338     if error[0]:
    339         typ, exc, tb = error[0]
--> 340         raise exc.with_traceback(tb)
    341     else:
    342         return result[0]

/opt/conda/lib/python3.8/site-packages/distributed/utils.py in f()
    322             if callback_timeout is not None:
    323                 future = asyncio.wait_for(future, callback_timeout)
--> 324             result[0] = yield future
    325         except Exception as exc:
    326             error[0] = sys.exc_info()

/opt/conda/lib/python3.8/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

/opt/conda/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1856                             exc = CancelledError(key)
   1857                         else:
-> 1858                             raise exception.with_traceback(traceback)
   1859                         raise exc
   1860                     if errors == "skip":

KilledWorker: ("('from_pandas-9ecbde8f79972641c9091facd27b244c', 30)", <Worker 'tls://10.15.127.8:35903', name: dask-worker-92908e5f7ff74365bb94ce8640647029-jghbz, memory: 0, processing: 2>)

The Dask log doesn't tell me much, except that the task fails and the workers get killed: Task ('from_pandas-3cf29940396a335127f696bfff173515', 25) marked as failed because 6 workers died while trying to run it
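
Since this only happens on the remote cluster, a few client-side checks can help rule out an environment mismatch between the client and the worker image. These are standard distributed APIs, sketched here against the client object created above:

# Compare dask/distributed (and other required package) versions across
# client, scheduler and workers; check=True raises on a mismatch.
client.get_versions(check=True)

# Confirm spatialpandas is importable on every worker -- it has to be
# installed in the worker image for the sjoin/partition_sindex tasks to run.
def spatialpandas_version():
    import spatialpandas
    return spatialpandas.__version__

print(client.run(spatialpandas_version))

# Pull recent worker logs to see why the tasks died (e.g. nanny memory kills).
logs = client.get_worker_logs()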

jbednar commented 3 years ago

Have you tried other Dask jobs with these same workers? From that description it sounds like more of a Dask than a spatialpandas issue, but I could be mistaken.
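
For instance, a plain-Dask smoke test on the same cluster (no spatialpandas geometry involved) would show whether the from_pandas -> persist -> compute pattern itself is failing. A minimal sketch:

import pandas as pd
import dask.dataframe as dd

# Plain pandas DataFrame, no spatialpandas geometry columns involved.
df = pd.DataFrame({'x': range(100_000), 'y': range(100_000)})

# Same from_pandas -> persist -> compute pattern as the failing snippet;
# if this also kills workers, the problem is in Dask/the cluster, not spatialpandas.
ddf = dd.from_pandas(df, npartitions=16).persist()
print(ddf.x.sum().compute())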

graphific commented 3 years ago

Yes, you're right, it seems to be an issue related to a newer version of Dask: some other Dask computations are also not executing correctly, so I'll close this for now. Thanks anyway :)