OSOceanAcoustics / echopype

Enabling interoperability and scalability in ocean sonar data analysis
https://echopype.readthedocs.io/
Apache License 2.0

Add `use_swap` for scaling `compute_Sv` for EK60 #1329

Closed ctuguinay closed 1 day ago

ctuguinay commented 3 weeks ago

From #1212, Don's investigation into the compute_Sv failure on a large combined Echodata object determined that the large elementwise array operations used to compute Echo Range were the problem. He tested the Echo Range computation on mock arrays of a size similar to the original problem's, and the operation failed to complete:

    import dask.array
    import numpy as np
    import xarray as xr

    # Mock arrays matching the size of the problematic combined dataset
    ch, pt, rs = 5, 46187, 3957
    sample_interval = xr.DataArray(dask.array.ones((ch, pt)), name="sample_interval", dims=["ch", "pt"], coords={"ch": np.arange(ch), "pt": np.arange(pt)})
    sound_speed = xr.DataArray(dask.array.ones((ch, pt)) * 1053, name="sound_speed", dims=["ch", "pt"], coords={"ch": np.arange(ch), "pt": np.arange(pt)})
    range_sample = xr.DataArray(dask.array.from_array(np.arange(rs)), name="range_sample", dims=["rs"], coords={"rs": np.arange(rs)})
    res = range_sample * sample_interval * sound_speed / 2

I was able to replicate the failure: running this with 8 Dask workers with about 4 GiB of RAM led to memory leakage and then to a kernel crash.
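For reference, this is roughly the kind of local cluster setup used to reproduce it; the exact worker/thread settings below are my assumption, not the original configuration:

    from dask.distributed import Client

    # Assumed local cluster: 8 workers with ~4 GiB of memory each
    client = Client(n_workers=8, threads_per_worker=1, memory_limit="4GiB")

    # Computing `res` from the snippet above on this cluster overloads the workers
    res.compute()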

When looking at the Dask graph itself, one can see that the computation defaults to just 1 worker, leading to this overload:

[screenshot: Dask dashboard showing the computation assigned to a single worker]

When chunking the pt dimension, processing is distributed evenly across workers; however, memory builds up in each worker far beyond the size of a chunk (each chunk is approximately 15 MB):

[screenshot: Dask dashboard showing per-worker memory growing well beyond the chunk size]
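For context, a sketch of what chunking along pt in the snippet above could look like (the exact chunking used for the screenshot is an assumption on my part):

    # Rechunk the inputs along "pt"; with pt chunks of 100, each output chunk of
    # `res` is about 5 * 100 * 3957 * 8 bytes ≈ 15 MB, matching the numbers above
    res = (
        range_sample
        * sample_interval.chunk({"pt": 100})
        * sound_speed.chunk({"pt": 100})
        / 2
    )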

The memory build-up comes from Dask holding each finished chunk in memory until it can assemble the full output array. In our case the final task is the division by 2, so once a chunk has gone through the entire task pipeline and been divided by 2, that result (plus perhaps some small overhead) persists in memory until ALL chunks have gone through the same pipeline.

One way to solve this is by pointing the computation at a Zarr Store.

Creating a Dask array much larger than the one in Don's example, we can do a similar elementwise operation:

[screenshot: the lazy array before and after computing it to a Zarr store]
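A rough sketch of the pattern shown in the screenshot (the shape, chunking, variable name "echo_range", and store path "swap.zarr" are my own illustrative choices, not the exact values used there):

    import dask.array
    import xarray as xr

    # A large lazy random array with an elementwise operation applied to it
    big = xr.DataArray(
        dask.array.random.random((10, 100_000, 2_000), chunks=(10, 1_000, 2_000)),
        dims=["ch", "pt", "rs"],
    )
    res = big * 2

    # Stream the computation chunk-by-chunk into a Zarr store instead of
    # materializing the whole result in memory
    res.to_dataset(name="echo_range").to_zarr("swap.zarr", mode="w")

    # Re-open lazily: the task graph now only reads chunks from the Zarr store
    res_from_zarr = xr.open_zarr("swap.zarr")["echo_range"]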

Notice that the initial pre-Zarr-compute lazy-loaded array has a task graph of 3 tasks per chunk, while the post-Zarr-compute lazy-loaded array has a task graph of 1. The 3 tasks are, in order: random sampling, multiplying, and storing the chunk in the Zarr store. The single remaining task just reads from the Zarr store.

The stored Zarr file also matches the size of both lazy-loaded arrays (with some slight compression applied):

[screenshot: on-disk size of the Zarr store compared to the in-memory array sizes]

All of this means that, if we supply compute_Sv with a Zarr store, we can avoid computing Echo Range (and essentially any other array it produces) in memory.

I suspect that this will be very similar to the use_swap option used in open_raw, so the convention will carry over: solving this issue will require that compute_Sv have its own use_swap option.
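For illustration, the user-facing call might end up looking something like the sketch below; the use_swap keyword on compute_Sv is exactly what this issue proposes, so treat it as hypothetical rather than an existing API:

    import echopype as ep

    # Hypothetical: carry open_raw's use_swap convention over to compute_Sv
    ds_Sv = ep.calibrate.compute_Sv(ed_combined, use_swap=True)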

The implementation of compute_Sv with use_swap will have several components, with the details still TBD.

In addition, solving this issue will in turn solve #1212.

ctuguinay commented 3 weeks ago

Assuming lazy-loaded Echodata object groups, the computation of Echo Range is fully lazy, along with every other piece of the calibration object's compute_Sv. So I can run something like this (where ed_combined is lazy-loaded) and expect a lazy-loaded Sv DataArray in ds["Sv"]:

    cal_obj = CalibrateEK60(ed_combined, **kwargs)
    ds = cal_obj._cal_power_samples("Sv")

However, ep.calibrate.compute_Sv is not just a call to this calibration object's _cal_power_samples function. There is a part after it where the code adds attributes, and within that code there is a specific piece that cannot be delayed:

    ds[cal_type].attrs = {
        "long_name": {
            "Sv": "Volume backscattering strength (Sv re 1 m-1)",
            "TS": "Target strength (TS re 1 m^2)",
        }[cal_type],
        "units": "dB",
        "actual_range": [
            round(float(ds[cal_type].min().values), 2),
            round(float(ds[cal_type].max().values), 2),
        ],
    }

Depending on the size of ds["Sv"], this could be a very expensive operation. As it currently stands, one cannot point this part of the computation at a Zarr store since ep.calibrate.compute_Sv hasn't been fully called; computation can only be pointed at a Zarr store if it is lazy-loaded.

Accessing .values force-computes the min/max task graphs. I should instead change this to .data, which returns a Dask array that still carries the min/max task graphs.
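A small standalone sketch of the difference (not echopype code):

    import dask.array
    import xarray as xr

    sv = xr.DataArray(dask.array.random.random((1_000, 1_000), chunks=(100, 100)), name="Sv")

    eager_min = sv.min().values  # forces computation now; returns a NumPy value
    lazy_min = sv.min().data     # stays lazy; a Dask array carrying the reduction task graph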

leewujung commented 3 weeks ago

@ctuguinay : Interesting, but yeah, I agree with you. I guess we can remove the actual_range attribute... In the scenario of a large dataset, I somewhat doubt the usefulness of this attribute.

The other thing to note here is that since we do not follow any convention at the moment for processed data (anything beyond EchoData), we can do whatever is best from the computational/storage point of view.

ctuguinay commented 3 weeks ago

Yeah, it's a bit hard to deal with this since actual_range isn't an actual DataArray; it's part of the attributes. So even if I get the attributes fully lazy-loaded, calling ds["Sv"].compute() will not compute it (and calling ds["Sv"].attrs.compute doesn't work either, since attributes don't have a .compute function).

[screenshot: the lazy attribute left uncomputed after calling ds["Sv"].compute()]

Also, the concept of having a Dask array with a large task graph as one of the attributes is a bit strange. When I think of attributes, I think of small pieces of metadata.

leewujung commented 3 weeks ago

I see. Ok. Let's just remove actual_range for now. It also does not make sense in a data store scenario: once one appends new data, the attribute needs to be updated, and it becomes very convoluted...

ctuguinay commented 3 weeks ago

Sounds good 👍