AlexeyPechnikov / pygmtsar

PyGMTSAR (Python InSAR): Powerful and Accessible Satellite Interferometry
http://insar.dev/
BSD 3-Clause "New" or "Revised" License

[Feature]: Consider using dask single-threaded scheduler in correlation function #86

Open · SteffanDavies opened this issue 8 months ago

SteffanDavies commented 8 months ago

Using Dask's single-threaded scheduler in the correlation function is much faster than looping in Python. The current method can take very long (30+ minutes) for large stacks (4000 pairs); have you experienced this?

Benchmark (Dask, 1 worker, 16 threads, 128 GB RAM):

| Scheduler | 30 pairs | 150 pairs |
| --- | --- | --- |
| distributed | 1min 34s | 7min 33s |
| multiprocessing | 53 s | 4min 9s |
| processes | 49.6 s | 3min 54s |
| single-threaded | 3.7 s | 18.4 s |
| sync | 3.44 s | 17.4 s |
| synchronous | 3.68 s | 17.4 s |
| threading | 7.67 s | 33.2 s |
| threads | 7.73 s | 38.3 s |

```python
import dask
from dask import delayed
import xarray as xr
import numpy as np

# convert pairs (list, array, dataframe) to a 2D numpy array of date strings
pairs, dates = sbas.get_pairs(phase, dates=True)
pairs = pairs[['ref', 'rep']].astype(str).values

assert np.issubdtype(phase.dtype, np.complexfloating), 'ERROR: Phase should be complex-valued data.'
assert not np.issubdtype(intensity.dtype, np.complexfloating), 'ERROR: Intensity cannot be complex-valued data.'

def calculate_corr(pair):
    # correlation = |interferogram| normalized by the geometric mean of the two scene intensities
    date1, date2 = pair
    corr = np.abs(phase.sel(pair=' '.join(pair)) / np.sqrt(intensity.sel(date=date1) * intensity.sel(date=date2)))
    # clip to the valid correlation range [0, 1]
    corr = xr.where(corr < 0, 0, corr)
    corr = xr.where(corr > 1, 1, corr)
    return corr

# convert the loop into a Dask delayed computation, one task per pair
delayed_corrs = [delayed(calculate_corr)(pair) for pair in pairs]

# compute all pairs with the single-threaded scheduler
stack = dask.compute(delayed_corrs, scheduler='single-threaded')[0]

# concatenate the per-pair results into a single correlation stack
corr = xr.concat(stack, dim='pair').rename('correlation')
```
AlexeyPechnikov commented 8 months ago

As I remember, I've shared example notebooks with the recommended solution for this case:

        with dask.config.set(scheduler='single-threaded'):
            ...compute()

Also, applying delayed() to large rasters has a side effect: it prevents resources from being freed as soon as possible. Does your approach work on a system with 4 GB RAM and 4 cores? Or on one with 16 GB RAM and 16 cores? You can try increasing the processing chunk size to greatly reduce the overhead of Dask computations.
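For context, that pattern applied to the correlation computation looks roughly like this; a minimal sketch assuming the `phase`, `intensity`, `pairs`, and `calculate_corr` names from the snippet above, with `phase` and `intensity` backed by Dask (chunked) arrays:

```python
import dask
import xarray as xr

# optionally enlarge the chunks to reduce Dask graph overhead
# (the 2048x2048 size and the 'y'/'x' dimension names are only examples)
# phase = phase.chunk({'y': 2048, 'x': 2048})

# build the correlation stack lazily; nothing is computed yet
corr_lazy = xr.concat([calculate_corr(pair) for pair in pairs], dim='pair').rename('correlation')

# evaluate the whole graph with the single-threaded scheduler, scoped to this computation only
with dask.config.set(scheduler='single-threaded'):
    corr = corr_lazy.compute()
```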

SteffanDavies commented 8 months ago

> As I remember, I've shared example notebooks with the recommended solution for this case:
>
>         with dask.config.set(scheduler='single-threaded'):
>             ...compute()
>
> Also, applying delayed() to large rasters has a side effect: it prevents resources from being freed as soon as possible. Does your approach work on a system with 4 GB RAM and 4 cores? Or on one with 16 GB RAM and 16 cores? You can try increasing the processing chunk size to greatly reduce the overhead of Dask computations.

I don't have access to hardware with less than 32GB of RAM except for a Raspberry Pi in my drawer. That actually sounds like a fun experiment.

AlexeyPechnikov commented 8 months ago

Having a large amount of relatively slow memory is often not an advantage. In my tests, Apple Silicon hosts with 2-4 times less RAM have managed to outperform Intel/AMD hosts. Processing speed can indeed be increased significantly by eliminating Dask chunking, but at the cost of considerable scalability loss. By the way, on Linux I've achieved the best results using different memory allocators, as detailed in the Dask documentation.
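For reference, the allocator tweak described in the Dask distributed documentation can be applied roughly like this; a minimal sketch, where the threshold value, worker count, and memory limit are only examples matching the benchmark above, not PyGMTSAR defaults:

```python
import os

# ask glibc malloc to return freed memory to the OS more aggressively;
# the variable must be set before the worker processes are spawned
os.environ['MALLOC_TRIM_THRESHOLD_'] = '65536'

from dask.distributed import Client
# example local cluster: 1 worker, 16 threads, 128 GB RAM
client = Client(n_workers=1, threads_per_worker=16, memory_limit='128GB')
```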

Additionally, with 1D unwrapping (PSI) there is no need to process a large area all at once. PyGMTSAR's turbulent noise removal is also 1D, and only the stratified phase delay requires 2D processing. When not working in mountainous areas, it's relatively straightforward to divide your area. Alternatively, masking out unused pixels can also expedite the process. There are numerous strategies to enhance processing speed.
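As a rough illustration of the masking idea (a hypothetical sketch; `landmask` and the 0.1 threshold are assumptions for demonstration, not part of the PyGMTSAR API):

```python
# average correlation over all pairs as a simple per-pixel quality measure
corr_mean = corr.mean('pair')

# keep only pixels that are on land and sufficiently coherent;
# everything else becomes NaN and is excluded from further processing
phase_masked = phase.where(landmask & (corr_mean > 0.1))
```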