OSOceanAcoustics / echopype

Enabling interoperability and scalability in ocean sonar data analysis
https://echopype.readthedocs.io/
Apache License 2.0
89 stars 70 forks source link

Intermittent Blosc Decompression Error During Zarr Operations #1338

Closed Sohambutala closed 2 weeks ago

Sohambutala commented 2 weeks ago

Description

Encountering an intermittent issue with Blosc decompression. The specific error message is:

File numcodecs\\blosc.pyx:564, in numcodecs.blosc.Blosc.decode()
File numcodecs\\blosc.pyx:394, in numcodecs.blosc.decompress()
RuntimeError: error during blosc decompression: 0

This error occurs when slicing parts of two Zarr files and then storing the concatenated result to disk using to_zarr.

Steps to Reproduce:

from collections import defaultdict
from echopype import open_raw
from echopype.calibrate import compute_Sv
from echopype.commongrid import compute_MVBS
import xarray as xr
import echopype as ep

def process(file) -> xr.Dataset:
    raw = open_raw(raw_file=file, storage_options={'anon': True}, sonar_model="EK60")
    sv = compute_Sv(raw)
    return sv

# Range from 0 - 526
sv = process('s3://ncei-wcsd-archive/data/raw/Bell_M._Shimada/SH1707/EK60/Summer2017-D20170626-T123142.raw')
sv.to_zarr(
    store="./store/store.zarr",
    mode="w",
    consolidated=True,
    storage_options={},
    append_dim=None,
    safe_chunks=False
)

# Range from 526 - 1048
sv = process('s3://ncei-wcsd-archive/data/raw/Bell_M._Shimada/SH1707/EK60/Summer2017-D20170626-T130105.raw')
sv.to_zarr(
    store="./store/store.zarr",
    mode="a",
    consolidated=True,
    storage_options={},
    append_dim='ping_time',
    safe_chunks=False
)

store: xr.Dataset = xr.open_zarr('./store/store.zarr/')

slice = store.isel(ping_time=slice(526, 1045))
slice.to_zarr(
    store='./stores/slice.zarr',
    mode="w",
    consolidated=True,
    storage_options={},
    safe_chunks=False
)
Sohambutala commented 2 weeks ago

It seems like the Blosc issue also occurs even when we append the Sv data to the Sv store. The above example is small and might not run into this problem when storing sv.

Some related discussions found:

https://discourse.pangeo.io/t/problem-with-blosc-decompression-and-zarr-files/2648

intermittent errors during blosc decompression of zarr chunks on pangeo.pydata.org

Sohambutala commented 2 weeks ago

Also, found this discussion in echopype. @leewujung I know its long back but do you remember if there was a fix applied in echopype for this issue?

PR : Address memory issues in combine

At the request of @leewujung, here is a list of TODOs for this PR (some may not be included, but I will try to update this list as I remember them):

TODOs

  • [ ] Figure out RuntimeError: error during blosc decompression: -1

  • [x] run a selection of files from noaa-wcsd-pds/data/raw/Bell_M._Shimada/SH1707/EK80
  • [x] run a selection of files from noaa-wcsd-pds/data/raw/Bell_M._Shimada/SH2106/EK80

    • [x] These runs triggered an error in the current PR code because the channel dimension is not always in ascending order. This is currently being corrected in issue Unordered channel coordinate  #815.
  • [x] run a selection of files from noaa-wcsd-pds/data/raw/Bell_M._Shimada/SH1701/EK60

  • [x] run a selection of files from OOI associated with the AZFP echosounder
  • [x] add attributes to each EchoData group
  • [x] create a check that all of the channels in each dataset are the same i.e. all channels must have the same names and length.
  • [x] check for and correct reversed time
  • [x] store old times if they needed to be corrected
  • [x] Perform a minimal check that the first time value of each Dataset is less than the first time value of the subsequent Dataset
  • [x] change lazy naming for class
  • [x] Make sure the combine function works with different sized range_sample dimensions amongst the sensors
  • [x] change filenames (in Provenance) numbering to range(len(filenames))
  • [x] Add default 'compressor' if it does not exist in the encoding of a variable.
  • [x] Incorporate this new combine method into the echopype API

    • [x] Discuss the possibility of having a depreciation cycle for the old combine method. In our in person meeting, we have decided that we will just use the new combine method created in this PR going forward and we will remove the old combine method entirely.

    • [x] For the new API, we just need to add the parameter path (or some variant of it) to ep.combine. We will ensure that this path is appropriate using validate_output_path. Additionally, if no path is provided we will do a warning and mention that it is being written to a temporary folder (specified by validate_output_path)

    • The above item will make sure it works for s3 bucket writing

    • We may need to allow for fs_map input (make this a new issue/PR maybe)

  • [ ] Create integration tests for new combine function

A couple of additional items to note:

  • The Provenance attribute variables are named differently in this PR
  • We introduce a routine that ensures all ds_lists have identical attributes. Runtime errors are raised if:

    • The keys are not the same
    • The values are not identical
    • The keys date_created, conversion_time do not have the same types
  • If attribute values are numpy arrays, then they will not be included in the Provenance group. Instead, these values will only appear in the attributes of the combined EchoData object.
Sohambutala commented 2 weeks ago

So far, I have tried the following from echodataflow end but still facing the issue intermittently:

  1. Setting zarr.storage.default_compressor to Zlib
  2. Setting blosc.set_nthreads(1)
  3. Running on a dask cluster, since DaskChunkingManager is being used by default even when not executing on dask cluster.
ctuguinay commented 2 weeks ago

I think this Blosc Decompression issue is more of a chunking error than a problem with Blosc Decompression. There were two ways I was able to get this to run: Rechunking and Computing before to_zarr

slice.chunk("auto").to_zarr(
    store='./stores/slice.zarr',
    mode="w",
    consolidated=True,
    storage_options={},
    safe_chunks=False
)
slice.compute().to_zarr(
    store='./stores/slice.zarr',
    mode="w",
    consolidated=True,
    storage_options={},
    safe_chunks=False
)

The former resets the chunks to full dimension width (i.e creates 1 big chunk) and the latter removes chunks altogether. Although, I'm not quite sure the mechanism(s) behind why the original slice chunks were 'bad'.

I think the Blosc Decompression error is just a sign that something is going wrong with the chunking rather than the problem itself.

Sohambutala commented 2 weeks ago

Yes, I forgot to mention that I’m now using the following approach to store the slice and pass it to the MVBS function instead of keeping the slice in memory:

nextsub.compute().to_zarr(
    store=out_zarr,
    mode="w",
    consolidated=True,
    storage_options=config.output.storage_options_dict,
    safe_chunks=False,                    
)

However, the Blosc issue also occurs when writing Sv results to the store. I’m attaching the entire console logs. The slice operation happens after the write_output stage.

I believe using .compute() works because it forces the computation of the Dask array into memory, resolving any pending operations and converting it before writing it to disk. This helps avoid issues with chunking and parallel writes that likely lead to Blosc decompression errors. These errors might be occurring due to conflicts when multiple processes try to read from or write to the same chunk simultaneously, causing decompression failures.

Blosc.log

Using the same approach for write_output scenario might not be feasible though, since loading the entire store into memory every time we append a slice will dramatically affect performance.

ctuguinay commented 2 weeks ago

Fixed with using zarr.sync.ThreadSynchronizer from discussion today with @Sohambutala @leewujung @valentina-s

This issue could potentially be reopened if the Blosc error pops up again.