OSOceanAcoustics / echodataflow

Orchestrated sonar data processing workflow
https://echodataflow.readthedocs.io/en/latest/
MIT License

Codec Buffer Limit Error When Writing Masked Output to Zarr File #103

Sohambutala closed 1 month ago

Sohambutala commented 2 months ago

Description

Writing the output of an applied mask to a Zarr store on disk with to_zarr fails with the following error message:

Codec does not support buffers of > 2147483647 bytes
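
For context, 2147483647 is 2**31 - 1, the largest signed 32-bit integer. The sketch below is a rough way to check whether one uncompressed chunk would cross that threshold. It assumes, without confirmation from this report, that the error is triggered by a single chunk buffer exceeding the limit. The chunk shapes and the helper names (`chunk_nbytes`, `exceeds_codec_limit`) are hypothetical and are not taken from the dataset or from any library.

```python
import math

# Limit quoted in the error message: the largest signed 32-bit integer.
CODEC_MAX_BYTES = 2**31 - 1  # 2147483647


def chunk_nbytes(chunk_shape, itemsize):
    """Return the size in bytes of one uncompressed chunk."""
    return math.prod(chunk_shape) * itemsize


def exceeds_codec_limit(chunk_shape, itemsize, limit=CODEC_MAX_BYTES):
    """Return True if one chunk of this shape is larger than the limit."""
    return chunk_nbytes(chunk_shape, itemsize) > limit


# Hypothetical chunk shapes for a float64 variable (8 bytes per element).
big_chunk = (3, 10_000, 10_000)    # 2,400,000,000 bytes
small_chunk = (3, 1_000, 1_000)    # 24,000,000 bytes

print(exceeds_codec_limit(big_chunk, 8))
print(exceeds_codec_limit(small_chunk, 8))
```

For a dask-backed variable, the shapes to feed in would come from its chunk layout. The numbers here only illustrate the arithmetic.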

Expected Behavior

The masked dataset should be written to the Zarr file without encountering the buffer size limit error.

Actual Behavior

The write fails with the codec buffer limit error shown above.

Environment (please complete the following information):

Steps to Reproduce

Here is a minimal reproducible example:

import xarray as xr
import echopype as ep
from echopype import open_raw
from echopype.calibrate import compute_Sv
from echopype.commongrid import compute_MVBS

# Load raw data
raw = open_raw(raw_file='s3://ncei-wcsd-archive/data/raw/Bell_M._Shimada/SH1707/EK60/Summer2017-D20170626-T123142.raw', storage_options={'anon': True}, sonar_model="EK60")
sv = compute_Sv(raw)

# Write to Zarr
sv.to_zarr(
    store='./temp/sv.zarr',
    mode="w",
    consolidated=True,
    storage_options={},
    compute=True
)

# Read from Zarr
sv = xr.open_zarr('./temp/sv.zarr')

# Subset and resample
sub = sv.isel(ping_time=slice(0, None))
sam = sub['ping_time'].resample(ping_time="10s")
last_v = max([v.start for v in sam.groups.values()])
nextsub = sub.isel(ping_time=slice(0, last_v))

# Compute MVBS
mvbs = compute_MVBS(ds_Sv=nextsub, range_bin="1m", ping_time_bin="10s")

# Apply frequency differencing mask
diff = ep.mask.frequency_differencing(
    source_Sv=mvbs,
    freqABEq="120000.0Hz - 38000.0Hz > 0dB",
    storage_options={},
)

# Ensure proper dimensions
if "range_sample" not in mvbs.coords:
    mvbs = mvbs.swap_dims({"echo_range": "range_sample"}).rename_vars({"echo_range": "range_sample"})

# Apply mask
xr_d_n = ep.mask.apply_mask(
    source_ds=mvbs,
    mask=diff,
    storage_options_ds={},
    storage_options_mask={},
)

# Write masked output to Zarr
xr_d_n.to_zarr(
    store='./temp/mask.zarr',
    mode="w",
    consolidated=True,
    storage_options={},
)

Possible Solution / Suggestion

Sohambutala commented 2 months ago

However, when we skip the step below and use the in-memory object instead, it seems to work.

# Write to Zarr
sv.to_zarr(
    store='./temp/sv.zarr',
    mode="w",
    consolidated=True,
    storage_options={},
    compute=True
)

Working example:

def process(file):
    raw = open_raw(raw_file=file, storage_options={'anon': True}, sonar_model="EK60")
    sv = compute_Sv(raw)

    sub = sv.isel(ping_time=slice(0, None))
    sam = sub['ping_time'].resample(ping_time="10s")
    last_v = max([v.start for v in sam.groups.values()])

    nextsub = sub.isel(ping_time=slice(0, last_v))

    mvbs = compute_MVBS(ds_Sv=nextsub,
                    range_bin="1m",
                    ping_time_bin="10s")

    diff = ep.mask.frequency_differencing(
                source_Sv=mvbs,
                freqABEq="120000.0Hz - 38000.0Hz > 0dB",
                storage_options={},
            )
    if "range_sample" not in mvbs.coords:
        mvbs = mvbs.swap_dims({"echo_range": "range_sample"}).rename_vars({"echo_range": "range_sample"})

    xr_d = ep.mask.apply_mask(
                source_ds=mvbs,
                mask=diff,
                storage_options_ds={},
                storage_options_mask={},
            )
    return xr_d

xr_d = process('s3://ncei-wcsd-archive/data/raw/Bell_M._Shimada/SH1707/EK60/Summer2017-D20170626-T123142.raw')

xr_d.to_zarr(
    store='./temp/mask.zarr',
    mode="w",
    consolidated=True,
    storage_options={},
)

leewujung commented 2 months ago

@ctuguinay : do you think this may be related to your recent fix in compute_Sv? (and I'll get to that PR asap tomorrow... likely around 12:30-1pm)

ctuguinay commented 2 months ago

@leewujung I don't think so, since that PR branch is most likely not what is being used here. I am getting deja vu though; I feel as though I have seen this error before...

leewujung commented 2 months ago

Oh, what I meant is whether your PR changes would fix this issue.

ctuguinay commented 2 months ago

I'm not quite sure, since what I'm doing with the use_swap=True option is essentially this:

# Write to Zarr
sv.to_zarr(
    store='./temp/sv.zarr',
    mode="w",
    consolidated=True,
    storage_options={},
    compute=True
)

leewujung commented 2 months ago

True, but previously the range computation in compute_Sv was forced to expand in memory, and with your changes it is not?

ctuguinay commented 2 months ago

Ah, you might be right. I'll test this out.

Sohambutala commented 2 months ago

I think to_zarr has compute=True by default, so I guess it does not return a delayed object but instead writes the Zarr store to disk right away.

Adding .compute() either when loading the Sv or just before calling to_zarr resolves the issue. I guess the issue is that opening the Zarr store loads it lazily, so every subsequent step is deferred (building up the task graph) until the next call that forces computation, which is xr_d_n.to_zarr in this case.

sv = xr.open_zarr('./temp/sv.zarr').compute()

OR

# Write masked output to Zarr
xr_d_n.compute().to_zarr(
    store='./temp/mask.zarr',
    mode="w",
    consolidated=True,
    storage_options={},
)

ctuguinay commented 2 months ago

@Sohambutala you should also do something about the echo_range in the diff data variable:

# Ensure proper dimensions
if "range_sample" not in mvbs.coords:
    mvbs = mvbs.swap_dims({"echo_range": "range_sample"}).rename_vars({"echo_range": "range_sample"})
    diff = diff.swap_dims({"echo_range": "range_sample"})

Without it, xr_d_n will have 4 dimensions: echo_range, range_sample, ping_time, channel. When I have time, I'll make a PR to fix this. Sorry that it's like this right now.
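
To illustrate why the extra dimension appears: xarray combines arrays by dimension name, so the result of combining two arrays carries the union of their dimension names. The sketch below is a plain-Python model of that rule, not echopype or xarray code. The helper `broadcast_dims` is hypothetical, and the dimension tuples are assumptions based on the names listed in the comment above, not inspected from the actual mvbs and diff objects.

```python
def broadcast_dims(*dim_tuples):
    """Return the ordered union of dimension names across all inputs."""
    result = []
    for dims in dim_tuples:
        for name in dims:
            if name not in result:
                result.append(name)
    return tuple(result)


# Assumed dimension names (hypothetical, for illustration only).
mvbs_dims = ("channel", "ping_time", "range_sample")  # mvbs after swap_dims
mask_dims_unswapped = ("ping_time", "echo_range")     # diff left unchanged
mask_dims_swapped = ("ping_time", "range_sample")     # diff after swap_dims

# Mismatched names: the union has four dimensions.
print(broadcast_dims(mvbs_dims, mask_dims_unswapped))

# Matching names: the union stays at three dimensions.
print(broadcast_dims(mvbs_dims, mask_dims_swapped))
```

The order of dimensions in a real result may differ. The point is only that a name present in one input and absent from the other adds a dimension.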