OSOceanAcoustics / echopype

Enabling interoperability and scalability in ocean sonar data analysis
https://echopype.readthedocs.io/
Apache License 2.0

`ep.open_raw` accumulates unmanaged memory with Dask cluster #1366

Open · ctuguinay opened this issue 4 months ago

ctuguinay commented 4 months ago

This issue tracks the accumulation of unmanaged memory when `ep.open_raw` is used with a Dask cluster. It was observed in the Echodataflow "Open Raw -> Sv" flow during the earlier stages of the Shimada ship-to-cloud pipeline.

There are two ways to resolve this: restart or close the cluster, or avoid using a Dask cluster altogether.
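A minimal sketch of the first workaround, assuming the files can be processed in independent batches: gather one batch of results, then restart the workers before submitting the next, so whatever memory the old worker processes retained is released along with them. `batched` and `run_in_batches` are hypothetical helpers, not part of echopype or Dask, and the batch size is an arbitrary tuning knob.

```python
from typing import Callable, Iterator, List, Optional, Sequence, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def batched(items: Sequence[T], batch_size: int) -> Iterator[Sequence[T]]:
    """Yield consecutive slices of at most `batch_size` items."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


def run_in_batches(
    items: Sequence[T],
    process_batch: Callable[[Sequence[T]], List[R]],
    batch_size: int,
    restart: Optional[Callable[[], object]] = None,
) -> List[R]:
    """Process items batch by batch, calling `restart` between batches.

    Hypothetical helper: with a Dask client, `process_batch` would submit
    and gather one batch of futures, and `restart` would be `client.restart`.
    """
    results: List[R] = []
    batches = list(batched(items, batch_size))
    for i, batch in enumerate(batches):
        # Results are collected before any restart, since restarting
        # the workers discards everything they hold.
        results.extend(process_batch(batch))
        # Restart only between batches, not after the last one.
        if restart is not None and i < len(batches) - 1:
            restart()
    return results
```

With the reproducer below, `process_batch` could submit `open_and_save` for each file in the batch and return `client.gather(...)` on those futures, with `restart=client.restart`. Passing `restart=None` runs every batch on the same workers, which is useful as a baseline when comparing memory use.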

Code to replicate the issue:

```python
import os

import echopype as ep
from dask.distributed import Client

# `transect_file_paths` (S3 keys of the EK60 .RAW files, without the
# "s3://" prefix) and `echodata_zarr_path` (the output Zarr location)
# are not shown here.

# Use maximum number of CPUs for Dask Client
client = Client(n_workers=os.cpu_count())

# Open a raw file and save the resulting EchoData object to Zarr
def open_and_save(raw_file, sonar_model, use_swap, save_path, storage_options):
    try:
        ed = ep.open_raw(
            raw_file=f's3://{raw_file}',
            sonar_model=sonar_model,
            use_swap=use_swap,
            storage_options=storage_options,
        )
        ed.to_zarr(save_path, overwrite=True, compute=True)
    except Exception as e:
        print(f"Error processing {raw_file}: {e}")

# Parse EK60 `.RAW` files and save them to Zarr stores
open_and_save_futures = []
for raw_file_url in transect_file_paths:
    open_and_save_future = client.submit(
        open_and_save,
        raw_file=raw_file_url,
        sonar_model='ek60',
        use_swap=True,
        save_path=echodata_zarr_path,
        storage_options={'anon': True}
    )
    open_and_save_futures.append(open_and_save_future)
open_and_save_futures = client.gather(open_and_save_futures)
```
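To quantify the accumulation rather than reading it off the dashboard, one option is to sample each worker's resident memory before and after the `client.gather(...)` call. The sketch below is Linux-only (it reads `/proc/self/statm`), and `current_rss_mib` is a hypothetical helper; `client.run` executes a function on every worker and returns a dict keyed by worker address.

```python
import os


def current_rss_mib() -> float:
    """Return this process's current resident set size in MiB.

    Linux-only sketch: the second field of /proc/self/statm is the
    number of resident pages, which is scaled by the page size.
    """
    with open("/proc/self/statm") as f:
        resident_pages = int(f.read().split()[1])
    page_size = os.sysconf("SC_PAGE_SIZE")
    return resident_pages * page_size / 2**20
```

For example, comparing `client.run(current_rss_mib)` before the futures are submitted with the same call after they have been gathered shows whether per-worker memory stays high once the tasks have finished.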
leewujung commented 4 months ago

This is interesting. I was just running a notebook on my local machine and did not observe the accumulation during `open_raw`, but I did observe it during `combine_echodata`. What's really interesting is that the accumulated unmanaged memory was later reclaimed while downstream tasks were running, so the notebook still ran fine to the end. I also wonder whether the Dask version matters.
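One way to test whether this unmanaged memory is freed heap that glibc has not yet returned to the operating system (a common cause of unmanaged memory described in Dask's worker memory documentation, not something established in this thread) is to ask each worker to trim its heap. A minimal sketch, assuming Linux with glibc; `trim_memory` is a hypothetical helper that returns -1 where `malloc_trim` is unavailable:

```python
import ctypes
import ctypes.util


def trim_memory() -> int:
    """Ask glibc to return free heap memory to the OS.

    Returns malloc_trim's result (1 if memory was released, 0 otherwise),
    or -1 if malloc_trim is not available (e.g. non-glibc platforms).
    """
    libc_name = ctypes.util.find_library("c")
    if libc_name is None:
        return -1
    try:
        libc = ctypes.CDLL(libc_name)
        malloc_trim = libc.malloc_trim  # missing symbol -> AttributeError
    except (OSError, AttributeError):
        return -1
    malloc_trim.argtypes = [ctypes.c_size_t]
    malloc_trim.restype = ctypes.c_int
    return malloc_trim(0)
```

Running `client.run(trim_memory)` after the `open_raw` or `combine_echodata` stage and then checking worker memory again would indicate whether the accumulation is releasable; a large drop would be consistent with the later reclamation described above, while no change suggests the memory is still referenced on the workers.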