dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Challenges running xarray wrapped netcdf files #629

Closed pwolfram closed 6 years ago

pwolfram commented 7 years ago

This is a traceback from calling compute on an XArray computation on dask.distributed.

We're able to use dask.array on a netCDF4 object without locks if our workers each use a single thread. However, when computing on the .data attribute, which is backed by a netCDF object wrapped in a few xarray containers, we run into the following error. It appears to come from computing the shape, which is odd. Traceback below:

cc @mrocklin @shoyer

In [168]: ds = xr.open_mfdataset(fname, lock=False)

In [169]: ds.yParticle.data.sum().compute()
/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/dask/array/core.pyc in getarray()
     47         lock.acquire()
     48     try:
---> 49         c = a[b]
     50         if type(c) != np.ndarray:
     51             c = np.asarray(c)

/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.pyc in __getitem__()
    396 
    397     def __getitem__(self, key):
--> 398         return type(self)(self.array, self._updated_key(key))
    399 
    400     def __setitem__(self, key, value):

/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.pyc in _updated_key()
    372 
    373     def _updated_key(self, new_key):
--> 374         new_key = iter(canonicalize_indexer(new_key, self.ndim))
    375         key = []
    376         for size, k in zip(self.array.shape, self.key):

/users/pwolfram/lib/python2.7/site-packages/xarray/core/utils.pyc in ndim()
    380     @property
    381     def ndim(self):
--> 382         return len(self.shape)
    383 
    384     @property

/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.pyc in shape()
    384     def shape(self):
    385         shape = []
--> 386         for size, k in zip(self.array.shape, self.key):
    387             if isinstance(k, slice):
    388                 shape.append(len(range(*k.indices(size))))

/users/pwolfram/lib/python2.7/site-packages/xarray/conventions.pyc in shape()
    447     @property
    448     def shape(self):
--> 449         return self.array.shape[:-1]
    450 
    451     def __str__(self):

/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.pyc in shape()
    384     def shape(self):
    385         shape = []
--> 386         for size, k in zip(self.array.shape, self.key):
    387             if isinstance(k, slice):
    388                 shape.append(len(range(*k.indices(size))))

/users/pwolfram/lib/python2.7/site-packages/xarray/core/utils.pyc in shape()
    407     @property
    408     def shape(self):
--> 409         return self.array.shape
    410 
    411     def __array__(self, dtype=None):

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable.shape.__get__ (netCDF4/_netCDF4.c:32778)()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable._getdims (netCDF4/_netCDF4.c:31870)()

RuntimeError: NetCDF: Not a valid ID
mrocklin commented 7 years ago

I worked with @pwolfram to produce that traceback. We were able to get things working if we dove into the .dask graph to get the NetCDF file and then wrap it with a dask.array, something like the following:

xr_var = ds.yParticle.data.dask['some-key']  # pull the wrapped array out of the graph
var = xr_var.array.array.array  # unwrap the xarray layers down to the raw netCDF4.Variable

x = da.from_array(var, chunks=(...))
x.sum().compute()  # works ok
shoyer commented 7 years ago

This makes sense now that I'm looking at the full traceback.

This is making use of conventions.CharToStringArray, another array-like type that xarray uses for decoding string data, for which we are relying on default serialization (pickle). The netCDF4.Variable gets unpickled and no longer points to an open file.

Let's try this again opening the dataset with decode_cf=False.
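The failure mode described above can be illustrated with a toy stdlib example (NaiveWrapper is a hypothetical stand-in, not an xarray class): a wrapper that captures an open file handle cannot survive default pickle serialization. Plain Python file objects refuse to pickle outright; a netCDF4.Variable goes one worse, in that it unpickles but its underlying file ID no longer refers to an open file on the worker.

```python
import pickle
import tempfile

class NaiveWrapper:
    """Toy stand-in for an array wrapper that holds an open file handle."""
    def __init__(self, path):
        self.path = path
        self.handle = open(path)  # open handle captured on the object

    def read(self):
        self.handle.seek(0)
        return self.handle.read()

# Write a small file to wrap.
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as f:
    f.write("payload")
    path = f.name

wrapped = NaiveWrapper(path)
try:
    pickle.dumps(wrapped)  # default serialization chokes on the open handle
except TypeError as err:
    print("pickle failed:", err)
```

Reads work locally, but shipping the object to a worker via pickle cannot work while the live handle is part of its state.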

pwolfram commented 7 years ago

It looks the same.

shoyer commented 7 years ago

Somehow we seem to be using a CharToStringArray....

On Sun, Nov 6, 2016 at 2:33 PM, Phillip Wolfram notifications@github.com wrote:

It looks the same.


shoyer commented 7 years ago

What are the types of each level of xr_var.array.array.array?

Also try concat_characters=False

shoyer commented 7 years ago

Also: xr.open_dataset(fname, lock=False, chunks={}) should make a dask array without the multi-file logic.

pwolfram commented 7 years ago

concat_characters=False is the same error.

pwolfram commented 7 years ago

levels of xr_var.array.array.array:

In [204]: type(xr_var.array)
Out[204]: xarray.core.indexing.LazilyIndexedArray

In [205]: type(xr_var.array.array)
Out[205]: xarray.backends.netCDF4_.NetCDF4ArrayWrapper

In [206]: type(xr_var.array.array.array)
Out[206]: netCDF4._netCDF4.Variable
pwolfram commented 7 years ago

xr.open_dataset(fname, lock=False, chunks={}) is the same error.

shoyer commented 7 years ago

Following your stacktrace down, I see five array types:

LazilyIndexedArray:

/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.pyc in shape()
--> 386         for size, k in zip(self.array.shape, self.key):

CharToStringArray:

/users/pwolfram/lib/python2.7/site-packages/xarray/conventions.pyc in shape()
--> 449         return self.array.shape[:-1]

LazilyIndexedArray:

/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.pyc in shape()
--> 386         for size, k in zip(self.array.shape, self.key):

NetCDF4ArrayWrapper (most likely; definitely an NDArrayMixin subclass):

/users/pwolfram/lib/python2.7/site-packages/xarray/core/utils.pyc in shape()
--> 409         return self.array.shape

netCDF4.Variable:

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable.shape.__get__ (netCDF4/_netCDF4.c:32778)()

So it really looks like dask-distributed is choking on CharToStringArray.

In particular, it appears that the original error is from a different array than xr_var. Are you sure you're pulling out the top level key from the dask dict?

To be doubly sure, what is the dtype of ds.yParticle? It is also conceivable (though I think unlikely) that some other array with string data has been pulled into its dask graph (ds.yParticle.data.dask). Some use of dask.optimize.cull could test that hypothesis.

One other thing to try: at what level of xr_var[.array[.array[.array]]], if any, does the dask array fail to compute?
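The layering walked through above can be mimicked with minimal stand-ins (DeadVariable and ArrayWrapper are toy classes, not xarray's): each wrapper delegates shape to the array it wraps, so a dead handle at the bottom of the stack only surfaces when the outermost .shape is accessed, which is why the traceback bounces through several shape properties before reaching netCDF4.

```python
class DeadVariable:
    """Stand-in for a netCDF4.Variable whose underlying file has been closed."""
    @property
    def shape(self):
        raise RuntimeError("NetCDF: Not a valid ID")

class ArrayWrapper:
    """Stand-in for LazilyIndexedArray / NetCDF4ArrayWrapper: shape delegates inward."""
    def __init__(self, array):
        self.array = array

    @property
    def shape(self):
        return self.array.shape  # one hop inward per wrapper layer

# Three wrapper layers around a dead variable, mirroring xr_var.array.array.array.
stack = ArrayWrapper(ArrayWrapper(ArrayWrapper(DeadVariable())))
try:
    stack.shape  # raised at the bottom, observed at the top
except RuntimeError as err:
    print(err)
```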

pwolfram commented 7 years ago

In particular, it appears that the original error is from a different array than xr_var. Are you sure you're pulling out the top level key from the dask dict?

In [214]: ds.yParticle.data.dask
Out[214]: 
{(u'fname:/yParticle-846a0722e86ecac24903e03f48aa35eb',
  0,
  0): (<function dask.array.core.getarray>,
  u'fname:/yParticle-846a0722e86ecac24903e03f48aa35eboriginal-f52ad42f43568bf502049f452c4b394a',
  (slice(0, 31, None), slice(0, 1012000, None))),
 u'fname:/yParticle-846a0722e86ecac24903e03f48aa35eboriginal-f52ad42f43568bf502049f452c4b394a': LazilyIndexedArray(array=LazilyIndexedArray(array=NetCDF4ArrayWrapper(array=<type 'netCDF4._netCDF4.Variable'>
 float64 yParticle(Time, nParticles)
 unlimited dimensions: Time
 current shape = (31, 1012000)
 filling off
 ), key=(slice(None, None, None), slice(None, None, None))), key=(slice(None, None, None), slice(None, None, None)))}
In [215]: x = ds.yParticle.data.dask['fname:/yParticle-846a0722e86ecac24903e03f48aa35eboriginal-f52ad42f43568bf502049f452c4b394
     ...: a']

So I think this is the top-level key from the dask dict, but note that this may not be the case:

In [219]: ds.yParticle.data.dask.keys()
Out[219]: 
[(u'filename:/yParticle-846a0722e86ecac24903e03f48aa35eb',
  0,
  0),
 u'filename:/yParticle-846a0722e86ecac24903e03f48aa35eboriginal-f52ad42f43568bf502049f452c4b394a']


In [221]: x = ds.yParticle.data.dask['filename:/yParticle-846a0722e86ecac24903e03f48aa35eb']
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
<ipython-input-221-94db2c484015> in <module>()
----> 1 x = ds.yParticle.data.dask['filename:/yParticle-846a0722e86ecac24903e03f48aa35eb']

KeyError: 'filename:/yParticle-846a0722e86ecac24903e03f48aa35eb'
pwolfram commented 7 years ago
In [223]: type(ds.yParticle)
Out[223]: xarray.core.dataarray.DataArray
pwolfram commented 7 years ago

@shoyer, can you please clarify:

One other thing to try: at what level of xr_var[.array[.array[.array]]], if any, does the dask array fail to compute?

I don't think this is entirely what you mean:

In [244]: x.compute()
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-244-f4c69c9c1276> in <module>()
----> 1 x.compute()

AttributeError: 'LazilyIndexedArray' object has no attribute 'compute'

In [245]: x.array.compute()
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-245-d906770c195f> in <module>()
----> 1 x.array.compute()

AttributeError: 'LazilyIndexedArray' object has no attribute 'compute'

In [246]: x.array.array.compute()
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-246-a1bb7b5fa51a> in <module>()
----> 1 x.array.array.compute()

AttributeError: 'NetCDF4ArrayWrapper' object has no attribute 'compute'

In [247]: x.array.array.array.compute()
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-247-975c3b146207> in <module>()
----> 1 x.array.array.array.compute()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable.__getattr__ (netCDF4/_netCDF4.c:36798)()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable.getncattr (netCDF4/_netCDF4.c:34035)()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4._get_att (netCDF4/_netCDF4.c:4265)()

AttributeError: NetCDF: Attribute not found

or

In [249]: x.array.array.array.sum().compute()
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-249-63306814fa5b> in <module>()
----> 1 x.array.array.array.sum().compute()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable.__getattr__ (netCDF4/_netCDF4.c:36798)()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable.getncattr (netCDF4/_netCDF4.c:34035)()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4._get_att (netCDF4/_netCDF4.c:4265)()

AttributeError: NetCDF: Attribute not found

In [250]: x.array.array.sum().compute()
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-250-77cf68e730e3> in <module>()
----> 1 x.array.array.sum().compute()

AttributeError: 'NetCDF4ArrayWrapper' object has no attribute 'sum'

In [251]: x.array.sum().compute()
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-251-2714bd5c5439> in <module>()
----> 1 x.array.sum().compute()

AttributeError: 'LazilyIndexedArray' object has no attribute 'sum'

In [252]: x.sum().compute()
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-252-7ce0149d8b09> in <module>()
----> 1 x.sum().compute()

AttributeError: 'LazilyIndexedArray' object has no attribute 'sum'

but hopefully this is helpful.

shoyer commented 7 years ago

One other thing to try: at what level of xr_var[.array[.array[.array]]], if any, does the dask array fail to compute?

I don't think this is entirely what you mean:

I mean, e.g.,

chunks = 1e7
da.from_array(xr_var, chunks=chunks, dtype=xr_var.dtype).sum().compute()
da.from_array(xr_var.array, chunks=chunks, dtype=xr_var.array.dtype).sum().compute()
da.from_array(xr_var.array.array, chunks=chunks, dtype=xr_var.array.array.dtype).sum().compute()

I can also make CharToStringArray serializable with dask.distributed on my xarray branch (though, as I discussed with @mrocklin today, we will want a slightly different solution later).

shoyer commented 7 years ago

Try the latest version of my xarray branch which implements CharToStringArray serialization. If you're still getting the same error, I will be surprised!
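A minimal sketch of the reopen-on-unpickle idea (this is an illustration, not the actual code on the xarray branch; ReopeningWrapper is hypothetical and uses a plain text file where the real fix would reopen a netCDF file): ship only the path across the wire, and open the handle lazily on the receiving side.

```python
import pickle
import tempfile

class ReopeningWrapper:
    """Sketch: keep the path picklable, reopen the file handle lazily."""
    def __init__(self, path):
        self.path = path
        self._handle = None  # opened on demand, never pickled

    @property
    def handle(self):
        if self._handle is None:
            self._handle = open(self.path)
        return self._handle

    def read(self):
        self.handle.seek(0)
        return self.handle.read()

    def __getstate__(self):
        return {"path": self.path}  # ship only the path, drop the handle

    def __setstate__(self, state):
        self.path = state["path"]
        self._handle = None  # force a fresh open after unpickling

with tempfile.NamedTemporaryFile("w", suffix=".nc", delete=False) as f:
    f.write("payload")
    path = f.name

# Round-trip through pickle; the clone reopens the file itself.
clone = pickle.loads(pickle.dumps(ReopeningWrapper(path)))
print(clone.read())
```

The same pattern generalizes to any wrapper in the stack: as long as state is reduced to something reconstructible (a filename plus a variable name, say), workers can rebuild a live handle instead of inheriting a dead one.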

pwolfram commented 7 years ago

@shoyer, this is what I get now after updating xarray. It is as you expect: a different error message:

ds = xr.open_mfdataset(fname, lock=False)
type(ds.yParticle.data)
x = ds.yParticle.data
x.sum().compute()

with output of

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/concurrent.pyc in result(self, timeout)
    235             return self._result
    236         if self._exc_info is not None:
--> 237             raise_exc_info(self._exc_info)
    238         self._check_done()
    239         return self._result

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.pyc in run(self)
   1019 
   1020                     if exc_info is not None:
-> 1021                         yielded = self.gen.throw(*exc_info)
   1022                         exc_info = None
   1023                     else:

/users/pwolfram/lib/python2.7/site-packages/distributed/client.pyc in _gather(self, futures, errors)
    800                             six.reraise(type(d['exception']),
    801                                         d['exception'],
--> 802                                         d['traceback'])
    803                         except KeyError:
    804                             six.reraise(CancelledError,

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/dask/array/core.pyc in getarray()
     47         lock.acquire()
     48     try:
---> 49         c = a[b]
     50         if type(c) != np.ndarray:
     51             c = np.asarray(c)

/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.pyc in __getitem__()
    396 
    397     def __getitem__(self, key):
--> 398         return type(self)(self.array, self._updated_key(key))
    399 
    400     def __setitem__(self, key, value):

/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.pyc in _updated_key()
    372 
    373     def _updated_key(self, new_key):
--> 374         new_key = iter(canonicalize_indexer(new_key, self.ndim))
    375         key = []
    376         for size, k in zip(self.array.shape, self.key):

/users/pwolfram/lib/python2.7/site-packages/xarray/core/utils.pyc in ndim()
    380     @property
    381     def ndim(self):
--> 382         return len(self.shape)
    383 
    384     @property

/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.pyc in shape()
    384     def shape(self):
    385         shape = []
--> 386         for size, k in zip(self.array.shape, self.key):
    387             if isinstance(k, slice):
    388                 shape.append(len(range(*k.indices(size))))

/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.pyc in shape()
    384     def shape(self):
    385         shape = []
--> 386         for size, k in zip(self.array.shape, self.key):
    387             if isinstance(k, slice):
    388                 shape.append(len(range(*k.indices(size))))

/users/pwolfram/lib/python2.7/site-packages/xarray/core/utils.pyc in shape()
    407     @property
    408     def shape(self):
--> 409         return self.array.shape
    410 
    411     def __array__(self, dtype=None):

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable.shape.__get__ (netCDF4/_netCDF4.c:32778)()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable._getdims (netCDF4/_netCDF4.c:31870)()

RuntimeError: NetCDF: Not a valid ID

I'm also getting this:

In [31]: xr_var = ds.yParticle.data.dask['filename:/yParticle-846a0722e86ecac24903e03f48aa35eboriginal-02b72739da348136ce68ab1de5142905']

In [32]: xr_var.sum().compute()
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-32-96eb284a6133> in <module>()
----> 1 xr_var.sum().compute()

AttributeError: 'LazilyIndexedArray' object has no attribute 'sum'

In [33]: xr_var.array.sum().compute()
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-33-4dcf0a832ed4> in <module>()
----> 1 xr_var.array.sum().compute()

AttributeError: 'LazilyIndexedArray' object has no attribute 'sum'

In [34]: xr_var.array.array.sum().compute()
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-34-e041c5367803> in <module>()
----> 1 xr_var.array.array.sum().compute()

AttributeError: 'NetCDF4ArrayWrapper' object has no attribute 'sum'

In [35]: xr_var.array.array.array.sum().compute()
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-35-a7bbf15b7a38> in <module>()
----> 1 xr_var.array.array.array.sum().compute()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable.__getattr__ (netCDF4/_netCDF4.c:36798)()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable.getncattr (netCDF4/_netCDF4.c:34035)()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4._get_att (netCDF4/_netCDF4.c:4265)()

AttributeError: NetCDF: Attribute not found

Also,

In [36]: da.from_array(xr_var, chunks=chunks).sum().compute()
/users/pwolfram/lib/python2.7/site-packages/distributed/client.pyc in _gather(self, futures, errors)
    800                             six.reraise(type(d['exception']),
    801                                         d['exception'],
--> 802                                         d['traceback'])
    803                         except KeyError:
    804                             six.reraise(CancelledError,

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/dask/array/core.pyc in getarray()
     47         lock.acquire()
     48     try:
---> 49         c = a[b]
     50         if type(c) != np.ndarray:
     51             c = np.asarray(c)

/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.pyc in __getitem__()
    396 
    397     def __getitem__(self, key):
--> 398         return type(self)(self.array, self._updated_key(key))
    399 
    400     def __setitem__(self, key, value):

/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.pyc in _updated_key()
    372 
    373     def _updated_key(self, new_key):
--> 374         new_key = iter(canonicalize_indexer(new_key, self.ndim))
    375         key = []
    376         for size, k in zip(self.array.shape, self.key):

/users/pwolfram/lib/python2.7/site-packages/xarray/core/utils.pyc in ndim()
    380     @property
    381     def ndim(self):
--> 382         return len(self.shape)
    383 
    384     @property

/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.pyc in shape()
    384     def shape(self):
    385         shape = []
--> 386         for size, k in zip(self.array.shape, self.key):
    387             if isinstance(k, slice):
    388                 shape.append(len(range(*k.indices(size))))

/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.pyc in shape()
    384     def shape(self):
    385         shape = []
--> 386         for size, k in zip(self.array.shape, self.key):
    387             if isinstance(k, slice):
    388                 shape.append(len(range(*k.indices(size))))

/users/pwolfram/lib/python2.7/site-packages/xarray/core/utils.pyc in shape()
    407     @property
    408     def shape(self):
--> 409         return self.array.shape
    410 
    411     def __array__(self, dtype=None):

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable.shape.__get__ (netCDF4/_netCDF4.c:32778)()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable._getdims (netCDF4/_netCDF4.c:31870)()

RuntimeError: NetCDF: Not a valid ID
In [42]: da.from_array(xr_var.array, chunks=chunks).sum().compute()
distributed.utils - ERROR - NetCDF: Not a valid ID
Traceback (most recent call last):
  File "/users/pwolfram/lib/python2.7/site-packages/distributed/utils.py", line 120, in f
    result[0] = yield gen.maybe_future(func(*args, **kwargs))
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "/users/pwolfram/lib/python2.7/site-packages/distributed/client.py", line 802, in _gather
    d['traceback'])
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/dask/array/core.py", line 49, in getarray
    c = a[b]
  File "/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.py", line 398, in __getitem__
    return type(self)(self.array, self._updated_key(key))
  File "/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.py", line 374, in _updated_key
    new_key = iter(canonicalize_indexer(new_key, self.ndim))
  File "/users/pwolfram/lib/python2.7/site-packages/xarray/core/utils.py", line 382, in ndim
    return len(self.shape)
  File "/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.py", line 386, in shape
    for size, k in zip(self.array.shape, self.key):
  File "/users/pwolfram/lib/python2.7/site-packages/xarray/core/utils.py", line 409, in shape
    return self.array.shape
  File "netCDF4/_netCDF4.pyx", line 3378, in netCDF4._netCDF4.Variable.shape.__get__ (netCDF4/_netCDF4.c:32778)
  File "netCDF4/_netCDF4.pyx", line 3323, in netCDF4._netCDF4.Variable._getdims (netCDF4/_netCDF4.c:31870)
RuntimeError: NetCDF: Not a valid ID
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-42-2b115a3e59e3> in <module>()
----> 1 da.from_array(xr_var.array, chunks=chunks).sum().compute()

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/dask/base.pyc in compute(self, **kwargs)
     76             Extra keywords to forward to the scheduler ``get`` function.
     77         """
---> 78         return compute(self, **kwargs)[0]
     79 
     80     @classmethod

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/dask/base.pyc in compute(*args, **kwargs)
    176         dsk = merge(var.dask for var in variables)
    177     keys = [var._keys() for var in variables]
--> 178     results = get(dsk, keys, **kwargs)
    179 
    180     results_iter = iter(results)

/users/pwolfram/lib/python2.7/site-packages/distributed/client.pyc in get(self, dsk, keys, restrictions, loose_restrictions, **kwargs)
   1290 
   1291         try:
-> 1292             results = self.gather(futures)
   1293         except (KeyboardInterrupt, Exception) as e:
   1294             for f in futures.values():

/users/pwolfram/lib/python2.7/site-packages/distributed/client.pyc in gather(self, futures, errors, maxsize)
    881             return (self.gather(f, errors=errors) for f in futures)
    882         else:
--> 883             return sync(self.loop, self._gather, futures, errors=errors)
    884 
    885     @gen.coroutine

/users/pwolfram/lib/python2.7/site-packages/distributed/utils.pyc in sync(loop, func, *args, **kwargs)
    132         e.wait(1000000)
    133     if error[0]:
--> 134         six.reraise(type(error[0]), error[0], traceback[0])
    135     else:
    136         return result[0]

/users/pwolfram/lib/python2.7/site-packages/distributed/utils.pyc in f()
    118     def f():
    119         try:
--> 120             result[0] = yield gen.maybe_future(func(*args, **kwargs))
    121         except Exception as exc:
    122             logger.exception(exc)

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.pyc in run(self)
   1013 
   1014                     try:
-> 1015                         value = future.result()
   1016                     except Exception:
   1017                         self.had_exception = True

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/concurrent.pyc in result(self, timeout)
    235             return self._result
    236         if self._exc_info is not None:
--> 237             raise_exc_info(self._exc_info)
    238         self._check_done()
    239         return self._result

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.pyc in run(self)
   1019 
   1020                     if exc_info is not None:
-> 1021                         yielded = self.gen.throw(*exc_info)
   1022                         exc_info = None
   1023                     else:

/users/pwolfram/lib/python2.7/site-packages/distributed/client.pyc in _gather(self, futures, errors)
    800                             six.reraise(type(d['exception']),
    801                                         d['exception'],
--> 802                                         d['traceback'])
    803                         except KeyError:
    804                             six.reraise(CancelledError,

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/dask/array/core.pyc in getarray()
     47         lock.acquire()
     48     try:
---> 49         c = a[b]
     50         if type(c) != np.ndarray:
     51             c = np.asarray(c)

/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.pyc in __getitem__()
    396 
    397     def __getitem__(self, key):
--> 398         return type(self)(self.array, self._updated_key(key))
    399 
    400     def __setitem__(self, key, value):

/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.pyc in _updated_key()
    372 
    373     def _updated_key(self, new_key):
--> 374         new_key = iter(canonicalize_indexer(new_key, self.ndim))
    375         key = []
    376         for size, k in zip(self.array.shape, self.key):

/users/pwolfram/lib/python2.7/site-packages/xarray/core/utils.pyc in ndim()
    380     @property
    381     def ndim(self):
--> 382         return len(self.shape)
    383 
    384     @property

/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.pyc in shape()
    384     def shape(self):
    385         shape = []
--> 386         for size, k in zip(self.array.shape, self.key):
    387             if isinstance(k, slice):
    388                 shape.append(len(range(*k.indices(size))))

/users/pwolfram/lib/python2.7/site-packages/xarray/core/utils.pyc in shape()
    407     @property
    408     def shape(self):
--> 409         return self.array.shape
    410 
    411     def __array__(self, dtype=None):

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable.shape.__get__ (netCDF4/_netCDF4.c:32778)()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable._getdims (netCDF4/_netCDF4.c:31870)()

RuntimeError: NetCDF: Not a valid ID
In [43]: da.from_array(xr_var.array.array, chunks=chunks).sum().compute()
distributed.utils - ERROR - NetCDF: Not a valid ID
Traceback (most recent call last):
  File "/users/pwolfram/lib/python2.7/site-packages/distributed/utils.py", line 120, in f
    result[0] = yield gen.maybe_future(func(*args, **kwargs))
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "/users/pwolfram/lib/python2.7/site-packages/distributed/client.py", line 802, in _gather
    d['traceback'])
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/dask/array/core.py", line 49, in getarray
    c = a[b]
  File "/users/pwolfram/lib/python2.7/site-packages/xarray/backends/netCDF4_.py", line 59, in __getitem__
    data = getitem(self.array, key)
  File "netCDF4/_netCDF4.pyx", line 3671, in netCDF4._netCDF4.Variable.__getitem__ (netCDF4/_netCDF4.c:37111)
  File "netCDF4/_netCDF4.pyx", line 3378, in netCDF4._netCDF4.Variable.shape.__get__ (netCDF4/_netCDF4.c:32778)
  File "netCDF4/_netCDF4.pyx", line 3323, in netCDF4._netCDF4.Variable._getdims (netCDF4/_netCDF4.c:31870)
RuntimeError: NetCDF: Not a valid ID
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-43-8841ee6791ee> in <module>()
----> 1 da.from_array(xr_var.array.array, chunks=chunks).sum().compute()

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/dask/base.pyc in compute(self, **kwargs)
     76             Extra keywords to forward to the scheduler ``get`` function.
     77         """
---> 78         return compute(self, **kwargs)[0]
     79 
     80     @classmethod

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/dask/base.pyc in compute(*args, **kwargs)
    176         dsk = merge(var.dask for var in variables)
    177     keys = [var._keys() for var in variables]
--> 178     results = get(dsk, keys, **kwargs)
    179 
    180     results_iter = iter(results)

/users/pwolfram/lib/python2.7/site-packages/distributed/client.pyc in get(self, dsk, keys, restrictions, loose_restrictions, **kwargs)
   1290 
   1291         try:
-> 1292             results = self.gather(futures)
   1293         except (KeyboardInterrupt, Exception) as e:
   1294             for f in futures.values():

/users/pwolfram/lib/python2.7/site-packages/distributed/client.pyc in gather(self, futures, errors, maxsize)
    881             return (self.gather(f, errors=errors) for f in futures)
    882         else:
--> 883             return sync(self.loop, self._gather, futures, errors=errors)
    884 
    885     @gen.coroutine

/users/pwolfram/lib/python2.7/site-packages/distributed/utils.pyc in sync(loop, func, *args, **kwargs)
    132         e.wait(1000000)
    133     if error[0]:
--> 134         six.reraise(type(error[0]), error[0], traceback[0])
    135     else:
    136         return result[0]

/users/pwolfram/lib/python2.7/site-packages/distributed/utils.pyc in f()
    118     def f():
    119         try:
--> 120             result[0] = yield gen.maybe_future(func(*args, **kwargs))
    121         except Exception as exc:
    122             logger.exception(exc)

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.pyc in run(self)
   1013 
   1014                     try:
-> 1015                         value = future.result()
   1016                     except Exception:
   1017                         self.had_exception = True

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/concurrent.pyc in result(self, timeout)
    235             return self._result
    236         if self._exc_info is not None:
--> 237             raise_exc_info(self._exc_info)
    238         self._check_done()
    239         return self._result

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.pyc in run(self)
   1019 
   1020                     if exc_info is not None:
-> 1021                         yielded = self.gen.throw(*exc_info)
   1022                         exc_info = None
   1023                     else:

/users/pwolfram/lib/python2.7/site-packages/distributed/client.pyc in _gather(self, futures, errors)
    800                             six.reraise(type(d['exception']),
    801                                         d['exception'],
--> 802                                         d['traceback'])
    803                         except KeyError:
    804                             six.reraise(CancelledError,

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/dask/array/core.pyc in getarray()
     47         lock.acquire()
     48     try:
---> 49         c = a[b]
     50         if type(c) != np.ndarray:
     51             c = np.asarray(c)

/users/pwolfram/lib/python2.7/site-packages/xarray/backends/netCDF4_.pyc in __getitem__()
     57 
     58         try:
---> 59             data = getitem(self.array, key)
     60         except IndexError:
     61             # Catch IndexError in netCDF4 and return a more informative error

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable.__getitem__ (netCDF4/_netCDF4.c:37111)()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable.shape.__get__ (netCDF4/_netCDF4.c:32778)()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable._getdims (netCDF4/_netCDF4.c:31870)()

RuntimeError: NetCDF: Not a valid ID
In [44]: da.from_array(xr_var.array.array.array, chunks=chunks).sum().compute()
distributed.utils - ERROR - NetCDF: Not a valid ID
Traceback (most recent call last):
  File "/users/pwolfram/lib/python2.7/site-packages/distributed/utils.py", line 120, in f
    result[0] = yield gen.maybe_future(func(*args, **kwargs))
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "/users/pwolfram/lib/python2.7/site-packages/distributed/client.py", line 802, in _gather
    d['traceback'])
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/dask/array/core.py", line 49, in getarray
    c = a[b]
  File "netCDF4/_netCDF4.pyx", line 3671, in netCDF4._netCDF4.Variable.__getitem__ (netCDF4/_netCDF4.c:37111)
  File "netCDF4/_netCDF4.pyx", line 3378, in netCDF4._netCDF4.Variable.shape.__get__ (netCDF4/_netCDF4.c:32778)
  File "netCDF4/_netCDF4.pyx", line 3323, in netCDF4._netCDF4.Variable._getdims (netCDF4/_netCDF4.c:31870)
RuntimeError: NetCDF: Not a valid ID
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-44-1cc685a6c727> in <module>()
----> 1 da.from_array(xr_var.array.array.array, chunks=chunks).sum().compute()

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/dask/base.pyc in compute(self, **kwargs)
     76             Extra keywords to forward to the scheduler ``get`` function.
     77         """
---> 78         return compute(self, **kwargs)[0]
     79 
     80     @classmethod

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/dask/base.pyc in compute(*args, **kwargs)
    176         dsk = merge(var.dask for var in variables)
    177     keys = [var._keys() for var in variables]
--> 178     results = get(dsk, keys, **kwargs)
    179 
    180     results_iter = iter(results)

/users/pwolfram/lib/python2.7/site-packages/distributed/client.pyc in get(self, dsk, keys, restrictions, loose_restrictions, **kwargs)
   1290 
   1291         try:
-> 1292             results = self.gather(futures)
   1293         except (KeyboardInterrupt, Exception) as e:
   1294             for f in futures.values():

/users/pwolfram/lib/python2.7/site-packages/distributed/client.pyc in gather(self, futures, errors, maxsize)
    881             return (self.gather(f, errors=errors) for f in futures)
    882         else:
--> 883             return sync(self.loop, self._gather, futures, errors=errors)
    884 
    885     @gen.coroutine

/users/pwolfram/lib/python2.7/site-packages/distributed/utils.pyc in sync(loop, func, *args, **kwargs)
    132         e.wait(1000000)
    133     if error[0]:
--> 134         six.reraise(type(error[0]), error[0], traceback[0])
    135     else:
    136         return result[0]

/users/pwolfram/lib/python2.7/site-packages/distributed/utils.pyc in f()
    118     def f():
    119         try:
--> 120             result[0] = yield gen.maybe_future(func(*args, **kwargs))
    121         except Exception as exc:
    122             logger.exception(exc)

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.pyc in run(self)
   1013 
   1014                     try:
-> 1015                         value = future.result()
   1016                     except Exception:
   1017                         self.had_exception = True

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/concurrent.pyc in result(self, timeout)
    235             return self._result
    236         if self._exc_info is not None:
--> 237             raise_exc_info(self._exc_info)
    238         self._check_done()
    239         return self._result

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.pyc in run(self)
   1019 
   1020                     if exc_info is not None:
-> 1021                         yielded = self.gen.throw(*exc_info)
   1022                         exc_info = None
   1023                     else:

/users/pwolfram/lib/python2.7/site-packages/distributed/client.pyc in _gather(self, futures, errors)
    800                             six.reraise(type(d['exception']),
    801                                         d['exception'],
--> 802                                         d['traceback'])
    803                         except KeyError:
    804                             six.reraise(CancelledError,

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/dask/array/core.pyc in getarray()
     47         lock.acquire()
     48     try:
---> 49         c = a[b]
     50         if type(c) != np.ndarray:
     51             c = np.asarray(c)

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable.__getitem__ (netCDF4/_netCDF4.c:37111)()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable.shape.__get__ (netCDF4/_netCDF4.c:32778)()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable._getdims (netCDF4/_netCDF4.c:31870)()

RuntimeError: NetCDF: Not a valid ID
pwolfram commented 7 years ago

@shoyer, I could be more useful here with some additional guidance on the debugging strategy, but am of course happy to keep trying things.

shoyer commented 7 years ago

I don't understand what is going on here. It seems dask is maybe not (de)serializing the netCDF4 variables correctly.

I put up an alternative xarray hack in https://github.com/pydata/xarray/pull/1095 that passes a more extensive integration test, so that might be worth a try.
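A quick way to check the (de)serialization hypothesis is a pickle round-trip, since that is essentially what distributed must do when it ships task arguments to workers. This is a minimal sketch with stand-in objects, not the actual netCDF4 machinery:

```python
import pickle
import tempfile

def survives_roundtrip(obj):
    """Serialize and rebuild obj, as distributed must do when it
    ships task arguments to workers."""
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except Exception:
        return False

# Plain metadata survives the trip.
print(survives_roundtrip({"shape": (30, 184400, 100)}))  # True

# An open file handle -- roughly what a netCDF4 Variable holds
# underneath -- does not.
with tempfile.TemporaryFile() as f:
    print(survives_roundtrip(f))  # False
```

If the wrapped variable fails this check, the workers end up reconstructing it against a file ID that is no longer valid, which would match the `RuntimeError: NetCDF: Not a valid ID` above.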

pwolfram commented 7 years ago

@shoyer and @mrocklin, this looks like it is working now using pydata/xarray#1095:

In [1]: from dask.distributed import Client

In [2]: client = Client('wf609:8786')

In [3]: client
Out[3]: <Client: scheduler="wf609:8786" processes=2 cores=32>

In [5]: import dask.array as da

In [6]: import xarray as xr

In [7]: ds = xr.open_mfdataset('fname', lock=False)

In [8]: x = ds.yParticle.data

In [9]: x.sum().compute()
Out[9]: 31347046718055.527

In [10]: ds = xr.open_mfdataset('./lagrPartTrack.*.nc', lock=False)

In [11]: x = ds.yParticle.data

In [12]: x.sum().compute()
Out[12]: 525875176622133.69

Would this naturally suggest that xarray-distributed is now a reality? If so, I should try something more complex when I get the time tomorrow.

pwolfram commented 7 years ago

@shoyer, note it is possible I'm not correctly testing so if you want me to run a particular scenario again please let me know so that we can double-check that it is working.

mrocklin commented 7 years ago

This looks pretty exciting to me :)

shoyer commented 7 years ago

Glad that worked! I was getting worried there.

Does ds.yParticle.sum() or ds.sum() work? That would be closer to the usual xarray workflow.

pwolfram commented 7 years ago

HPC allocations are being transitioned-- sorry about the delay. Hopefully I'll be able to verify this later today or tomorrow.

pwolfram commented 7 years ago

@shoyer, HPC is back up and both appear to work although we get a memory error for the dataset-based sum.

In [5]: ds = xr.open_mfdataset(filenames, lock=False)

In [6]: client
Out[6]: <Client: scheduler="wf332:8786" processes=2 cores=32>

In [7]: ds.yParticle.sum()
Out[7]: 
<xarray.DataArray 'yParticle' ()>
dask.array<sum-agg..., shape=(), dtype=float64, chunksize=()>

In [8]: ds.sum()
Out[8]: distributed.utils - ERROR - 
Traceback (most recent call last):
  File "/users/pwolfram/lib/python2.7/site-packages/distributed/utils.py", line 120, in f
    result[0] = yield gen.maybe_future(func(*args, **kwargs))
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "/users/pwolfram/lib/python2.7/site-packages/distributed/client.py", line 802, in _gather
    d['traceback'])
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/dask/array/core.py", line 51, in getarray
    c = np.asarray(c)
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/numpy/core/numeric.py", line 482, in asarray
    return array(a, dtype, copy=False, order=order)
  File "/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.py", line 389, in __array__
    return np.asarray(array[self.key], dtype=None)
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/numpy/core/numeric.py", line 482, in asarray
    return array(a, dtype, copy=False, order=order)
  File "/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.py", line 389, in __array__
    return np.asarray(array[self.key], dtype=None)
  File "/users/pwolfram/lib/python2.7/site-packages/xarray/backends/netCDF4_.py", line 73, in __getitem__
    data = getitem(self.array, key)
  File "netCDF4/_netCDF4.pyx", line 3695, in netCDF4._netCDF4.Variable.__getitem__ (netCDF4/_netCDF4.c:37923)
  File "netCDF4/_netCDF4.pyx", line 4363, in netCDF4._netCDF4.Variable._get (netCDF4/_netCDF4.c:46946)
MemoryError
mrocklin commented 7 years ago

It's odd to get a memory error on sum regardless. Any feedback from the diagnostic page? You might also want to check out localhost:8787/workers to get per-worker information.

pwolfram commented 7 years ago

Diagnostics page-- do you mean http://localhost:8787/status? I didn't see anything but the internet here is really bad so it is possible I've missed something.

mrocklin commented 7 years ago

Check out http://localhost:8787/workers


pwolfram commented 7 years ago

It was failing on uVertexVelocity:

distributed.worker - WARNING -  Compute Failed
Function: execute_task
args:     ((<built-in function apply>, <functools.partial object at 0x2b5975418368>, [(<function getarray at 0x2b59d213ab18>, LazilyIndexedArray(array=LazilyIndexedArray(array=NetCDF4ArrayWrapper(array=<type 'netCDF4._netCDF4.Variable'>
float64 uVertexVelocity(Time, nVertices, nVertLevels)
unlimited dimensions: Time
current shape = (30, 184400, 100)
filling off
), key=(slice(None, None, None), slice(None, None, None), slice(None, None, None))), key=(slice(None, None, None), slice(None, None, None), slice(None, None, None))), (slice(0, 30, None), slice(0, 184400, None), slice(0, 100, None)))], {'keepdims': True, 'axis': (0, 1, 2)}))
kwargs:   {}
None
distributed.worker - WARNING -  Compute Failed
Function: execute_task
args:     ((<built-in function apply>, <functools.partial object at 0x2b59b042c4c8>, [(<function getarray at 0x2b59d213ab18>, LazilyIndexedArray(array=LazilyIndexedArray(array=NetCDF4ArrayWrapper(array=<type 'netCDF4._netCDF4.Variable'>
float64 uVertexVelocity(Time, nVertices, nVertLevels)
unlimited dimensions: Time
current shape = (31, 184400, 100)
filling off
), key=(slice(None, None, None), slice(None, None, None), slice(None, None, None))), key=(slice(None, None, None), slice(None, None, None), slice(None, None, None))), (slice(0, 31, None), slice(0, 184400, None), slice(0, 100, None)))], {'keepdims': True, 'axis': (0, 1, 2)}))
kwargs:   {}
None
distributed.worker - INFO - Deleted 15 keys

but works for just that field:

In [9]: ds.uVertexVelocity.sum()
Out[9]: 
<xarray.DataArray 'uVertexVelocity' ()>
dask.array<sum-agg..., shape=(), dtype=float64, chunksize=()>
mrocklin commented 7 years ago

If you upgrade to distributed master the workers will be a bit more pragmatic about defending memory by using disk. You probably shouldn't be running into these issues regardless if you're just computing sums, but it's something to try.

pip install git+https://github.com/dask/distributed.git --upgrade


pwolfram commented 7 years ago

The http://localhost:8787/workers page didn't reveal anything too special, but this could be because of the high internet latency on my connection.

pwolfram commented 7 years ago

Additional thoughts on this issue?

pwolfram commented 7 years ago

This field is 75.1 GB, so it is possible we are overshooting the 2 × 64 GB of RAM on the distributed cluster. This is probably what is happening. I'll try again with another 2 nodes.

mrocklin commented 7 years ago

That page should show the memory consumption by each process.

Another thing you could do is try to replicate the computation using only dask.array. This would mean something like

variables = [ds.Var.data, ds.Var2.data, ...]
dask.compute(*[v.sum() for v in variables])

This would help to isolate the issue between dask.array/distributed and xarray

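That isolation test can be sketched end-to-end with synthetic arrays standing in for the NetCDF-backed ones (the variable contents here are made up) and the local scheduler instead of the cluster:

```python
import numpy as np
import dask
import dask.array as da

# Stand-ins for ds.Var.data, ds.Var2.data, ... from the suggestion above.
variables = [
    da.from_array(np.arange(12.0).reshape(3, 4), chunks=(1, 4)),
    da.from_array(np.ones((5, 2)), chunks=(5, 1)),
]

# One merged graph, all reductions submitted at once.
totals = dask.compute(*[v.sum() for v in variables])
print(totals)  # (66.0, 10.0)
```

If this pattern fails on the cluster with the real `.data` attributes but works with NumPy-backed arrays, the problem is in how the backing objects travel to workers rather than in dask.array itself.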

mrocklin commented 7 years ago

The normal dask tricks of streaming computations through memory should still be working here. Also, we should be spilling to disk to avoid MemoryErrors.


pwolfram commented 7 years ago

It worked with 4 nodes, so we were running out of memory on-node.

pwolfram commented 7 years ago

I'm upgrading distributed and will try again on 2 nodes.

mrocklin commented 7 years ago

That is a little bit surprising.


pwolfram commented 7 years ago

Interesting: I was getting a new error, but it turns out I just needed to reimport distributed.

pwolfram commented 7 years ago

Forget the last comment-- restart of distributed fixed it.

shoyer commented 7 years ago

Dataset.sum() only sums variables individually -- it's a pretty shallow wrapper around da.sum.
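That per-variable behavior can be sketched with dask arrays standing in for the dataset's variables (the names and values here are made up); summing one variable at a time also holds only one reduction's intermediates in memory at once:

```python
import numpy as np
import dask.array as da

# Hypothetical stand-in for a Dataset: a mapping of name -> dask array.
fake_ds = {
    "yParticle": da.from_array(np.arange(6.0), chunks=3),
    "uVertexVelocity": da.from_array(np.full((2, 3), 2.0), chunks=(1, 3)),
}

# Roughly what Dataset.sum() does: reduce each variable independently.
totals = {name: float(var.sum().compute()) for name, var in fake_ds.items()}
print(totals)  # {'yParticle': 15.0, 'uVertexVelocity': 12.0}
```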

pwolfram commented 7 years ago

I'm still getting the memory error on two nodes, however:

In [1]: from dask.distributed import Client

In [2]: client = Client('host:8786')

In [3]: import xarray as xr

In [4]: ds = xr.open_mfdataset('/net/scratch3/pwolfram/ZISO_5km/realizations/realization_24-01/analysis_members/lagrPartTrack.*.nc', lock=False)
In [5]: ds.sum()
Out[5]: distributed.utils - ERROR - 
Traceback (most recent call last):
  File "/users/pwolfram/lib/python2.7/site-packages/distributed/utils.py", line 120, in f
    result[0] = yield gen.maybe_future(func(*args, **kwargs))
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "/users/pwolfram/lib/python2.7/site-packages/distributed/client.py", line 802, in _gather
    d['traceback'])
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/dask/array/core.py", line 51, in getarray
    c = np.asarray(c)
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/numpy/core/numeric.py", line 482, in asarray
    return array(a, dtype, copy=False, order=order)
  File "/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.py", line 389, in __array__
    return np.asarray(array[self.key], dtype=None)
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/numpy/core/numeric.py", line 482, in asarray
    return array(a, dtype, copy=False, order=order)
  File "/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.py", line 389, in __array__
    return np.asarray(array[self.key], dtype=None)
  File "/users/pwolfram/lib/python2.7/site-packages/xarray/backends/netCDF4_.py", line 73, in __getitem__
    data = getitem(self.array, key)
  File "netCDF4/_netCDF4.pyx", line 3695, in netCDF4._netCDF4.Variable.__getitem__ (netCDF4/_netCDF4.c:37923)
  File "netCDF4/_netCDF4.pyx", line 4363, in netCDF4._netCDF4.Variable._get (netCDF4/_netCDF4.c:46946)
MemoryError
---------------------------------------------------------------------------
MemoryError                               Traceback (most recent call last)
pwolfram commented 7 years ago

The issue appears to be on the dask side, assuming I've set this up right:

In [28]: ds = xr.open_mfdataset(fnames, lock=False)

In [29]: x = ds.uVertexVelocity.data.sum()

In [30]: x.compute()
distributed.utils - ERROR - 
Traceback (most recent call last):
  File "/users/pwolfram/lib/python2.7/site-packages/distributed/utils.py", line 120, in f
    result[0] = yield gen.maybe_future(func(*args, **kwargs))
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "/users/pwolfram/lib/python2.7/site-packages/distributed/client.py", line 802, in _gather
    d['traceback'])
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/dask/array/core.py", line 51, in getarray
    c = np.asarray(c)
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/numpy/core/numeric.py", line 482, in asarray
    return array(a, dtype, copy=False, order=order)
  File "/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.py", line 389, in __array__
    return np.asarray(array[self.key], dtype=None)
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/numpy/core/numeric.py", line 482, in asarray
    return array(a, dtype, copy=False, order=order)
  File "/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.py", line 389, in __array__
    return np.asarray(array[self.key], dtype=None)
  File "/users/pwolfram/lib/python2.7/site-packages/xarray/backends/netCDF4_.py", line 73, in __getitem__
    data = getitem(self.array, key)
  File "netCDF4/_netCDF4.pyx", line 3695, in netCDF4._netCDF4.Variable.__getitem__ (netCDF4/_netCDF4.c:37923)
  File "netCDF4/_netCDF4.pyx", line 4363, in netCDF4._netCDF4.Variable._get (netCDF4/_netCDF4.c:46946)
MemoryError
pwolfram commented 7 years ago

Note,

In [31]: ds.uVertexVelocity.nbytes*2**-30
Out[31]: 75.15162229537964

and this is 2 nodes, 64GB each.
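A back-of-the-envelope check using the shapes reported in the worker warnings above supports the memory explanation: a single file's `uVertexVelocity` block is `(30, 184400, 100)` float64.

```python
# Sizing sketch from the numbers above (shapes taken from the
# worker warning; the per-worker count is a rough assumption).
itemsize = 8  # bytes per float64
chunk_gib = 30 * 184400 * 100 * itemsize / 2**30
print(round(chunk_gib, 2))  # 4.12 -- GiB materialized per chunk
# With the whole field at ~75 GB (ds.uVertexVelocity.nbytes above),
# several such chunks in flight per worker plus reduction
# intermediates can plausibly exhaust two 64 GB nodes.
```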

shoyer commented 7 years ago

There is still the issue of too many open files, but as of https://github.com/pydata/xarray/pull/1128 xarray data stores are pickleable, which will enable their use with dask-distributed.

pwolfram commented 7 years ago

@shoyer and @mrocklin, is this an issue we can close after https://github.com/pydata/xarray/pull/1198 is merged? The primary reason we opened this issue was to be a placeholder to work through issues related to dask.distributed and xarray integration.

pwolfram commented 6 years ago

@mrocklin, can we close this issue?

mrocklin commented 6 years ago

Sure.

edougherty32 commented 6 years ago

Hi, I'm having the same issue in receiving the error message:

RuntimeError: NetCDF: Not a valid ID

When trying to get values from a dask array after performing a computation. Though I see this issue was resolved using https://github.com/pydata/xarray/pull/1095, I don't see the explicit solution.

Could you please redirect me to this solution? Thanks!