OSOceanAcoustics / echopype

Enabling interoperability and scalability in ocean sonar data analysis
https://echopype.readthedocs.io/
Apache License 2.0

Determining which Non-Uniform Depth Upsampling is Faster #1322

Closed: ctuguinay closed this issue 1 month ago

ctuguinay commented 1 month ago

Related to the merging of #1314, the upsampling operation for impulse noise removal, which assumes non-uniform depth, has two possible implementations that were put forth during revisions of that PR:

Assignment via Reindex:

This option is what is currently in echopype.

    # Assign a depth bin index to each Sv depth value
    depth_bin_assignment = xr.DataArray(
        np.digitize(
            ds_Sv["depth"], [interval.left for interval in downsampled_Sv["depth_bins"].data]
        ),
        dims=["channel", "ping_time", "range_sample"],
    )

    # Initialize upsampled Sv
    upsampled_Sv = ds_Sv["Sv"].copy()

    # Iterate through all channels
    for channel_index in range(depth_bin_assignment.sizes["channel"]):
        # Iterate through all ping times
        for ping_time_index in range(depth_bin_assignment.sizes["ping_time"]):
            # Get the unique depth bin values along a single ping from the digitized
            # depth array:
            # NOTE: `return_index` gives each unique value's first position, which in
            # turn corresponds to the first range sample contained in each depth bin.
            _, unique_range_sample_indices = np.unique(
                depth_bin_assignment.isel(channel=channel_index, ping_time=ping_time_index).data,
                return_index=True,
            )

            # Select a single ping downsampled Sv vector
            subset_downsampled_Sv = downsampled_Sv.isel(
                channel=channel_index, ping_time=ping_time_index
            )

            # Substitute the depth bin coordinate in the downsampled Sv with the range
            # sample value corresponding to the first element (smallest depth value) of
            # each depth bin, and rename the `depth_bins` coordinate to `range_sample`.
            subset_downsampled_Sv = subset_downsampled_Sv.assign_coords(
                {"depth_bins": unique_range_sample_indices}
            ).rename({"depth_bins": "range_sample"})

            # Upsample via `reindex` with `ffill`
            upsampled_Sv[dict(channel=channel_index, ping_time=ping_time_index)] = (
                subset_downsampled_Sv.reindex(
                    {"range_sample": ds_Sv["range_sample"]}, method="ffill"
                )
            )
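
For concreteness, here is a minimal, self-contained sketch of the digitize / unique / reindex-`ffill` pattern on a single toy ping (all depth, bin, and Sv values below are made up for illustration and are not echopype data):

    import numpy as np
    import pandas as pd
    import xarray as xr

    # Toy single-ping inputs: 6 depth samples falling into 3 depth bins
    depth = np.array([0.5, 1.0, 1.5, 2.0, 2.5, 3.0])
    bins = [pd.Interval(0.0, 1.2), pd.Interval(1.2, 2.2), pd.Interval(2.2, 3.2)]
    downsampled = xr.DataArray(
        [10.0, 20.0, 30.0], dims=["depth_bins"], coords={"depth_bins": bins}
    )

    # Assign each depth sample to a bin (`np.digitize` output is 1-based here)
    bin_assignment = np.digitize(depth, [interval.left for interval in bins])

    # Index of the first range sample falling into each bin
    _, unique_range_sample_indices = np.unique(bin_assignment, return_index=True)

    # Re-coordinate by first range sample index, then forward-fill onto the full grid
    upsampled = (
        downsampled.assign_coords(depth_bins=unique_range_sample_indices)
        .rename({"depth_bins": "range_sample"})
        .reindex(range_sample=np.arange(len(depth)), method="ffill")
    )
    print(upsampled.values)  # [10. 10. 20. 20. 30. 30.]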

Assignment via `xr.apply_ufunc` using an upsample mapping:

def _upsample_using_mapping(downsampled_Sv: np.ndarray, upsample_mapping: np.ndarray) -> np.ndarray:
    """Upsample using downsampled Sv and upsample mapping."""
    upsampled_Sv = np.zeros_like(upsample_mapping, dtype=np.float64)
    for upsampled_index, downsampled_index in enumerate(upsample_mapping):
        upsampled_Sv[upsampled_index] = downsampled_Sv[downsampled_index]
    return upsampled_Sv

........
from `downsample_upsample_along_depth`
........
    # Create upsample mapping
    left_bin_values = [interval.left for interval in downsampled_Sv["depth_bins"].data]
    upsample_mapping = xr.DataArray(
        # `np.digitize` assigns values falling in the first bin to index 1 and values
        # prior to the first bin to index 0. Since we want an index-to-index mapping
        # between depth bins and the original Sv `depth`, this default behavior is
        # offset to the left, i.e., a -1 is applied to each value in the digitized
        # array.
        # This subtraction will never result in -1 since `depth` will never contain
        # any values prior to the first bin.
        np.digitize(ds_Sv["depth"], left_bin_values) - 1,
        dims=["channel", "ping_time", "range_sample"],
    )

    # Upsample the downsampled Sv
    upsampled_Sv = xr.apply_ufunc(
        _upsample_using_mapping,
        # Need to compute arrays since indexing fails when delayed:
        downsampled_Sv.compute(),
        upsample_mapping.compute(),
        input_core_dims=[["depth_bins"], ["range_sample"]],
        output_core_dims=[["range_sample"]],
        exclude_dims=set(["depth_bins", "range_sample"]),
        dask="parallelized",
        vectorize=True,
        output_dtypes=[np.float64],
    )
    # Set range sample coords
    upsampled_Sv = upsampled_Sv.assign_coords(range_sample=ds_Sv["range_sample"])
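
Analogously, a minimal sketch of the mapping approach on the same toy values (again made up for illustration). Note that the per-element loop in `_upsample_using_mapping` is equivalent to NumPy fancy indexing:

    import numpy as np

    depth = np.array([0.5, 1.0, 1.5, 2.0, 2.5, 3.0])
    left_bin_values = [0.0, 1.2, 2.2]
    downsampled = np.array([10.0, 20.0, 30.0])

    # Shift digitize's 1-based output to a 0-based bin index per depth sample
    upsample_mapping = np.digitize(depth, left_bin_values) - 1  # [0 0 1 1 2 2]

    # Equivalent to the per-element loop in `_upsample_using_mapping`
    upsampled = downsampled[upsample_mapping]
    print(upsampled)  # [10. 10. 20. 20. 30. 30.]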

This issue is to determine which of these is faster. Potentially, the assignment via `reindex` could also be done inside `apply_ufunc` using pandas' `reindex_like`; if that works for the 1st option, then the 1st option would most likely beat the 2nd option.
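
As a hedged sketch of that idea (the helper name `_reindex_ffill`, its argument layout, and the toy values are assumptions for illustration; wiring it through `apply_ufunc` over `channel`/`ping_time` is omitted), the per-ping kernel could look like:

    import numpy as np
    import pandas as pd

    def _reindex_ffill(
        downsampled: np.ndarray, first_sample_idx: np.ndarray, num_samples: int
    ) -> np.ndarray:
        """Forward-fill a downsampled ping onto `num_samples` range samples."""
        return (
            pd.Series(downsampled, index=first_sample_idx)
            .reindex(np.arange(num_samples), method="ffill")
            .to_numpy()
        )

    print(_reindex_ffill(np.array([10.0, 20.0, 30.0]), np.array([0, 2, 4]), 6))
    # [10. 10. 20. 20. 30. 30.]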

ctuguinay commented 1 month ago

Closing this because it is a non-issue. I was thinking about this over the weekend, and regardless of which one is faster, we should build upon the option that is the 'most sound'. There are 3 problems I already see with the second option:

  1. The second option must use `apply_ufunc` with `exclude_dims`, meaning broadcasting and alignment cannot be applied along those dimensions, which is one of the ways that `apply_ufunc` makes operations fast (and potentially take up less RAM).
  2. Single-item assignment via a for-loop is incredibly slow when Dask-delayed, since each assignment is a task added to the task graph, and each task has its own overhead.
  3. Knowing that the upsample mapping has repeating values, why should we use for-loop assignment when we can instead use a method (e.g. reindexing with `ffill`) that exploits this repeating property? For-loop assignment would be more helpful if we didn't know that the assignment creates repeating values (see the sketch after this list).
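
To illustrate point 3 with toy arrays (a sketch, not echopype code): because the mapping is non-decreasing with runs of repeats, placing each downsampled value at its first occurrence and forward-filling reproduces the loop's output in a few vectorized NumPy passes:

    import numpy as np

    upsample_mapping = np.array([0, 0, 1, 1, 2, 2])  # non-decreasing, repeating
    downsampled = np.array([10.0, 20.0, 30.0])

    # Option 2's per-element loop: one Python-level step per output sample
    out_loop = np.empty(len(upsample_mapping), dtype=np.float64)
    for i, j in enumerate(upsample_mapping):
        out_loop[i] = downsampled[j]

    # Exploiting the repeats: seed first occurrences, then forward-fill the gaps
    first_occurrence = np.unique(upsample_mapping, return_index=True)[1]  # [0 2 4]
    out_ffill = np.full(len(upsample_mapping), np.nan)
    out_ffill[first_occurrence] = downsampled
    valid = ~np.isnan(out_ffill)
    out_ffill = out_ffill[np.maximum.accumulate(np.where(valid, np.arange(len(valid)), 0))]

    assert np.array_equal(out_loop, out_ffill)  # both are [10. 10. 20. 20. 30. 30.]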