pydata / xarray

N-D labeled arrays and datasets in Python
https://xarray.dev

maximum recursion with dask and pydap backend #4348

Open · jhamman opened this issue 4 years ago

jhamman commented 4 years ago

What happened:

I'm getting a maximum recursion error when using the Pydap backend with Dask distributed. It seems that we're failing to pickle the pydap backend store successfully.
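
For what it's worth, the recursion looks like the classic __getattr__-vs-pickle trap. Here is a minimal sketch with a toy class of my own (not pydap's actual source, though pydap/model.py line 235 follows the same pattern):

import pickle

class Node:
    def __init__(self):
        self.attributes = {}

    def __getattr__(self, attr):
        # Only called when normal attribute lookup fails. pickle.loads()
        # creates the instance without running __init__, so 'attributes'
        # is not in __dict__ yet; looking up self.attributes re-enters
        # __getattr__ and never terminates.
        try:
            return self.attributes[attr]
        except KeyError:
            raise AttributeError(attr)

pickle.loads(pickle.dumps(Node()))  # RecursionError: maximum recursion depth exceeded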

What you expected to happen:

Successful parallel loading of an OPeNDAP dataset.

Minimal Complete Verifiable Example:

import xarray as xr
from dask.distributed import Client

client = Client()  # local cluster with separate worker processes

# chunks= makes the variables dask-backed; .load() then computes them on the workers
ds = xr.open_dataset('http://thredds.northwestknowledge.net:8080/thredds/dodsC/agg_terraclimate_pet_1958_CurrentYear_GLOBE.nc',
                     engine='pydap', chunks={'lat': 1024, 'lon': 1024, 'time': 12}).load()

yields:

Killed worker on the client:

---------------------------------------------------------------------------
KilledWorker                              Traceback (most recent call last)
<ipython-input-...> in <module>
      4 client = Client()
      5
----> 6 ds = xr.open_dataset('http://thredds.northwestknowledge.net:8080/thredds/dodsC/agg_terraclimate_pet_1958_CurrentYear_GLOBE.nc',
      7                      engine='pydap', chunks={'lat': 1024, 'lon': 1024, 'time': 12}).load()

~/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/xarray/core/dataset.py in load(self, **kwargs)
    652
    653         # evaluate all the dask arrays simultaneously
--> 654         evaluated_data = da.compute(*lazy_data.values(), **kwargs)
    655
    656         for k, data in zip(lazy_data, evaluated_data):

~/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    435     keys = [x.__dask_keys__() for x in collections]
    436     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 437     results = schedule(dsk, keys, **kwargs)
    438     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    439

~/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2594             should_rejoin = False
   2595         try:
-> 2596             results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2597         finally:
   2598             for f in futures.values():

~/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1886         else:
   1887             local_worker = None
-> 1888         return self.sync(
   1889             self._gather,
   1890             futures,

~/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    775             return future
    776         else:
--> 777             return sync(
    778                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    779             )

~/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    346     if error[0]:
    347         typ, exc, tb = error[0]
--> 348         raise exc.with_traceback(tb)
    349     else:
    350         return result[0]

~/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/distributed/utils.py in f()
    330             if callback_timeout is not None:
    331                 future = asyncio.wait_for(future, callback_timeout)
--> 332             result[0] = yield future
    333         except Exception as exc:
    334             error[0] = sys.exc_info()

~/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/tornado/gen.py in run(self)
    733
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

~/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1751                             exc = CancelledError(key)
   1752                         else:
-> 1753                             raise exception.with_traceback(traceback)
   1754                             raise exc
   1755                         if errors == "skip":

KilledWorker: ('open_dataset-54c87cd25bf4e9df37cb3030e6602974pet-d39db76f8636f3803611948183e52c13', )

and the above-mentioned recursion error on the workers:

distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: tcp://127.0.0.1:57334
distributed.worker - INFO - -------------------------------------------------
distributed.worker - ERROR - maximum recursion depth exceeded
Traceback (most recent call last):
  File "/Users/jhamman/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/distributed/worker.py", line 931, in handle_scheduler
    await self.handle_stream(
  File "/Users/jhamman/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/distributed/core.py", line 455, in handle_stream
    msgs = await comm.read()
  File "/Users/jhamman/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/distributed/comm/tcp.py", line 211, in read
    msg = await from_frames(
  File "/Users/jhamman/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/distributed/comm/utils.py", line 75, in from_frames
    res = _from_frames()
  File "/Users/jhamman/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/distributed/comm/utils.py", line 60, in _from_frames
    return protocol.loads(
  File "/Users/jhamman/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/distributed/protocol/core.py", line 130, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/Users/jhamman/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 269, in deserialize
    return loads(header, frames)
  File "/Users/jhamman/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 59, in pickle_loads
    return pickle.loads(b"".join(frames))
  File "/Users/jhamman/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 59, in loads
    return pickle.loads(x)
  File "/Users/jhamman/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/pydap/model.py", line 235, in __getattr__
    return self.attributes[attr]
  File "/Users/jhamman/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/pydap/model.py", line 235, in __getattr__
    return self.attributes[attr]
  File "/Users/jhamman/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/pydap/model.py", line 235, in __getattr__
    return self.attributes[attr]
  [Previous line repeated 973 more times]
RecursionError: maximum recursion depth exceeded
distributed.worker - INFO - Connection to scheduler broken. Reconnecting...
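
The repeated __getattr__ frames suggest the usual defensive pattern for this trap. A sketch of that general pattern (not necessarily the fix pydap will adopt): look up the fallback dict without re-entering __getattr__, so a half-restored instance raises AttributeError, which pickle handles, instead of recursing:

class Node:
    def __init__(self):
        self.attributes = {}

    def __getattr__(self, attr):
        # object.__getattribute__ bypasses the __getattr__ hook, so a
        # missing 'attributes' dict raises AttributeError directly
        # instead of recursing.
        try:
            attributes = object.__getattribute__(self, 'attributes')
        except AttributeError:
            raise AttributeError(attr)
        try:
            return attributes[attr]
        except (KeyError, TypeError):
            raise AttributeError(attr)

With this guard, pickle.loads(pickle.dumps(Node())) round-trips cleanly.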

Anything else we need to know?:

I've found this to be reproducible with several kinds of Dask clusters. Setting Client(processes=False) does work around the problem (see the sketch below), at the expense of multiprocessing.
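
The single-process workaround in full, using the same dataset as above: with a threads-only client, the scheduler and workers share one process, so the pydap store is never pickled for inter-process transport.

import xarray as xr
from dask.distributed import Client

client = Client(processes=False)  # threads only; nothing crosses a process boundary

ds = xr.open_dataset('http://thredds.northwestknowledge.net:8080/thredds/dodsC/agg_terraclimate_pet_1958_CurrentYear_GLOBE.nc',
                     engine='pydap', chunks={'lat': 1024, 'lon': 1024, 'time': 12}).load()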

Environment:

Output of xr.show_versions()

INSTALLED VERSIONS
------------------
commit: None
python: 3.8.2 | packaged by conda-forge | (default, Mar 5 2020, 16:54:44) [Clang 9.0.1 ]
python-bits: 64
OS: Darwin
OS-release: 19.5.0
machine: x86_64
processor: i386
byteorder: little
LC_ALL: None
LANG: en_US.UTF-8
LOCALE: en_US.UTF-8
libhdf5: 1.10.5
libnetcdf: 4.7.3

xarray: 0.15.1
pandas: 1.0.3
numpy: 1.18.1
scipy: 1.4.1
netCDF4: 1.5.3
pydap: installed
h5netcdf: 0.8.0
h5py: 2.10.0
Nio: None
zarr: 2.4.0
cftime: 1.1.1.2
nc_time_axis: 1.2.0
PseudoNetCDF: None
rasterio: 1.0.28
cfgrib: None
iris: None
bottleneck: 1.3.2
dask: 2.13.0
distributed: 2.13.0
matplotlib: 3.2.1
cartopy: 0.17.0
seaborn: 0.10.0
numbagg: installed
setuptools: 46.1.3.post20200325
pip: 20.0.2
conda: installed
pytest: 5.4.1
IPython: 7.13.0
sphinx: 3.1.1

stale[bot] commented 2 years ago

In order to maintain a list of currently relevant issues, we mark issues as stale after a period of inactivity.

If this issue remains relevant, please comment here or remove the stale label; otherwise it will be marked as closed automatically.

aulemahal commented 1 year ago

This is happening to me too. For the record, I need to use engine='pydap' because it supports authentication for accessing private THREDDS data programmatically (by passing a session), while netCDF4 does not.
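
For reference, the pattern I mean looks roughly like this (the URL and credentials are placeholders, and depending on the xarray version the session may need to go through xr.backends.PydapDataStore.open instead):

import requests
import xarray as xr

# Build a session carrying credentials/cookies and hand it to the pydap backend.
session = requests.Session()
session.auth = ('username', 'password')  # placeholder credentials

ds = xr.open_dataset(
    'https://example.com/thredds/dodsC/private_dataset.nc',  # placeholder URL
    engine='pydap',
    backend_kwargs={'session': session},
)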

I pointed to this issue in the related (but currently badly named and stale) issue upstream.