OSOceanAcoustics / echopype

Enabling interoperability and scalability in ocean sonar data analysis
https://echopype.readthedocs.io/
Apache License 2.0
91 stars 71 forks source link

Failure to perform `compute_Sv` with dask inputs #1212

Closed lsetiawan closed 1 day ago

lsetiawan commented 8 months ago

Overview

Currently, compute_Sv does not work with a merged Sv file (raw_converted_combined/2013/x0097_2_wt_20130824_232034_f0031.zarr) when opened as dask arrays. Worker memory get's overwhelmed and even spilling to disk by dask won't cut it. This is the non descriptive dask traceback:

2023-11-06 17:00:16,536 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 11.37 GiB -- Worker memory limit: 16.00 GiB
2023-11-06 17:00:16,878 - distributed.worker.memory - WARNING - Worker is at 80% memory usage. Pausing worker.  Process memory: 12.85 GiB -- Worker memory limit: 16.00 GiB
2023-11-06 17:00:16,922 - distributed.worker.memory - WARNING - Worker is at 79% memory usage. Resuming worker. Process memory: 12.70 GiB -- Worker memory limit: 16.00 GiB
...
RuntimeError: Cannot remove replica of ('transpose-4cd487e9eb8037d494b448b470058650', 0, 0, 0) while ('rechunk-split-rechunk-merge-78880722c4e511b3105b467cdecc2000', 0, 22, 2) in state 'executing'.
RuntimeError: Cannot remove replica of ('transpose-4cd487e9eb8037d494b448b470058650', 0, 0, 0) while ('rechunk-split-rechunk-merge-78880722c4e511b3105b467cdecc2000', 0, 22, 2) in state 'executing'.

The above traceback is not very useful, so I tried to perform compute_Sv without using dask and that seemed to work and I was able to profile the memory usage. It seems like there are a few spots that huge spikes in memory occurs.

Memory Line by Line Profile
``` Filename: /Users/lsetiawan/Repos/SSEC/echopype/echopype/calibrate/calibrate_ek.py Line # Mem usage Increment Occurrences Line Contents ============================================================= 137 222.6 MiB 222.6 MiB 1 def __init__(self, echodata: EchoData, env_params, cal_params, ecs_file, **kwargs): 138 222.6 MiB 0.0 MiB 1 super().__init__(echodata, env_params, cal_params, ecs_file) 139 140 # Set sonar_type and waveform/encode mode 141 222.6 MiB 0.0 MiB 1 self.sonar_type = "EK60" 142 143 # Set cal type 144 222.6 MiB 0.0 MiB 1 self.waveform_mode = "CW" 145 222.6 MiB 0.0 MiB 1 self.encode_mode = "power" 146 147 # Get the right ed_beam_group for CW power samples 148 222.6 MiB 0.0 MiB 2 self.ed_beam_group = retrieve_correct_beam_group( 149 222.6 MiB 0.0 MiB 1 echodata=self.echodata, waveform_mode=self.waveform_mode, encode_mode=self.encode_mode 150 ) 151 152 # Set the channels to calibrate 153 # For EK60 this is all channels 154 222.6 MiB 0.0 MiB 1 self.chan_sel = self.echodata[self.ed_beam_group]["channel"] 155 222.6 MiB 0.0 MiB 1 beam = self.echodata[self.ed_beam_group] 156 157 # Convert env_params and cal_params if self.ecs_file exists 158 # Note a warning if thrown out in CalibrateBase.__init__ 159 # to let user know cal_params and env_params are ignored if ecs_file is provided 160 222.6 MiB 0.0 MiB 1 if self.ecs_file is not None: # also means self.ecs_dict != {} 161 ds_env_tmp, ds_cal_tmp, _ = ecs_ev2ep(self.ecs_dict, "EK60") 162 self.cal_params = ecs_ds2dict( 163 conform_channel_order(ds_cal_tmp, beam["frequency_nominal"]) 164 ) 165 self.env_params = ecs_ds2dict( 166 conform_channel_order(ds_env_tmp, beam["frequency_nominal"]) 167 ) 168 169 # Regardless of the source cal and env params, 170 # go through the same sanitization and organization process 171 229.0 MiB 6.4 MiB 2 self.env_params = get_env_params_EK( 172 222.6 MiB 0.0 MiB 1 sonar_type=self.sonar_type, 173 222.6 MiB 0.0 MiB 1 beam=self.echodata[self.ed_beam_group], 174 222.6 MiB 0.0 MiB 1 env=self.echodata["Environment"], 175 222.6 MiB 0.0 MiB 1 user_dict=self.env_params, 176 ) 177 280.5 MiB 51.5 MiB 2 self.cal_params = get_cal_params_EK( 178 229.0 MiB 0.0 MiB 1 waveform_mode=self.waveform_mode, 179 229.0 MiB 0.0 MiB 1 freq_center=beam["frequency_nominal"], 180 229.0 MiB 0.0 MiB 1 beam=beam, 181 229.0 MiB 0.0 MiB 1 vend=self.echodata["Vendor_specific"], 182 229.0 MiB 0.0 MiB 1 user_dict=self.cal_params, 183 229.0 MiB 0.0 MiB 1 sonar_type=self.sonar_type, 184 ) 185 186 # Compute range 187 929.8 MiB 929.8 MiB 1 self.compute_echo_range() Filename: /Users/lsetiawan/Repos/SSEC/echopype/echopype/calibrate/range.py Line # Mem usage Increment Occurrences Line Contents ============================================================= 99 280.5 MiB 280.5 MiB 1 def compute_range_EK( 100 echodata: EchoData, 101 env_params: Dict, 102 waveform_mode: str = "CW", 103 encode_mode: str = "power", 104 chan_sel=None, 105 ): 106 """ 107 Computes the range (``echo_range``) of EK backscatter data in meters. 108 109 Parameters 110 ---------- 111 echodata : EchoData 112 An EchoData object holding data from an AZFP echosounder 113 env_params : dict 114 A dictionary holding environmental parameters needed for computing range 115 See echopype.calibrate.env_params.get_env_params_EK 116 waveform_mode : {"CW", "BB"} 117 Type of transmit waveform. 118 Required only for data from the EK80 echosounder. 119 120 - `"CW"` for narrowband transmission, 121 returned echoes recorded either as complex or power/angle samples 122 - `"BB"` for broadband transmission, 123 returned echoes recorded as complex samples 124 125 encode_mode : {"complex", "power"} 126 Type of encoded data format. 127 Required only for data from the EK80 echosounder. 128 129 - `"complex"` for complex samples 130 - `"power"` for power/angle samples, only allowed when 131 the echosounder is configured for narrowband transmission 132 133 Returns 134 ------- 135 xr.DataArray 136 The range (``echo_range``) of the data in meters. 137 138 Notes 139 ----- 140 The EK80 echosounder can be configured to transmit 141 either broadband (``waveform_mode="BB"``) or narrowband (``waveform_mode="CW"``) signals. 142 When transmitting in broadband mode, the returned echoes must be 143 encoded as complex samples (``encode_mode="complex"``). 144 When transmitting in narrowband mode, the returned echoes can be encoded 145 either as complex samples (``encode_mode="complex"``) 146 or as power/angle combinations (``encode_mode="power"``) in a format 147 similar to those recorded by EK60 echosounders (the "power/angle" format). 148 """ 149 # sound_speed should exist already 150 280.5 MiB 0.0 MiB 1 if echodata.sonar_model in ("EK60", "ES70"): 151 280.5 MiB 0.0 MiB 1 ek_str = "EK60" 152 elif echodata.sonar_model in ("EK80", "ES80", "EA640"): 153 ek_str = "EK80" 154 else: 155 raise ValueError("The specified sonar_model is not supported!") 156 157 280.5 MiB 0.0 MiB 1 if "sound_speed" not in env_params: 158 raise RuntimeError( 159 "sounds_speed not included in env_params, " 160 f"use echopype.calibrate.env_params.get_env_params_{ek_str}() to compute env_params " 161 ) 162 else: 163 280.5 MiB 0.0 MiB 1 sound_speed = env_params["sound_speed"] 164 165 # Get the right Sonar/Beam_groupX group according to encode_mode 166 280.5 MiB 0.0 MiB 1 ed_beam_group = retrieve_correct_beam_group(echodata, waveform_mode, encode_mode) 167 280.5 MiB 0.0 MiB 1 beam = ( 168 280.5 MiB 0.0 MiB 1 echodata[ed_beam_group] 169 280.5 MiB 0.0 MiB 1 if chan_sel is None 170 else echodata[ed_beam_group].sel(channel=chan_sel) 171 ) 172 173 # Harmonize sound_speed time1 and Beam_groupX ping_time 174 280.5 MiB 0.0 MiB 2 sound_speed = harmonize_env_param_time( 175 280.5 MiB 0.0 MiB 1 p=sound_speed, 176 280.5 MiB 0.0 MiB 1 ping_time=beam.ping_time, 177 ) 178 179 # Range in meters, not modified for TVG compensation 180 1820.9 MiB 1540.4 MiB 1 range_meter = beam["range_sample"] * beam["sample_interval"] * sound_speed / 2 181 # make order of dims conform with the order of backscatter data 182 1822.0 MiB 1.0 MiB 1 range_meter = range_meter.transpose(*DIMENSION_ORDER) 183 184 # set entries with NaN backscatter data to NaN 185 1822.1 MiB 0.2 MiB 1 if "beam" in beam["backscatter_r"].dims: 186 # Drop beam because echo_range should not have a beam dimension 187 valid_idx = ~beam["backscatter_r"].isel(beam=0).drop("beam").isnull() 188 else: 189 6415.5 MiB 4593.4 MiB 1 valid_idx = ~beam["backscatter_r"].isnull() 190 1511.1 MiB -4904.3 MiB 1 range_meter = range_meter.where(valid_idx) 191 192 # remove time1 if exists as a coordinate 193 1511.5 MiB 0.3 MiB 1 if "time1" in range_meter.coords: 194 range_meter = range_meter.drop("time1") 195 196 # add name to facilitate xr.merge 197 1511.5 MiB 0.0 MiB 1 range_meter.name = "echo_range" 198 199 1511.5 MiB 0.0 MiB 1 return range_meter Filename: /Users/lsetiawan/Repos/SSEC/echopype/echopype/calibrate/calibrate_ek.py Line # Mem usage Increment Occurrences Line Contents ============================================================= 50 930.2 MiB 930.2 MiB 1 def _cal_power_samples(self, cal_type: str) -> xr.Dataset: 51 """Calibrate power data from EK60 and EK80. 52 53 Parameters 54 ---------- 55 cal_type: str 56 'Sv' for calculating volume backscattering strength, or 57 'TS' for calculating target strength 58 59 Returns 60 ------- 61 xr.Dataset 62 The calibrated dataset containing Sv or TS 63 """ 64 # Select source of backscatter data 65 933.4 MiB 3.1 MiB 1 beam = self.echodata[self.ed_beam_group] 66 67 # Derived params 68 946.3 MiB 12.9 MiB 1 wavelength = self.env_params["sound_speed"] / beam["frequency_nominal"] # wavelength 69 # range_meter = self.range_meter 70 71 # TVG compensation with modified range 72 946.3 MiB 0.0 MiB 1 sound_speed = self.env_params["sound_speed"] 73 946.3 MiB 0.0 MiB 1 absorption = self.env_params["sound_absorption"] 74 7130.0 MiB 7130.0 MiB 2 tvg_mod_range = range_mod_TVG_EK( 75 946.3 MiB 0.0 MiB 1 self.echodata, self.ed_beam_group, self.range_meter, sound_speed 76 ) 77 912.2 MiB -6217.8 MiB 1 tvg_mod_range = tvg_mod_range.where(tvg_mod_range > 0, np.nan) 78 79 652.9 MiB -259.4 MiB 1 spreading_loss = 20 * np.log10(tvg_mod_range) 80 6107.8 MiB 5454.9 MiB 1 absorption_loss = 2 * absorption * tvg_mod_range 81 82 6107.9 MiB 0.1 MiB 1 if cal_type == "Sv": 83 # Calc gain 84 6083.9 MiB -47.3 MiB 1 CSv = ( 85 6131.2 MiB -25.9 MiB 4 10 * np.log10(beam["transmit_power"]) 86 6125.3 MiB 2.0 MiB 1 + 2 * self.cal_params["gain_correction"] 87 6129.4 MiB 0.0 MiB 1 + self.cal_params["equivalent_beam_angle"] 88 6131.2 MiB -29.2 MiB 2 + 10 89 6131.2 MiB -32.5 MiB 2 * np.log10( 90 6138.5 MiB -15.1 MiB 4 wavelength**2 91 6134.9 MiB 0.0 MiB 1 * beam["transmit_duration_nominal"] 92 6138.5 MiB 0.0 MiB 1 * self.env_params["sound_speed"] 93 6116.4 MiB -22.1 MiB 1 / (32 * np.pi**2) 94 ) 95 ) 96 97 # Calibration and echo integration 98 704.2 MiB -5379.7 MiB 1 out = ( 99 6083.9 MiB -5384.3 MiB 5 beam["backscatter_r"] # has beam dim 100 6083.9 MiB 0.0 MiB 1 + spreading_loss 101 5978.0 MiB -105.9 MiB 1 + absorption_loss 102 2955.6 MiB -3128.4 MiB 1 - CSv 103 725.3 MiB -5358.6 MiB 1 - 2 * self.cal_params["sa_correction"] 104 ) 105 704.2 MiB 0.0 MiB 1 out.name = "Sv" 106 107 elif cal_type == "TS": 108 # Calc gain 109 CSp = ( 110 10 * np.log10(beam["transmit_power"]) 111 + 2 * self.cal_params["gain_correction"] 112 + 10 * np.log10(wavelength**2 / (16 * np.pi**2)) 113 ) 114 115 # Calibration and echo integration 116 out = beam["backscatter_r"] + spreading_loss * 2 + absorption_loss - CSp 117 out.name = "TS" 118 119 # Attach calculated range (with units meter) into data set 120 704.9 MiB 0.7 MiB 1 out = out.to_dataset() 121 708.0 MiB 3.1 MiB 1 out = out.merge(self.range_meter) 122 123 # Add frequency_nominal to data set 124 708.5 MiB 0.5 MiB 1 out["frequency_nominal"] = beam["frequency_nominal"] 125 126 # Add env and cal parameters 127 710.0 MiB 1.5 MiB 1 out = self._add_params_to_output(out) 128 129 # Remove time1 if exist as a coordinate 130 710.0 MiB 0.0 MiB 1 if "time1" in out.coords: 131 out = out.drop("time1") 132 133 710.0 MiB 0.0 MiB 1 return out Filename: /Users/lsetiawan/Repos/SSEC/echopype/echopype/calibrate/range.py Line # Mem usage Increment Occurrences Line Contents ============================================================= 202 946.4 MiB 946.4 MiB 1 def range_mod_TVG_EK( 203 echodata: EchoData, ed_beam_group: str, range_meter: xr.DataArray, sound_speed: xr.DataArray 204 ) -> xr.DataArray: 205 """ 206 Modify range for TVG calculation. 207 208 TVG correction factor changes depending when the echo recording starts 209 wrt when the transmit signal is sent out. 210 This depends on whether it is Ex60 or Ex80 style hardware 211 ref: https://github.com/CI-CMG/pyEcholab/blob/RHT-EK80-Svf/echolab2/instruments/EK80.py#L4297-L4308 # noqa 212 """ 213 214 946.4 MiB 0.0 MiB 2 def mod_Ex60(): 215 # 2-sample shift in the beginning 216 955.3 MiB 8.9 MiB 1 return 2 * beam["sample_interval"] * sound_speed / 2 # [frequency x range_sample] 217 218 946.4 MiB 0.0 MiB 1 def mod_Ex80(): 219 mod = sound_speed * beam["transmit_duration_nominal"] / 4 220 if isinstance(mod, xr.DataArray) and "time1" in mod.coords: 221 mod = mod.squeeze().drop("time1") 222 return mod 223 224 946.4 MiB 0.0 MiB 1 beam = echodata[ed_beam_group] 225 946.4 MiB 0.0 MiB 1 vend = echodata["Vendor_specific"] 226 227 # If EK60 228 946.4 MiB 0.0 MiB 1 if echodata.sonar_model in ["EK60", "ES70"]: 229 7129.7 MiB 6174.4 MiB 1 range_meter = range_meter - mod_Ex60() 230 231 # If EK80: 232 # - compute range first assuming all channels have Ex80 style hardware 233 # - change range for channels with Ex60 style hardware (GPT) 234 elif echodata.sonar_model in ["EK80", "ES80", "EA640"]: 235 range_meter = range_meter - mod_Ex80() 236 237 # Change range for all channels with GPT 238 if "GPT" in vend["transceiver_type"]: 239 ch_GPT = vend["transceiver_type"] == "GPT" 240 range_meter.loc[dict(channel=ch_GPT)] = range_meter.sel(channel=ch_GPT) - mod_Ex60() 241 242 7129.8 MiB 0.1 MiB 1 return range_meter ```

Analysis

echopype/calibrate/calibrate_ek.py::init

187    929.8 MiB    929.8 MiB           1           self.compute_echo_range()

We can see that during the instantiation of the EK60 Calibrator Class, compute_echo_range has the most increase in memory usage. In this instance, it's 929.8 MiB.

echopype/calibrate/range.py::compute_range_EK

180   1820.9 MiB   1540.4 MiB           1       range_meter = beam["range_sample"] * beam["sample_interval"] * sound_speed / 2

Digging into the compute_echo_range method of the EK60 Calibrator Class, compute_range_EK is actually being called. From the result above, the creation of range_meter caused quite a spike in memory usage of 1540.4 MiB.

A broadcasting operation is happening here as the shapes are:

189   6415.5 MiB   4593.4 MiB           1           valid_idx = ~beam["backscatter_r"].isnull()

Additionally, the operation of getting non-null values spiked the memory usage again! This time, about 4X increase.

190   1511.1 MiB  -4904.3 MiB           1       range_meter = range_meter.where(valid_idx)

This is then cleaned up by a where operation on the range_meter variable calculated above.

echopype/calibrate/calibrate_ek.py::_cal_power_samples

74   7130.0 MiB   7130.0 MiB           2           tvg_mod_range = range_mod_TVG_EK(
75    946.3 MiB      0.0 MiB           1               self.echodata, self.ed_beam_group, self.range_meter, sound_speed
76                                                 )
...
80   6107.8 MiB   5454.9 MiB           1           absorption_loss = 2 * absorption * tvg_mod_range

Once compute_range is finished, the process continues to calling _cal_power_samples to perform the Sv computation. As seen in the snippet above, range_mod_TVG_EK increased the memory usage 7130.0 MiB... that's around 7GB! Then another spike of 5454.9 MiB happens again during the computation above.

echopype/calibrate/range.py::range_mod_TVG_EK

229   7129.7 MiB   6174.4 MiB           1           range_meter = range_meter - mod_Ex60()

Digging into range_mod_TVG_EK function, we can see that the substraction b/w range_meter and mod_Ex60 function result blows up the memory here.

lsetiawan commented 8 months ago

Investigation

After some initial investigate, I found that the first computation of beam["range_sample"] * beam["sample_interval"] * sound_speed / 2 is really problematic. We can re-create a very simple example with just a few scaffolding:

from distributed import Client
import dask.array
import xarray as xr
import numpy as np

# Use 2 workers so each gets more memory
client = Client(n_workers=2)

# Same shape as the actual data
ch, pt, rs = 5, 46187, 3957
sample_interval = xr.DataArray(dask.array.ones((ch, pt)), name="sample_interval", coords={"ch": np.arange(ch), "pt": np.arange(pt)})
sound_speed = xr.DataArray(dask.array.ones((ch, pt)) * 1053, name="sound_speed", coords={"ch": np.arange(ch), "pt": np.arange(pt)})
range_sample = xr.DataArray(dask.array.from_array(np.arange(rs)), name="range_sample", coords={"rs": np.arange(rs)})
res = range_sample * sample_interval * sound_speed / 2
res.compute()

The above code WILL FAIL with similar error and memory spilling and blowup seen in the dask dashboard:

2023-11-06 17:17:59,276 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 11.84 GiB -- Worker memory limit: 16.00 GiB
2023-11-06 17:17:59,495 - distributed.worker.memory - WARNING - Worker is at 83% memory usage. Pausing worker.  Process memory: 13.36 GiB -- Worker memory limit: 16.00 GiB
2023-11-06 17:17:59,663 - distributed.worker.memory - WARNING - Worker is at 43% memory usage. Resuming worker. Process memory: 6.91 GiB -- Worker memory limit: 16.00 GiB

If we perform the same snippet of code with the following shape:

ch, pt, rs = 5, 20000, 2000

the computation will finish just fine.

ctuguinay commented 1 month ago

So the most obvious problem here comes from range_sample multiplied by either sound_speed or sample_interval. This creates an array that has memory approximately 3975 times bigger than that of both sample_interval and sound_speed.

Also, I suspect that there's some excessive broadcasting going on that may overload worker memory. I'm still looking into this.

ctuguinay commented 1 month ago

Using two workers doesn't seem to be helpful, since most of the memory will stick to just one worker here

Chunking range_sample into two equal chunks crashed my 8 CPU 30 GiB RAM VM

leewujung commented 1 month ago

I would suggest chunking along ping_time. range_sample is typically shorter for large dataset, since the depth setting usually does not get fiddled much, but ping_time can continue to grow.

Maybe look at arithmetic things that could simplify the ops would be helpful? like is it possible to avoid the expansion, etc.

ctuguinay commented 1 month ago

Yeah, I tried just chunking on ping_time, and that seemed to scale well, speed-wise, but there's a large subset of task outputs that are stored in RAM and this builds up over time. So it seems as though something towards the end of the task graph needs a lot of the task outputs to compute, meaning there's a bottleneck.

And yeah, I can also look for stuff that avoids expansion altogether.

leewujung commented 1 month ago

Yeah, separating out the steps and looking at the graphs built for each step would likely be useful for figuring out exactly where the bottleneck is -- though I think you're already doing it! :)

ctuguinay commented 1 month ago

I'll put the (potential) fix for this issue into a separate issue since the idea is extensive enough that it needs its own separate thing. To summarize, this issue is caused by the accumulation of task products that build up due to Dask waiting for every chunk to compute before setting the end array of res.compute(). To fix this accumulation problem, we can aim the computation at a Zarr store.

ctuguinay commented 1 day ago

Closed by merging of #1331