fsspec / kerchunk

Cloud-friendly access to archival data
https://fsspec.github.io/kerchunk/
MIT License

translate error "ValueError: Shuffle buffer is not an integer multiple of elementsize" #342

Closed kthyng closed 1 year ago

kthyng commented 1 year ago

Hi! I am going through the quick start to try out kerchunk (finally!). Unfortunately I hit what is, to me, a weird error, and I am not sure what direction to go in, being new to both the library and the error.

I'm following the steps from the Quick Start: https://fsspec.github.io/kerchunk/test_example.html#

I am using local netcdf files, not from S3. Perhaps this is the issue because all the examples I can find are for files on S3 instead of local files.

Here is my code, though it isn't reproducible since the files are local:

import kerchunk.hdf
import fsspec
import pathlib
from kerchunk.combine import MultiZarrToZarr

base = pathlib.Path("/mnt/depot/data/packrat/prod/aoos/nwgoa/processed/")
urls = sorted(base.glob("1999/*1999-01*.nc"))

so = dict(
    # anon=True, 
    default_fill_cache=False, default_cache_type='first'
)
singles = []
for u in urls:
    with fsspec.open(u, **so) as inf:
        h5chunks = kerchunk.hdf.SingleHdf5ToZarr(inf, u, inline_threshold=100)
        singles.append(h5chunks.translate())

# Multi-file JSONs
mzz = MultiZarrToZarr(
    singles,
    # remote_protocol="s3",
    remote_options={'anon': True},
    concat_dims=["ocean_time"],
)

out = mzz.translate()

The error is ValueError: Shuffle buffer is not an integer multiple of elementsize and here is the stack trace:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[30], line 9
      1 mzz = MultiZarrToZarr(
      2     singles,
      3     remote_protocol='file',
    (...)
      6     concat_dims=["ocean_time"],
      7 )
----> 9 out = mzz.translate()

File ~/miniconda3/envs/xp-inventory/lib/python3.11/site-packages/kerchunk/combine.py:496, in MultiZarrToZarr.translate(self, filename, storage_options)
    495 if 1 not in self.done:
--> 496     self.first_pass()

File ~/miniconda3/envs/xp-inventory/lib/python3.11/site-packages/kerchunk/combine.py:258, in MultiZarrToZarr.first_pass(self)
    256 z = zarr.open_group(fs.get_mapper(""))
    257 for var in self.concat_dims:
--> 258     value = self._get_value(i, z, var, fn=self._paths[i])

File ~/miniconda3/envs/xp-inventory/lib/python3.11/site-packages/kerchunk/combine.py:226, in MultiZarrToZarr._get_value(self, index, z, var, fn)
    225 elif selector.startswith("data:"):
--> 226     o = z[selector.split(":", 1)[1]][...]

File ~/miniconda3/envs/xp-inventory/lib/python3.11/site-packages/zarr/core.py:844, in Array.__getitem__(self, selection)
--> 844     result = self.get_basic_selection(pure_selection, fields=fields)

File ~/miniconda3/envs/xp-inventory/lib/python3.11/site-packages/zarr/core.py:970, in Array.get_basic_selection(self, selection, out, fields)
--> 970     return self._get_basic_selection_nd(selection=selection, out=out, fields=fields)

File ~/miniconda3/envs/xp-inventory/lib/python3.11/site-packages/zarr/core.py:1012, in Array._get_basic_selection_nd(self, selection, out, fields)
   1010 indexer = BasicIndexer(selection, self)
-> 1012 return self._get_selection(indexer=indexer, out=out, fields=fields)

File ~/miniconda3/envs/xp-inventory/lib/python3.11/site-packages/zarr/core.py:1388, in Array._get_selection(self, indexer, out, fields)
   1387 lchunk_coords, lchunk_selection, lout_selection = zip(*indexer)
-> 1388 self._chunk_getitems(lchunk_coords, lchunk_selection, out, lout_selection,
   1393     drop_axes=indexer.drop_axes, fields=fields)

File ~/miniconda3/envs/xp-inventory/lib/python3.11/site-packages/zarr/core.py:2228, in Array._chunk_getitems(self, lchunk_coords, lchunk_selection, out, lout_selection, drop_axes, fields)
   2227 if ckey in cdatas:
-> 2228     self._process_chunk(out, cdatas[ckey], chunk_select, drop_axes, out_is_ndarray,
   2236         fields, out_select, partial_read_decode=partial_read_decode)

File ~/miniconda3/envs/xp-inventory/lib/python3.11/site-packages/zarr/core.py:2141, in Array._process_chunk(self, out, cdata, chunk_selection, drop_axes, out_is_ndarray, fields, out_selection, partial_read_decode)
   2140 cdata = cdata.read_full()
-> 2141 chunk = self._decode_chunk(cdata)

File ~/miniconda3/envs/xp-inventory/lib/python3.11/site-packages/zarr/core.py:2409, in Array._decode_chunk(self, cdata, start, nitems, expected_shape)
   2408 for f in reversed(self._filters):
-> 2409     chunk = f.decode(chunk)

File ~/miniconda3/envs/xp-inventory/lib/python3.11/site-packages/numcodecs/shuffle.py:50, in Shuffle.decode(self, buf, out)
---> 50 buf, out = self._prepare_arrays(buf, out)

File ~/miniconda3/envs/xp-inventory/lib/python3.11/site-packages/numcodecs/shuffle.py:35, in Shuffle._prepare_arrays(self, buf, out)
     34 if buf.nbytes % self.elementsize != 0:
---> 35     raise ValueError("Shuffle buffer is not an integer multiple of elementsize")

ValueError: Shuffle buffer is not an integer multiple of elementsize

Thanks for any ideas!

dcherian commented 1 year ago

This may help: https://ncar.github.io/esds/posts/2023/kerchunk-mom6/

kthyng commented 1 year ago

Should I be using the NetCDF3ToZarr backend for netcdf files, instead of the hdf backend?

martindurant commented 1 year ago

Should I be using the NetCDF3ToZarr backend for netcdf files, instead of the hdf backend?

You need to use the appropriate one! The first few bytes will be b"CDF" for netCDF3 and b"HDF" for netCDF4/hdf backend if memory serves.
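For example, a rough local-file check could look like the sketch below; pick_translator is just an illustrative helper, not part of kerchunk, and note the full HDF5 signature is b"\x89HDF":

def pick_translator(path):
    # Sniff the file's magic bytes and return the matching kerchunk translator class.
    with open(path, "rb") as f:
        magic = f.read(4)
    if magic.startswith(b"CDF"):        # classic netCDF3, including 64-bit offset files
        from kerchunk.netCDF3 import NetCDF3ToZarr
        return NetCDF3ToZarr
    if magic == b"\x89HDF":             # netCDF4 files are HDF5 containers underneath
        from kerchunk.hdf import SingleHdf5ToZarr
        return SingleHdf5ToZarr
    raise ValueError(f"unrecognised format for {path}: {magic!r}")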

kthyng commented 1 year ago

I think that was a red herring — they are netcdf4 files.

I just tried local files from another model and they immediately work, as in, the steps I posted run without error. So I guess the problem is a quirk of the model files I used.

Still, I would ultimately like to use kerchunk on the original model files so if anyone has any ideas where to go in terms of debugging for that error, I would appreciate hearing them.

martindurant commented 1 year ago

The error from shuffle doesn't mean much to me. It may be the case that numcodecs' shuffle implementation (numcodecs._shuffle._doUnshuffle) is more limited than HDF5's, but I don't know why that might be. It is a pretty short function.

kthyng commented 1 year ago

Grid information is not included in the model output files that aren't working, including lon/lat coordinates and many other grid variables. Could that be relevant? I mention it because I am working with both of these models side by side and I frequently have to account for that, as you can imagine. Not sure what else is obviously different between the two.

martindurant commented 1 year ago

Are the chunk sizes consistent across the files, then?

kthyng commented 1 year ago

Sorry I missed this yesterday.

Chunks aren't specified in the individual files, if that is what you mean. I retried this with just 2 model files to narrow the scope and got the same error.

martindurant commented 1 year ago

Chunks aren't specified in the individual files

Sorry, I don't follow. Can you show the data structure of the two files separately? Could it be that one has shuffle but the other does not?

kthyng commented 1 year ago

Ok hopefully this will help:

One file:

<xarray.Dataset>
Dimensions:          (tracer: 2, s_rho: 50, s_w: 51, boundary: 4,
                      ocean_time: 24, eta_rho: 362, xi_rho: 794)
Coordinates:
  * ocean_time       (ocean_time) datetime64[ns] 1999-01-31T23:00:00 ... 1999...
  * s_rho            (s_rho) float64 -0.99 -0.97 -0.95 ... -0.05 -0.03 -0.01
  * s_w              (s_w) float64 -1.0 -0.98 -0.96 -0.94 ... -0.04 -0.02 0.0
Dimensions without coordinates: tracer, boundary, eta_rho, xi_rho
Data variables: (12/89)
    Akk_bak          float64 ...
    Akp_bak          float64 ...
    Akt_bak          (tracer) float64 ...
    Akv_bak          float64 ...
    Charnok_alpha    float64 ...
    CrgBan_cw        float64 ...
    ...               ...
    uice_eastward    (ocean_time, eta_rho, xi_rho) float32 ...
    v_northward      (ocean_time, s_rho, eta_rho, xi_rho) float32 ...
    vice_northward   (ocean_time, eta_rho, xi_rho) float32 ...
    w                (ocean_time, s_w, eta_rho, xi_rho) float32 ...
    xl               float64 ...
    zeta             (ocean_time, eta_rho, xi_rho) float32 ...
Attributes: (12/66)
    file:                      nwgoa_his_00744.nc
    format:                    netCDF-3 64bit offset file
    Conventions:               CF-1.4
    type:                      ROMS/TOMS history file

Second file:

<xarray.Dataset>
Dimensions:          (tracer: 2, s_rho: 50, s_w: 51, boundary: 4,
                      ocean_time: 24, eta_rho: 362, xi_rho: 794)
Coordinates:
  * ocean_time       (ocean_time) datetime64[ns] 1999-02-01T23:00:00 ... 1999...
  * s_rho            (s_rho) float64 -0.99 -0.97 -0.95 ... -0.05 -0.03 -0.01
  * s_w              (s_w) float64 -1.0 -0.98 -0.96 -0.94 ... -0.04 -0.02 0.0
Dimensions without coordinates: tracer, boundary, eta_rho, xi_rho
Data variables: (12/89)
    Akk_bak          float64 ...
    Akp_bak          float64 ...
    Akt_bak          (tracer) float64 ...
    Akv_bak          float64 ...
    Charnok_alpha    float64 ...
    CrgBan_cw        float64 ...
    ...               ...
    uice_eastward    (ocean_time, eta_rho, xi_rho) float32 ...
    v_northward      (ocean_time, s_rho, eta_rho, xi_rho) float32 ...
    vice_northward   (ocean_time, eta_rho, xi_rho) float32 ...
    w                (ocean_time, s_w, eta_rho, xi_rho) float32 ...
    xl               float64 ...
    zeta             (ocean_time, eta_rho, xi_rho) float32 ...
Attributes: (12/66)
    file:                      nwgoa_his_00768.nc
    format:                    netCDF-3 64bit offset file
    Conventions:               CF-1.4
    type:                      ROMS/TOMS history file

dcherian commented 1 year ago

I think you want ncdump -sh. It'll tell you about _ChunkSizes and shuffle settings too, I think.

kthyng commented 1 year ago

Oh ok, here those are:

ncdump1.txt ncdump2.txt

martindurant commented 1 year ago

So it seems the individual files don't actually load (i.e., you can't access their data, or maybe just one of the variables), even before combination?

The two files look identical to each other, so perhaps there is a difference between these and another that works OK.

Perhaps next I would run pdb following the exception and go up as far as "~/miniconda3/envs/xp-inventory/lib/python3.11/site-packages/kerchunk/combine.py:226" in the stack to see the state of the particular zarr data that is being accessed.
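One way to do that, right after the exception in IPython/Jupyter, is post-mortem debugging:

import pdb
pdb.pm()   # post-mortem debugger on the most recent traceback
# then repeat the `u` (up) command until you reach kerchunk/combine.py:226
# and inspect `z`, `var` and `selector` in that frame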

kthyng commented 1 year ago

Ok good point. I dove into the code there and get:

z[selector.split(":", 1)[1]] returns <zarr.core.Array '/ocean_time' (24,) float64>

but

z[selector.split(":", 1)[1]][...] returns the error *** ValueError: Shuffle buffer is not an integer multiple of elementsize

I am not familiar with [...] and can't seem to google it. What does that do at the end of that line?

kthyng commented 1 year ago

Ah, I see, as I am working on my other issue #343, that the [...] pulls out the values, maybe like [:] when working with netcdf datasets.

Maybe I need to go farther back in the code to see why those dates/values can't be accessed then.

kthyng commented 1 year ago

I made a pretty random guess to try to figure out why one model was working and the other wasn't: compare the ocean_time variable attributes. I changed the calendar using ncap2 from nco from "gregorian" to "proleptic_gregorian" in the model that wasn't working, and now it works.

ncap2 -O -s 'ocean_time@calendar="proleptic_gregorian"' nwgoa_1999-02-01.nc nwgoa_out.nc

The issue ended up being from reading in with zarr from what I can tell.

kthyng commented 1 year ago

What I said in the last comment is wrong.

It appears that doing anything with ncap2 to the file fixes this issue. Using the original file, locally, leads to the shuffle error, but using that same file run through ncap2 with some arbitrary (as far as I can tell) change to an attribute (including on a different variable than ocean_time) does not have the shuffle error.

I don't see any difference in the ncdump -hs output for the ocean_time variable between an unaltered and an altered file (when I don't modify ocean_time), but again the modified file moves past the error raised on ocean_time.

Does ncap2 perform some basic change to the file that could account for this?

martindurant commented 1 year ago

Does ncap2 perform some basic change

I have absolutely no idea what ncap2 might be doing. If you look through the references kerchunk generated for the two versions of the file, perhaps you can see a difference there for the appropriate variable. You can just look at the references dict directly (even with text search in any editor), or use a combination of .ls and _cat_common on the filesystem to find the details of given references.
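One minimal way to peek at a single reference, assuming the singles list from the snippet at the top of this issue (this uses the public cat call rather than _cat_common):

import fsspec

# Build a reference filesystem over one file's kerchunk output and read its zarr metadata
fs = fsspec.filesystem("reference", fo=singles[0])
print(fs.ls("ocean_time"))                      # the keys recorded for this variable
print(fs.cat("ocean_time/.zarray").decode())    # the .zarray metadata kerchunk wrote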

the [...] pulls out the numbers, maybe like [:]

... (builtins.Ellipsis) means "the whole of any number of axes", but : (builtins.slice(None)) means "the whole of one axis", so for a 1d array they are equivalent, but for 2d [...]->[:, :]. This is a numpy convention: https://numpy.org/doc/stable/user/basics.indexing.html#dimensional-indexing-tools
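A tiny numpy illustration of the equivalence:

import numpy as np

a = np.arange(6).reshape(2, 3)
assert (a[...] == a[:, :]).all()                 # Ellipsis covers all remaining axes
assert (a.ravel()[...] == a.ravel()[:]).all()    # for a 1d array the two spellings agree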

kthyng commented 1 year ago

Just a few updates:

I am going to rewrite the files with ncks and use those, so I will close this issue.

kthyng commented 1 year ago

Ok I have an actual answer now, though the solution is the same as I said before, or you can avoid the problem in the first place.

This information and steps below come from @ajelenak who helpfully corresponded by email to figure it out though a chance connection through @hyoklee.

The error message "Shuffle buffer is not an integer multiple of elementsize" very likely means that the Shuffle codec used by the kerchunk package expects the size of the (chunk) buffer it processes to be exactly the product of the chunk's number of array elements and the byte size of the element's datatype.

The error was caught on the ocean_time variable when I dug into the kerchunk code, so I ran h5dump -d ocean_time -p [netcdf_file_causing_error.nc] and it showed

   FILTERS {
      PREPROCESSING SHUFFLE
      COMPRESSION DEFLATE { LEVEL 8 }
      COMPRESSION DEFLATE { LEVEL 3 }
   }

We also looked at the kerchunk output for "ocean_time/.zarray" which showed

'{"chunks":[512],"compressor":{"id":"zlib","level":3},"dtype":"<f8","fill_value":null,"filters":[{"elementsize":8,"id":"shuffle"}],"order":"C","shape":[24],"zarr_format":2}'

The chunks of this netCDF variable are compressed twice with the same method (DEFLATE, also called zlib) but different compression levels (first 8, then 3). This is allowed by the HDF5 library but not possible for Zarr data: there can be only one compression method, applied only once. This is confirmed in the .zarray metadata, where the shuffle and the second DEFLATE (zlib) compression are recorded, but not the zlib level 8. When the chunk buffer reaches kerchunk's Shuffle codec, it is still not decompressed to its original size, hence the ValueError you are seeing.
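A small self-contained sketch of that failure mode (synthetic values, not the actual file's chunks): encode a 24-element float64 array the way the HDF5 pipeline did (shuffle, deflate level 8, deflate level 3), then decode with only the codecs kerchunk recorded.

import numpy as np
import numcodecs

data = np.arange(24, dtype="<f8")              # stand-in for the 24 ocean_time values
shuffle = numcodecs.Shuffle(elementsize=8)
zlib8 = numcodecs.Zlib(level=8)
zlib3 = numcodecs.Zlib(level=3)

# Write path as HDF5 applied it: shuffle -> deflate(level 8) -> deflate(level 3)
encoded = zlib3.encode(zlib8.encode(shuffle.encode(data.tobytes())))

# Decoding with the full pipeline recovers the values
decoded = np.frombuffer(shuffle.decode(zlib8.decode(zlib3.decode(encoded))), dtype="<f8")
assert (decoded == data).all()

# Decoding as the generated .zarray describes (the level-8 deflate is missing) hands the
# Shuffle codec a buffer that is still compressed; its length is (almost always) not a
# multiple of 8 bytes, which triggers the error above
still_compressed = zlib3.decode(encoded)
try:
    shuffle.decode(still_compressed)
except ValueError as err:
    print(err)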

Regenerating that file with one of the NCO tools probably applies only one zlib compression and thus makes those chunks acceptable for the Zarr software. You can check this by running the h5dump command for the same netCDF variable on the ncks-generated file. I did check this and confirmed it: the h5dump for the file that came out of ncks showed

   FILTERS {
      PREPROCESSING SHUFFLE
      COMPRESSION DEFLATE { LEVEL 3 }
   }

Thanks very much to @ajelenak for untangling a really mysterious error!

martindurant commented 1 year ago

Ah, well indeed we hadn't thought to support multiple invocations of the compressor. In fact, I strongly doubt that running deflate twice gives any benefit! However zarr DOES allow multiple compressors. There is some confusion in the spec with a false distinction between "filters" and "compressors", but these are both just numcodecs implementations and you can have multiple of the former. For example, the following is fine:

import zarr
import numcodecs

store = {}
comp1 = numcodecs.Zlib(level=5)
comp2 = numcodecs.Zlib(level=7)
z = zarr.open_array(store, mode="w", dtype="i4", shape=(10, 10), chunks=(5, 5),
                    filters=[comp1, comp2], compression=None)
print(store['.zarray'].decode())
{
    "chunks": [
        5,
        5
    ],
    "compressor": null,
    "dimension_separator": ".",
    "dtype": "<i4",
    "fill_value": 0,
    "filters": [
        {
            "id": "zlib",
            "level": 5
        },
        {
            "id": "zlib",
            "level": 7
        }
    ],
    "order": "C",
    "shape": [
        10,
        10
    ],
    "zarr_format": 2
}

...or it could have been one compressor and one filter. The "compressor" field is only special in that it is the last filter and required to produce a bytestream rather than an array.

Would you like to hand-edit your kerchunk output for the broken file to add a "filter" to the .zarray and see if it works?

martindurant commented 1 year ago

Can you find the h5obj.compression values given by h5py for one of the affected variables? You could put a breakpoint in SingleHdf5ToZarr around line 200 if you don't want to do it by hand - or just send me one file.
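In case it helps, one way to look at this by hand with plain h5py (the filename below is a placeholder for one of the affected files):

import h5py

with h5py.File("netcdf_file_causing_error.nc", "r") as f:   # placeholder path
    dset = f["ocean_time"]
    # h5py's high-level view of the filter pipeline for this variable
    print(dset.shuffle, dset.compression, dset.compression_opts)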

kthyng commented 1 year ago

Would you like to hand-edit your kerchunk output for the broken file to add a "filter" to the .zarray and see if it works?

I changed singles[0]["refs"]["ocean_time/.zarray"] from

'{"chunks":[512],"compressor":{"id":"zlib","level":3},"dtype":"<f8","fill_value":null,"filters":[{"elementsize":8,"id":"shuffle"}],"order":"C","shape":[24],"zarr_format":2}'

to add a filter like in your example:

singles[0]["refs"]["ocean_time/.zarray"] = '{"chunks":[512],"compressor":{"id":"zlib","level":3},"dtype":"<f8","fill_value":null,"filters":[{"elementsize":8,"id":"shuffle"},{"id": "zlib", "level":3}],"order":"C","shape":[24],"zarr_format":2}'

and then my MultiZarr command worked! However, I did get the same error when I tried to open the dataset using the result of that MultiZarr (see below for basic code), so do I need to change it in more than one place, maybe?

mzz = MultiZarrToZarr(
    singles,
    concat_dims=["ocean_time"],
)
out = mzz.translate()

import xarray as xr
ds = xr.open_dataset(
    "reference://", engine="zarr",
    backend_kwargs={
        "storage_options": {
            "fo": out,
        },
        "consolidated": False,
    },
)

kthyng commented 1 year ago

Can you find the h5obj.compression values given by h5py for one of the affected variables? You could put a breakpoint in SingleHdf5ToZarr around line 200 if you don't want to do it by hand - or just send me one file.

Is this a different approach to the first question or should I also do this?

martindurant commented 1 year ago

Is this a different approach

Yes, this would be the route to fixing it in kerchunk so that no manual rewriting of the JSON will be necessary in the future.

martindurant commented 1 year ago

However, I did get the same error when I tried to open the dataset using the result of that MultiZarr (see below for basic code) so do I need to change it more than one place maybe?

If it's the same across all inputs, I would have expected it to work without anything else, but since I've never seen this before, it's always possible there's another spot that needs changing.

kthyng commented 1 year ago

h5obj.compression gives gzip for the variable ocean_time in my affected file. Is that the sort of thing you're looking for?

kthyng commented 1 year ago

it's always possible there's another spot that needs changing

That gave me the idea to use a preprocess function to change this for all variables with ".zarray" in them and that did the trick. Here is the preprocess function I used:

def preprocess(refs):
    for k in list(refs):
        if k.endswith('/.zarray'):
            refs[k] = refs[k].replace('"filters":[{"elementsize":8,"id":"shuffle"}]',
                                      '"filters":[{"elementsize":8,"id":"shuffle"},{"id": "zlib", "level":3}]')
    return refs

EDIT: On further investigation I realized I had not fully addressed this issue. In fact there were two shuffle element sizes and I needed to address both of them. Also, since the level 3 compression was in the compressor, this added compression filter needs to be level 8.

def preprocess(refs):
    for k in list(refs):
        if k.endswith('/.zarray'):
            refs[k] = refs[k].replace('"filters":[{"elementsize":8,"id":"shuffle"}]',
                                      '"filters":[{"elementsize":8,"id":"shuffle"},{"id": "zlib", "level":8}]')
            refs[k] = refs[k].replace('"filters":[{"elementsize":4,"id":"shuffle"}]',
                                      '"filters":[{"elementsize":4,"id":"shuffle"},{"id": "zlib", "level":8}]')
    return refs
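One way to apply the function above (a sketch, not necessarily exactly what was run here) is through MultiZarrToZarr's preprocess argument, which applies it to each input's references before combining:

mzz = MultiZarrToZarr(
    singles,
    concat_dims=["ocean_time"],
    preprocess=preprocess,   # rewrite each file's .zarray entries before combining
)
out = mzz.translate()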

ajelenak commented 1 year ago

However zarr DOES allow multiple compressors. There is some confusion in the spec with a false distinction between "filters" and "compressors"

I did not know this, very useful! Perhaps the Zarr spec should state that specifying only filters, including compression filters, is enough?

Fixing this in kerchunk will require using more of the low-level h5py API. The h5py.Dataset.compression property will not do it for cases like this one.

martindurant commented 1 year ago

I did not know this, very useful! Perhaps the Zarr spec should state that specifying only filters, including compression filters, is enough?

Perhaps, but I assume that the language around this is more exact in V3, where all of the current focus is.

Fixing this in kerchunk will require using more low-level h5py API.

Is this difficult?

ajelenak commented 1 year ago

Fixing this in kerchunk will require using more low-level h5py API.

Is this difficult?

I am not the right person to answer this question. 😄

What needs to be done is very similar to this h5py code: https://github.com/h5py/h5py/blob/6b5af4c6495bf865fee5f036122187c21fcb17d4/h5py/_hl/filters.py#L294-L331.

  1. Loop over all declared filters and translate to the numcodecs equivalents.
  2. Recognize which ones are compression filters.
  3. Declare the last compression filter as the compressor in the Zarr array's metadata and put all the other filters in the Zarr array's metadata as filters list.

The above may be simpler for Zarr v3 where, I think, the same approach as in HDF5 was taken that all filters are treated equally but are called "codecs".
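A rough sketch of steps 1-3 for the v2 case, using h5py's low-level filter API (illustrative only, not kerchunk's actual code, and handling just shuffle and deflate):

import h5py
import h5py.h5z as h5z
import numcodecs

def hdf5_filters_to_zarr(dset):
    """Translate a dataset's HDF5 filter pipeline into (filters, compressor) for a v2 .zarray."""
    plist = dset.id.get_create_plist()
    codecs = []
    for i in range(plist.get_nfilters()):
        code, _flags, values, _name = plist.get_filter(i)
        if code == h5z.FILTER_SHUFFLE:
            codecs.append(numcodecs.Shuffle(elementsize=dset.dtype.itemsize))
        elif code == h5z.FILTER_DEFLATE:
            codecs.append(numcodecs.Zlib(level=values[0]))
        else:
            raise NotImplementedError(f"HDF5 filter id {code} not handled in this sketch")
    # the last codec becomes the zarr "compressor"; everything before it stays in "filters"
    compressor = codecs[-1] if codecs else None
    return codecs[:-1], compressor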

martindurant commented 1 year ago

Recognize which ones are compression filters.

Is it not the case that all HDF filters are bytes->bytes?

ajelenak commented 1 year ago

Yes. I was thinking of the need to decide which of the compressors (if more than one) should be declared in the Zarr array metadata as such.

martindurant commented 1 year ago

Yes. I was thinking of the need to decide which of the compressors (if more than one) should be declared in the Zarr array metadata as such.

For v2, I believe it's fine to have multiple filters and NO compressor. When decoding, codecs are called last-first. IF there is a compressor, it simply becomes the last filter, and gets called first.