pangeo-data / rechunker

Disk-to-disk chunk transformation for chunked arrays.
https://rechunker.readthedocs.io/
MIT License

Not able to rechunk a full group, only arrays. #18

Closed jbusecke closed 4 years ago

jbusecke commented 4 years ago

I just tried rechunker for the first time. Thanks a lot for putting this together, I am very hopeful this will end my long nights trying to diagnose blown up dask workers šŸ˜€

My use case

I want to use this package to preprocess a large amount of local netcdf files to a unified chunking scheme. Previously I have encountered many different problems (like other users) with memory overflow, and quirks with the xarray.to_zarr() chunking. Circumventing all that and preprocessing on a lower level seems like a great solution to these problems.

I pulled the latest master and started some preliminary tests. My overall workflow looks something like this:

Identify netcdf files that belong together, load them with xr.open_mfdataset, and rechunk (usually into single time slices). My original dataset used to create a test store looks like this: [screenshot of the dataset repr]

Then save to a first zarr store (this usually works). ds.to_zarr('first_write.zarr', mode='w')

I reload the zarr store:

import zarr
g = zarr.group('first_write.zarr/')

Then I would like to use rechunk to rewrite this store with a predefined chunk structure.

rechunked = rechunk(g, {'i': 180, 'j': 90, 'time': 60, 'bnds': 2, 'lev': 40, 'vertices': 4}, 1e9, 'first_write_rechunked.zarr', temp_store='temp_store.zarr')

This however fails with the following error:

TypeError                                 Traceback (most recent call last)
<ipython-input-35-812ed47dd139> in <module>
      9 
     10 
---> 11 rechunked2 = rechunk(g, chunks , 500e6, f'first_write_rechunked.zarr', temp_store='temp_store.zarr')

/scratch/gpfs2/jbusecke/conda_tigressdata/envs/cmip_data_management_princeton/lib/python3.8/site-packages/rechunker/api.py in rechunk(source, target_chunks, max_mem, target_store, temp_store)
    199 
    200         for array_name, array_target_chunks in target_chunks.items():
--> 201             delayed = _rechunk_array(
    202                 source[array_name],
    203                 array_target_chunks,

/scratch/gpfs2/jbusecke/conda_tigressdata/envs/cmip_data_management_princeton/lib/python3.8/site-packages/rechunker/api.py in _rechunk_array(source_array, target_chunks, max_mem, target_store_or_group, temp_store_or_group, name, source_storage_options, temp_storage_options, target_storage_options)
    278             )
    279 
--> 280     read_chunks, int_chunks, write_chunks = rechunking_plan(
    281         shape, source_chunks, target_chunks, itemsize, max_mem
    282     )

/scratch/gpfs2/jbusecke/conda_tigressdata/envs/cmip_data_management_princeton/lib/python3.8/site-packages/rechunker/algorithm.py in rechunking_plan(shape, source_chunks, target_chunks, itemsize, max_mem, consolidate_reads, consolidate_writes)
    115     if len(source_chunks) != ndim:
    116         raise ValueError(f"source_chunks {source_chunks} must have length {ndim}")
--> 117     if len(target_chunks) != ndim:
    118         raise ValueError(f"target_chunks {target_chunks} must have length {ndim}")
    119 

TypeError: object of type 'int' has no len()

Rechunking works when I apply it to a single array like this: rechunk(g.po4, chunks, 500e6, 'first_write_rechunked.zarr', temp_store='temp_store.zarr')

Am I misunderstanding how to specify the chunks?

jbusecke commented 4 years ago

Ok, I think I found a solution. It was not immediately clear how to format the chunks argument for a group. I have now written a little wrapper that temporarily loads the source store, gets the dimension sizes, and parses the chunks accordingly.

import zarr
import xarray as xr
from rechunker import rechunk

def rechunker_wrapper(source_store, target_store, temp_store, chunks={'x': 180, 'y': 90, 'time': 60}):
    g = zarr.group(source_store)
    # get the correct shapes by loading the store as an xr.Dataset, then build per-variable chunks
    ds_chunk = xr.open_zarr(source_store)
    group_chunks = {}
    for var in ds_chunk.variables:
        # pick chunk sizes from `chunks` above, defaulting to the full dimension
        # length for any dimension not listed there
        group_chunks[var] = tuple(
            chunks[di] if di in chunks else ds_chunk.sizes[di] for di in ds_chunk[var].dims
        )
    rechunked = rechunk(g, group_chunks, 500e6, target_store, temp_store=temp_store)
    rechunked.compute()
    zarr.convenience.consolidate_metadata(target_store)
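
The per-variable defaulting step can be exercised standalone with plain dicts in place of the xarray objects (the variable names, dims, and sizes below are made up for illustration):

```python
# Mirror of the wrapper's chunk-parsing step, using plain dicts instead
# of an xarray Dataset. Dimensions missing from `chunks` fall back to
# their full length, i.e. a single chunk along that axis.
chunks = {"x": 180, "y": 90, "time": 60}
dim_sizes = {"x": 360, "y": 180, "time": 1200, "lev": 40}
var_dims = {"po4": ("time", "lev", "y", "x"), "lev": ("lev",)}

group_chunks = {
    var: tuple(chunks.get(d, dim_sizes[d]) for d in dims)
    for var, dims in var_dims.items()
}
# "po4" -> (60, 40, 90, 180); "lev" is not in `chunks`, so it defaults
# to its full length of 40.
```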

This works pretty well for me now. Thanks again for this cool software!

EDIT: HOLY šŸ®, this just rechunked a 300GB control run in 1 minute!
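
For the record, the TypeError above makes sense in hindsight: when the source is a group, rechunker appears to treat each top-level key of target_chunks as an array name and each value as that array's whole chunk spec, so a flat {dim: size} dict hands the length check a bare int. A minimal reproduction of just that check:

```python
# With a group source, rechunker iterates target_chunks.items() and runs
# a length check on each value; a flat {dim: size} mapping therefore
# passes a bare int where a per-array chunk spec is expected.
flat_chunks = {"i": 180, "j": 90, "time": 60}

for array_name, array_target_chunks in flat_chunks.items():
    try:
        len(array_target_chunks)  # the rechunking_plan length check
    except TypeError as err:
        print(f"{array_name}: {err}")  # object of type 'int' has no len()
```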

rabernat commented 4 years ago

@jbusecke - groups are supported. I just need to finish the documentation to explain the syntax. I'm literally working on it now šŸ˜† .

rabernat commented 4 years ago

Hi Julius--I just got some better docs up: https://rechunker.readthedocs.io/en/latest/tutorial.html

Let me know if you are able to figure out the group syntax.

jbusecke commented 4 years ago

Already did (good tests FTW!), but I'll take a look anyways!

Looks great, and I can eliminate some of the logic in my example above by passing a nested dict!
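
For anyone who finds this later, here is roughly what that nested-dict spec looks like for a group (array and dimension names taken from my dataset above; sizes illustrative, and the call itself is commented out since it needs the actual stores):

```python
# Group-level chunk spec: top-level keys are array names in the zarr
# group, values are per-dimension chunk mappings for that array.
target_chunks = {
    "po4": {"time": 60, "lev": 40, "j": 90, "i": 180},
    "time_bnds": {"time": 60, "bnds": 2},
}

# rechunk(g, target_chunks, 500e6, "first_write_rechunked.zarr",
#         temp_store="temp_store.zarr")
```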

I'll close this for now.