pangeo-data / rechunker

Disk-to-disk chunk transformation for chunked arrays.
https://rechunker.readthedocs.io/
MIT License

Example of netcdf => xarray => rechunker => zarr? #59

Closed. rsignell-usgs closed this issue 3 years ago.

rsignell-usgs commented 3 years ago

I saw that there was experimental support for xarray datasets, so I tried this hopefully reproducible notebook that seeks to read from OPeNDAP and write rechunked zarr:

https://gist.github.com/7f3a4367224aeaee227bdfa90620ed96

The code:

max_mem = '4GB'
array_plan = rechunk(ds, chunk_plan, max_mem, target, temp_store=temp)

results in this error:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-11-969522ca9241> in <module>
      1 max_mem = '4GB'
----> 2 array_plan = rechunk(ds, chunk_plan, max_mem, target, temp_store=temp)

~/miniconda3/envs/pangeo/lib/python3.7/site-packages/rechunker/api.py in rechunk(source, target_chunks, max_mem, target_store, target_options, temp_store, temp_options, executor)
    301         target_options=target_options,
    302         temp_store=temp_store,
--> 303         temp_options=temp_options,
    304     )
    305     plan = executor.prepare_plan(copy_spec)

~/miniconda3/envs/pangeo/lib/python3.7/site-packages/rechunker/api.py in _setup_rechunk(source, target_chunks, max_mem, target_store, target_options, temp_store, temp_options)
    381                 temp_store_or_group=temp_group,
    382                 temp_options=options,
--> 383                 name=name,
    384             )
    385             copy_spec.write.array.attrs.update(variable_attrs)  # type: ignore

~/miniconda3/envs/pangeo/lib/python3.7/site-packages/rechunker/api.py in _setup_array_rechunk(source_array, target_chunks, max_mem, target_store_or_group, target_options, temp_store_or_group, temp_options, name)
    464 
    465     if isinstance(target_chunks, dict):
--> 466         array_dims = _get_dims_from_zarr_array(source_array)
    467         try:
    468             target_chunks = _shape_dict_to_tuple(array_dims, target_chunks)

~/miniconda3/envs/pangeo/lib/python3.7/site-packages/rechunker/api.py in _get_dims_from_zarr_array(z_array)
    138     # use Xarray convention
    139     # http://xarray.pydata.org/en/stable/internals.html#zarr-encoding-specification
--> 140     return z_array.attrs["_ARRAY_DIMENSIONS"]
    141 
    142 

AttributeError: 'Array' object has no attribute 'attrs'

Is it expected that rechunker can be used this way with xarray src data?

If not, does someone have an example of using rechunker to write zarr from xarray src data?

rsignell-usgs commented 3 years ago

@eric-czech , do you by chance have an example notebook using an xarray dataset as the source?

rabernat commented 3 years ago

Rich, this test might be a good place to start:

https://github.com/pangeo-data/rechunker/blob/9f49f96004c85b2cb54255d42430d70e25686071/tests/test_rechunk.py#L306

tomwhite commented 3 years ago

I think this is happening because the target chunk values need to be tuples (of ints), not dicts.

So in this test, target_chunks is {'a': (20, 10)}. It fails if it is {'a': {'x': 20, 'y': 10}} (where a is the variable, and x and y are dimensions).

I think this is just a current limitation in xarray support in rechunker - it should be possible to use the dictionary syntax. (And indeed that would probably be preferable since it would avoid dimension ordering issues.)

BTW here is an example (not a notebook) of where Eric uses xarray rechunking: https://github.com/pystatgen/sgkit/blob/master/sgkit/io/bgen/bgen_reader.py#L510. Note that it too uses tuples.
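
For reference, a rough sketch of the tuple syntax described above, loosely following that test (the store paths and chunk sizes are placeholders, so treat it as an untested illustration rather than a canonical example):

import numpy as np
import xarray as xr
from rechunker import rechunk

# toy dataset with a single variable "a" over dims ("x", "y")
ds = xr.Dataset({"a": (("x", "y"), np.zeros((100, 50)))}).chunk({"x": 50, "y": 50})

# tuple syntax: chunk sizes given in the variable's dimension order
plan = rechunk(
    ds,
    target_chunks={"a": (20, 10)},
    max_mem="256MB",
    target_store="target.zarr",  # hypothetical local paths
    temp_store="temp.zarr",
)
plan.execute()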

rabernat commented 3 years ago

I think this is happening because the target chunk values need to be tuples (of ints), not dicts.

So we have definitely implemented this for Zarr arrays, here:

https://github.com/pangeo-data/rechunker/blob/9f49f96004c85b2cb54255d42430d70e25686071/rechunker/api.py#L465-L466

https://github.com/pangeo-data/rechunker/blob/9f49f96004c85b2cb54255d42430d70e25686071/rechunker/api.py#L137-L140

For xarray datasets, you would just need to translate the dict syntax to a tuple. This could happen right here: https://github.com/pangeo-data/rechunker/blob/9f49f96004c85b2cb54255d42430d70e25686071/rechunker/api.py#L363-L364
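
Until that lands, a rough sketch of doing the same translation on the user side before calling rechunk might look like this (the helper name and the example variable/dimension names are hypothetical, not rechunker API):

def chunk_dicts_to_tuples(ds, target_chunks):
    """Turn {'var': {'dim': size, ...}} into {'var': (size, ...)}.

    Chunk sizes are ordered by each variable's own dims; dimensions left
    out keep their full length. Illustration only, not rechunker's code.
    """
    translated = {}
    for name, chunks in target_chunks.items():
        if isinstance(chunks, dict):
            var = ds[name]
            translated[name] = tuple(
                chunks.get(dim, size) for dim, size in zip(var.dims, var.shape)
            )
        else:
            translated[name] = chunks
    return translated

# e.g. chunk_dicts_to_tuples(ds, {"salt": {"ocean_time": 2}})  # names hypothetical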

eric-czech commented 3 years ago

Sorry for the delay @rsignell-usgs but what Tom pointed out are the best examples I have (thank you @tomwhite!). I would like to eventually add some though, or contribute to them, as a part of https://github.com/pangeo-data/rechunker/issues/53.

rsignell-usgs commented 3 years ago

I'm still struggling with this. I've got a workflow that reads National Water Model netCDF files from S3 into xarray and then uses rechunker, but I'm getting this error when I call rechunk:

ContainsArrayError: path 'time' contains an array

I don't have a minimal example, but I do have a reproducible notebook in case it's a simple thing I'm doing wrong and it's clear what's going on.

@rabernat, you mentioned you don't have an AWS account, so in case you want to try running it, I added you as a user on https://jupyter.qhub.earthmap.us. You should be able to log in with GitHub credentials, go to the /shared/users/rsignell/notebooks/NWM folder, and run the nwm_dask notebook.

rabernat commented 3 years ago

That error usually means that you have not properly cleared the target locations. Can you try explicitly deleting the temp store and target store (e.g. using the CLI) before running your notebook?

I think that rechunker should do this automatically, but for now this would be a useful debugging step.
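
For example, with fsspec that cleanup could look roughly like this (the bucket name and prefixes below are placeholders):

import fsspec

fs = fsspec.filesystem("s3")

# hypothetical prefixes for the temp and target stores
for path in ["my-bucket/nwm/temp.zarr", "my-bucket/nwm/target.zarr"]:
    if fs.exists(path):
        fs.rm(path, recursive=True)  # remove every object under the prefix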

rsignell-usgs commented 3 years ago

Okay @rabernat , I'll definitely try that.
(Also, would it be better to have the temp storage locally instead of on S3, since the chunks I guess will be tiny?)

rabernat commented 3 years ago

(Also, would it be better to have the temp storage locally instead of on S3, since the chunks I guess will be tiny?)

I'm not sure. There are a lot of factors. Try it out! Now that async works in zarr, the overhead of creating many small chunks should be much lower. However, you need the unreleased master version of zarr to get these benefits.

A problem we don't know how to solve is how to delete the intermediate store efficiently when rechunker is done.

rsignell-usgs commented 3 years ago

Okay, writing to brand new objects, I now get a different error:

ValueError: A temporary store location must be provided (array=streamflow).

rabernat commented 3 years ago

Now that error sounds familiar. I think I got the same thing when I tried.

rsignell-usgs commented 3 years ago

Aha! Switching from s3 to local disk for the tmp zarr seems to work.

Well... the array plan worked anyway. When I execute the plan I get:

KeyError: 'startstops'

rabernat commented 3 years ago

For now...but will you get the same error when you start writing the target store?

Also, are you using distributed? Do you have a shared filesystem?

rsignell-usgs commented 3 years ago

Ah, right -- that's a problem. I'm using Dask Gateway and the dask workers don't have access to the local EBS disk. Did you ever successfully use object storage for the temp zarr file?

rabernat commented 3 years ago

Did you ever successfully use object storage for the temp zarr file?

Definitely! We are using this regularly in production with raw zarr arrays. Your error is related to groups and/or xarray datasets.

rsignell-usgs commented 3 years ago

That seems strange, since my latest test works without any code changes when I use a LocalCluster with locally stored temp and target Zarr datasets.

I might be tempted to conclude that my Dask Gateway workers can't write to the bucket, but I know they can because if I don't use rechunk I can write the zarr.

rabernat commented 3 years ago

I might be tempted to conclude that my Dask Gateway workers can't write to the bucket

No, I don't think that's the problem. It would be great if you were willing to dive into the rechunker code a bit and see if you can get to the bottom of it. The relevant part is here:

https://github.com/pangeo-data/rechunker/blob/17053cc2956ab8d417a666cc8675bcb4df1c5c56/rechunker/api.py#L512-L516

The error is being raised because temp_store_or_group is None at this point in the code. So somehow the temp store object is not being passed correctly to the _setup_array_rechunk function. We already have a hint that it depends on the type of store (file path vs. fsspec filesystem).
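
In case it helps narrow things down, one variant worth comparing is passing both stores explicitly as fsspec mappers rather than plain path strings (the S3 prefixes here are hypothetical, and ds and chunk_plan are as defined earlier in your notebook):

import fsspec
from rechunker import rechunk

# get_mapper returns a MutableMapping that zarr accepts as a store
target = fsspec.get_mapper("s3://my-bucket/nwm/target.zarr")
temp = fsspec.get_mapper("s3://my-bucket/nwm/temp.zarr")

# ds and chunk_plan as in the notebook above
plan = rechunk(ds, chunk_plan, "4GB", target, temp_store=temp)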

rsignell-usgs commented 3 years ago

Okay, I will try.

jbusecke commented 3 years ago

I just tried this new feature out and I am running into the same issue as @rsignell-usgs with the chunk specification. Providing tuples works, but dicts don't. Is anybody working on this? I could try to take a shot at it.

rabernat commented 3 years ago

Are you using the latest master of rechunker that includes #64?

jbusecke commented 3 years ago

Besides the need to provide tuples, this functionality is amazing!!! This shaves hours off my preprocessing workflow for CMIP6! I just rechunked 500GB in 30 minutes on a 40-core/192GB node.

rabernat commented 3 years ago

Can someone give me a fully working example of an xarray rechunk?

eric-czech commented 3 years ago

Can someone give me a fully working example of an xarray rechunk?

How's this? https://gist.github.com/eric-czech/c5628e1f895e7c93b6096ce95862ac78

rabernat commented 3 years ago

Excellent, thanks Eric! In #72 I am trying to make this work with dict syntax, i.e.

target_chunks=dict(
    salt={'ocean_time': 2, ...}
)

eric-czech commented 3 years ago

Nice, much better that way.