dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

AttributeError: 'ZarrStore' object has no attribute '_append_dim' #4419

Open ghislainp opened 3 years ago

ghislainp commented 3 years ago

My code is like this:

def compute_a_year(year, data=data, datatype=datatype):
    return map_decrease_rate(data=data, datatype=datatype, year=year)

def lmap(f, x):
    futures = dask_client.map(f, x)
    return dask_client.gather(futures)

return xr.concat(lmap(compute_a_year, years),
                 dim=xr.DataArray(years, dims='year', name='year'))

map_decrease_rate is a function that performs a lengthy calculation.

I get the following error raised by dask_client.gather

AttributeError: 'ZarrStore' object has no attribute '_append_dim'

No calculation is performed; the error seems to be raised after the data is moved and before the calculation starts.

The full traceback is:

~/anaconda3/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1989                 direct=direct,
   1990                 local_worker=local_worker,
-> 1991                 asynchronous=asynchronous,
   1992             )
   1993 

~/anaconda3/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    830         else:
    831             return sync(
--> 832                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    833             )
    834 

~/anaconda3/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    338     if error[0]:
    339         typ, exc, tb = error[0]
--> 340         raise exc.with_traceback(tb)
    341     else:
    342         return result[0]

~/anaconda3/lib/python3.7/site-packages/distributed/utils.py in f()
    322             if callback_timeout is not None:
    323                 future = asyncio.wait_for(future, callback_timeout)
--> 324             result[0] = yield future
    325         except Exception as exc:
    326             error[0] = sys.exc_info()

~/anaconda3/lib/python3.7/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

~/anaconda3/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1848                             exc = CancelledError(key)
   1849                         else:
-> 1850                             raise exception.with_traceback(traceback)
   1851                         raise exc
   1852                     if errors == "skip":

~/.conda/envs/rsrt/lib/python3.7/site-packages/distributed/utils.py in offload()

~/.conda/envs/rsrt/lib/python3.7/concurrent/futures/thread.py in run()

~/.conda/envs/rsrt/lib/python3.7/site-packages/distributed/utils.py in <lambda>()

~/.conda/envs/rsrt/lib/python3.7/site-packages/distributed/worker.py in _deserialize()

~/.conda/envs/rsrt/lib/python3.7/site-packages/distributed/worker.py in loads_function()

~/.conda/envs/rsrt/lib/python3.7/site-packages/distributed/protocol/pickle.py in loads()

What happened:

AttributeError

What you expected to happen:

The calculation should start and show up in the "web status page", but it does not.

Minimal Complete Verifiable Example:

I don't have one, I'm not able to isolate the problem.

Anything else we need to know?:

Environment:

jrbourbeau commented 3 years ago

Thanks for raising an issue @ghislainp. Are you able to post the full traceback? It looks like it might be cut off in your original post.

ghislainp commented 3 years ago

The traceback is below. A bit more context:

+---------+---------------+---------------+---------------+
| Package | client        | scheduler     | workers       |
+---------+---------------+---------------+---------------+
| blosc   | 1.10.1        | None          | None          |
| msgpack | 1.0.2         | 1.0.0         | 1.0.0         |
| numpy   | 1.19.4        | 1.18.1        | 1.18.1        |
| python  | 3.7.9.final.0 | 3.7.7.final.0 | 3.7.7.final.0 |
+---------+---------------+---------------+---------------+
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-5-a063b36e8bca> in <module>
----> 1 rate = myprocessing.map_decrease_rate(sigma, datatype='sigma', dask_client=runner)
      2 xr.Dataset({'rate': rate}).to_netcdf(f"rate-{sensor}-ransac.nc")

~/data/myprocessing/myprocessing.py in map_decrease_rate(data, datatype, tb19, year, dask_client)
     51                 return dask_client.gather(futures)
     52 
---> 53         return xr.concat(lmap(compute_a_year, years),
     54                          dim=xr.DataArray(years, dims='year', name='year'))
     55 

~/data/myprocessing/myprocessing.py in lmap(f, x)
     49             def lmap(f, x):
     50                 futures = dask_client.map(f, x)
---> 51                 return dask_client.gather(futures)
     52 
     53         return xr.concat(lmap(compute_a_year, years),

~/anaconda3/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1989                 direct=direct,
   1990                 local_worker=local_worker,
-> 1991                 asynchronous=asynchronous,
   1992             )
   1993 

~/anaconda3/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    830         else:
    831             return sync(
--> 832                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    833             )
    834 

~/anaconda3/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    338     if error[0]:
    339         typ, exc, tb = error[0]
--> 340         raise exc.with_traceback(tb)
    341     else:
    342         return result[0]

~/anaconda3/lib/python3.7/site-packages/distributed/utils.py in f()
    322             if callback_timeout is not None:
    323                 future = asyncio.wait_for(future, callback_timeout)
--> 324             result[0] = yield future
    325         except Exception as exc:
    326             error[0] = sys.exc_info()

~/anaconda3/lib/python3.7/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

~/anaconda3/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1848                             exc = CancelledError(key)
   1849                         else:
-> 1850                             raise exception.with_traceback(traceback)
   1851                         raise exc
   1852                     if errors == "skip":

~/.conda/envs/rsrt/lib/python3.7/site-packages/distributed/utils.py in offload()

~/.conda/envs/rsrt/lib/python3.7/concurrent/futures/thread.py in run()

~/.conda/envs/rsrt/lib/python3.7/site-packages/distributed/utils.py in <lambda>()

~/.conda/envs/rsrt/lib/python3.7/site-packages/distributed/worker.py in _deserialize()

~/.conda/envs/rsrt/lib/python3.7/site-packages/distributed/worker.py in loads_function()

~/.conda/envs/rsrt/lib/python3.7/site-packages/distributed/protocol/pickle.py in loads()

AttributeError: 'ZarrStore' object has no attribute '_append_dim'

jrbourbeau commented 3 years ago

It looks like Dask is having trouble deserializing an Xarray object. Taking a quick look through the Xarray source, it looks like _append_dim was recently added to ZarrStore. Could you let me know which version of Xarray you're using on the client, scheduler, and workers? One convenient way to get this information is with client.get_versions(packages=["xarray"])
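To illustrate how this kind of deserialization failure can arise from mismatched library versions, here is a minimal sketch that needs neither Dask nor Xarray. The `Store` class is a hypothetical stand-in, not Xarray's actual `ZarrStore`, and the use of `__slots__` is an assumption made for the demonstration. The name is rebound to a second definition to simulate a receiving process whose installed class predates the `_append_dim` attribute.

```python
import pickle


class Store:
    """Hypothetical 'newer' class definition, as seen by the sending side."""

    __slots__ = ("path", "_append_dim")

    def __init__(self, path):
        self.path = path
        self._append_dim = None


# Serialize an instance while the newer definition is in place.
payload = pickle.dumps(Store("data.zarr"))


class Store:  # rebinding the name simulates an older install on the receiver
    """Hypothetical 'older' class definition that predates `_append_dim`."""

    __slots__ = ("path",)

    def __init__(self, path):
        self.path = path


# Unpickling looks the class up by name, creates an instance without calling
# __init__, and then tries to set every saved attribute on it.
try:
    pickle.loads(payload)
    error_message = None
except AttributeError as exc:
    error_message = str(exc)

print(error_message)
```

The error is raised inside `pickle.loads` itself, before any user function runs, which would be consistent with the traceback ending in `distributed/protocol/pickle.py in loads()`.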

ghislainp commented 3 years ago

The versions are below. It turns out that I managed to get a step further:

The problem may be with loading the data. On my local computer I'm using intake, but I haven't installed intake on my cluster (and I don't have the catalog there anyway).

{'scheduler': {'host': {'python': '3.7.9.final.0',
   'python-bits': 64,
   'OS': 'Linux',
   'OS-release': '4.9.0-13-amd64',
   'machine': 'x86_64',
   'processor': '',
   'byteorder': 'little',
   'LC_ALL': 'None',
   'LANG': 'None'},
  'packages': {'python': '3.7.9.final.0',
   'dask': '2020.12.0',
   'distributed': '2020.12.0',
   'msgpack': '1.0.0',
   'cloudpickle': '1.6.0',
   'tornado': '6.1',
   'toolz': '0.11.1',
   'numpy': '1.18.1',
   'lz4': None,
   'blosc': '1.10.1',
   'xarray': '0.16.2'}},
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-24-a063b36e8bca> in <module>
----> 1 rate = myprocessing.map_decrease_rate(sigma, datatype='sigma', dask_client=runner)
      2 xr.Dataset({'rate': rate}).to_netcdf(f"rate-{sensor}-ransac.nc")

~/data/Myprocessing/myprocessing/myprocessing.py in map_decrease_rate(data, datatype, tb19, year, dask_client)
     51                 return dask_client.gather(futures)
     52 
---> 53         return xr.concat(lmap(compute_a_year, years),
     54                          dim=xr.DataArray(years, dims='year', name='year'))
     55 

~/data/Myprocessing/myprocessing/myprocessing.py in lmap(f, x)
     49             def lmap(f, x):
     50                 futures = dask_client.map(f, x, batch_size=1)
---> 51                 return dask_client.gather(futures)
     52 
     53         return xr.concat(lmap(compute_a_year, years),

~/anaconda3/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1989                 direct=direct,
   1990                 local_worker=local_worker,
-> 1991                 asynchronous=asynchronous,
   1992             )
   1993 

~/anaconda3/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    830         else:
    831             return sync(
--> 832                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    833             )
    834 

~/anaconda3/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    338     if error[0]:
    339         typ, exc, tb = error[0]
--> 340         raise exc.with_traceback(tb)
    341     else:
    342         return result[0]

~/anaconda3/lib/python3.7/site-packages/distributed/utils.py in f()
    322             if callback_timeout is not None:
    323                 future = asyncio.wait_for(future, callback_timeout)
--> 324             result[0] = yield future
    325         except Exception as exc:
    326             error[0] = sys.exc_info()

~/anaconda3/lib/python3.7/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

~/anaconda3/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1848                             exc = CancelledError(key)
   1849                         else:
-> 1850                             raise exception.with_traceback(traceback)
   1851                         raise exc
   1852                     if errors == "skip":

~/data/Myprocessing/myprocessing/myprocessing.py in compute_a_year()
     34 
     35         def compute_a_year(year, data=data, datatype=datatype):
---> 36             return map_decrease_rate(data=data, datatype=datatype, year=year)
     37 
     38         if dask_client is None:

~/RSRT/dask-worker-space/dask-worker-space/worker-ar99rzzl/myprocessing.zip/myprocessing/myprocessing.py in map_decrease_rate()

~/RSRT/dask-worker-space/dask-worker-space/worker-ar99rzzl/myprocessing.zip/myprocessing/myprocessing.py in apply()

~/RSRT/dask-worker-space/dask-worker-space/worker-ar99rzzl/myprocessing.zip/myprocessing/myprocessing.py in <listcomp>()

~/RSRT/dask-worker-space/dask-worker-space/worker-ar99rzzl/myprocessing.zip/myprocessing/myprocessing.py in <listcomp>()

~/RSRT/dask-worker-space/dask-worker-space/worker-ar99rzzl/myprocessing.zip/myprocessing/myprocessing.py in compute_rate()

~/RSRT/dask-worker-space/dask-worker-space/worker-ar99rzzl/myprocessing.zip/myprocessing/myprocessing.py in compute_rate_ransac()

~/.conda/envs/rsrt/lib/python3.7/site-packages/xarray/core/dataarray.py in dropna()

~/.conda/envs/rsrt/lib/python3.7/site-packages/xarray/core/dataset.py in dropna()

~/.conda/envs/rsrt/lib/python3.7/site-packages/numpy/core/_asarray.py in asarray()

~/.conda/envs/rsrt/lib/python3.7/site-packages/xarray/core/common.py in __array__()

~/.conda/envs/rsrt/lib/python3.7/site-packages/xarray/core/variable.py in values()

~/.conda/envs/rsrt/lib/python3.7/site-packages/xarray/core/variable.py in _as_array_or_item()

~/.conda/envs/rsrt/lib/python3.7/site-packages/numpy/core/_asarray.py in asarray()

~/.conda/envs/rsrt/lib/python3.7/site-packages/dask/array/core.py in __array__()

~/.conda/envs/rsrt/lib/python3.7/site-packages/dask/base.py in compute()

~/.conda/envs/rsrt/lib/python3.7/site-packages/dask/base.py in compute()

~/.conda/envs/rsrt/lib/python3.7/site-packages/distributed/client.py in get()

~/.conda/envs/rsrt/lib/python3.7/site-packages/distributed/client.py in gather()

~/.conda/envs/rsrt/lib/python3.7/site-packages/distributed/client.py in sync()

~/.conda/envs/rsrt/lib/python3.7/site-packages/distributed/utils.py in sync()

~/.conda/envs/rsrt/lib/python3.7/site-packages/distributed/utils.py in f()

~/.conda/envs/rsrt/lib/python3.7/site-packages/tornado/gen.py in run()

~/.conda/envs/rsrt/lib/python3.7/site-packages/distributed/client.py in _gather()

~/.conda/envs/rsrt/lib/python3.7/site-packages/dask/array/core.py in getter()

~/.conda/envs/rsrt/lib/python3.7/site-packages/numpy/core/_asarray.py in asarray()

~/.conda/envs/rsrt/lib/python3.7/site-packages/xarray/core/indexing.py in __array__()

~/.conda/envs/rsrt/lib/python3.7/site-packages/numpy/core/_asarray.py in asarray()

~/.conda/envs/rsrt/lib/python3.7/site-packages/xarray/core/indexing.py in __array__()

~/.conda/envs/rsrt/lib/python3.7/site-packages/numpy/core/_asarray.py in asarray()

~/.conda/envs/rsrt/lib/python3.7/site-packages/xarray/core/indexing.py in __array__()

~/.conda/envs/rsrt/lib/python3.7/site-packages/numpy/core/_asarray.py in asarray()

~/.conda/envs/rsrt/lib/python3.7/site-packages/xarray/coding/variables.py in __array__()

~/.conda/envs/rsrt/lib/python3.7/site-packages/xarray/coding/variables.py in _scale_offset_decoding()

~/.conda/envs/rsrt/lib/python3.7/site-packages/xarray/coding/variables.py in __array__()

~/.conda/envs/rsrt/lib/python3.7/site-packages/xarray/coding/variables.py in _apply_mask()

~/.conda/envs/rsrt/lib/python3.7/site-packages/numpy/core/_asarray.py in asarray()

~/.conda/envs/rsrt/lib/python3.7/site-packages/xarray/core/indexing.py in __array__()

~/.conda/envs/rsrt/lib/python3.7/site-packages/xarray/backends/zarr.py in __getitem__()

~/.conda/envs/rsrt/lib/python3.7/site-packages/zarr/core.py in __getitem__()

~/.conda/envs/rsrt/lib/python3.7/site-packages/zarr/core.py in get_basic_selection()

~/.conda/envs/rsrt/lib/python3.7/site-packages/zarr/core.py in _get_basic_selection_nd()

~/.conda/envs/rsrt/lib/python3.7/site-packages/zarr/core.py in _get_selection()

~/.conda/envs/rsrt/lib/python3.7/site-packages/zarr/core.py in _chunk_getitems()

~/.conda/envs/rsrt/lib/python3.7/site-packages/fsspec/mapping.py in getitems()

~/.conda/envs/rsrt/lib/python3.7/site-packages/fsspec/mapping.py in <dictcomp>()

AttributeError: 'FSMap' object has no attribute 'missing_exceptions'

jacobtomlinson commented 3 years ago

I hope you don't mind, but I've just had a quick pass through this issue and added code fences and highlighting to make things a little more readable.

It looks like parts of the original comment had also been placed within a <!-- HTML comment --> block, which is why bits appeared to be missing.

jrbourbeau commented 3 years ago

AttributeError: 'FSMap' object has no attribute 'missing_exceptions'

@ghislainp a good next step would be to use client.get_versions(check=True, packages=["fsspec"]) to confirm that you're using the same fsspec version on the client, scheduler, and workers.

Additionally, if you're able to post a minimal example (see https://blog.dask.org/2018/02/28/minimal-bug-reports), that would go a long way towards debugging.

mrocklin commented 3 years ago

This might be the kind of issue that @martindurant is singularly effective at solving (due to both fsspec and zarr authorship)


martindurant commented 3 years ago

Yep, just like you found a new/changed attribute in xarray, you need to have fsspec in sync too. The attribute appeared since https://github.com/intake/filesystem_spec/pull/409 (which would make it v>=0.8.3), but I think zarr only made use of it more recently - would have to check exactly when.

ghislainp commented 3 years ago

Good point. Lesson learned: I must always check that the environments are in sync. Do you know a tool for that?
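One possible way to spot mismatches is to compare the dictionary returned by `client.get_versions(packages=[...])`, which was suggested earlier in this thread. The sketch below assumes that dictionary has `client` and `scheduler` entries, each with a `packages` dict, plus a `workers` dict keyed by worker address. Only the `scheduler` part of that layout is shown in this thread, so the rest is an assumption. The helper name `find_version_mismatches` and the sample data are made up for illustration.

```python
def find_version_mismatches(versions, packages):
    """Return {package: {role: version}} for packages whose versions differ.

    `versions` is assumed to have 'client' and 'scheduler' entries, each
    holding a 'packages' dict, and a 'workers' dict keyed by worker address.
    """
    per_role = {}
    for role in ("client", "scheduler"):
        if role in versions:
            per_role[role] = versions[role].get("packages", {})
    for address, info in versions.get("workers", {}).items():
        per_role[f"worker {address}"] = info.get("packages", {})

    mismatches = {}
    for package in packages:
        # A package missing from one role is recorded as None for that role.
        seen = {role: pkgs.get(package) for role, pkgs in per_role.items()}
        if len(set(seen.values())) > 1:
            mismatches[package] = seen
    return mismatches


# Made-up example data, not taken from the reporter's cluster.
example = {
    "client": {"packages": {"xarray": "0.16.2", "fsspec": "0.8.5"}},
    "scheduler": {"packages": {"xarray": "0.16.2", "fsspec": "0.8.0"}},
    "workers": {
        "tcp://10.0.0.1:4000": {
            "packages": {"xarray": "0.16.2", "fsspec": "0.8.0"}
        },
    },
}

report = find_version_mismatches(example, ["xarray", "fsspec"])
print(report)
```

With a live cluster, the first argument would be the result of `client.get_versions(packages=["xarray", "fsspec", "zarr"])` rather than the hand-written `example`.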

I'm making progress but still have difficulties (sorry for not opening a new issue, but I don't really know where to get help with these beginner questions). The following code works but takes much, much more time on the cluster (with 32 cores and workers) than on my laptop. The zarr file is "only" 6.4Gb. Am I doing something wrong? What is surprising is that on the dashboard there are long periods with nothing happening, as if there were a long transfer of data. Nothing appears in the console where the workers are run. Is there a way to display what, and how much, dask is transferring or has transferred?

data = dask.delayed(xr.open_zarr)("files_on_the_cluster.zarr", chunks='auto')
sigma = data['sigma']

mask = np.array(....)  # ~5Mb array loaded on my laptop disk
mask = xr.DataArray(mask, coords=(sigma.y.compute(), sigma.x.compute()))
mask = runner.scatter(mask)
print("apply mask")
sigma *= mask

nbyear = np.unique(sigma.time.dt.year.compute()).size
print("nbyear=", nbyear)
specialzone = ((sigma < -15).sum(dim='time') > 0.5 * nbyear) * ((sigma > -10).sum(dim='time') > 200 * nbyear)
sigma *= specialzone

print(int((specialzone*mask).sum().compute()))

And then the following line takes 10 minutes, while on my laptop it takes less than a minute:

sigma.mean(dim='time').compute()

As you can guess, I'm at home with a 10Mb internet connection... not enough to transfer Gb, but enough to transfer 5Mb (the array I'm reading locally) and the graph?

Thank you in advance for your help

martindurant commented 3 years ago

That's quite a few lines to try to unpack, but the very first

dask.delayed(xr.open_zarr)("files_on_the_cluster.zarr", chunks='auto')

is fishy: xr.open_zarr will give you a dask-backed data structure without delayed, so long as you request chunks. The xarray examples do not use delayed.

As for the rest, you should check your graph size, and of course the IO will depend very much on what "on the cluster" means.

ghislainp commented 3 years ago

I have removed dask.delayed and measured the time. The line

print(int((specialzone*mask).sum().compute()))

takes 33s on my laptop and 190s on the cluster (i.e. from my laptop when using dask.distributed).

I don't know how to measure graph size, but the following: len(pickle.dumps(sigma.data.dask.keyset())) gives 1.9Mb, which should reach the cluster in a few seconds from home.
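A measurement based on `keyset()` appears to cover only the task keys, not the task definitions attached to them, so it is probably a lower bound on what is sent. A rough sketch of counting tasks and pickling the whole mapping is shown below on a toy graph. It assumes the real graph object (here `sigma.data.dask`) behaves like a mapping that `dict()` can materialize, and that its contents can be serialized with the standard `pickle` module. A real Dask graph may need `cloudpickle` instead, and the helper name `graph_stats` is hypothetical.

```python
import pickle
from operator import add, mul


def graph_stats(graph):
    """Return (number of tasks, pickled size in bytes) for a dict-like graph."""
    materialized = dict(graph)  # copies keys and task definitions
    return len(materialized), len(pickle.dumps(materialized))


# Toy task graph in Dask's dict convention: key -> value or (function, *args).
toy_graph = {
    "x": 1,
    "y": (add, "x", 10),
    "z": (mul, "y", 2),
}

n_tasks, n_bytes = graph_stats(toy_graph)

# For comparison: the size obtained when only the keys are pickled.
keys_only_bytes = len(pickle.dumps(set(toy_graph)))

print(n_tasks, n_bytes, keys_only_bytes)
```

On a real array the corresponding call would be `graph_stats(sigma.data.dask)`. The task count alone is often a useful first indicator of graph size.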

The data to transfer are the 'mask' which is 5Mb.

The big zarr data is on the cluster (and on my laptop), in the same directory on both machines. It is not clear to me how dask determines that the file on the cluster should be used instead of the file on my machine. Is there some magic? In any case, it is certain that the whole dataset is not transferred, otherwise it would take much, much more than 190s. But 190s is still much more than 33s.

My problem is that the rest of the processing (code not shown) is much more expensive than 33s, and I really need the power of the cluster. I may give up and run the whole code directly on the cluster with joblib, but this is not so nice.