AlexeyPechnikov / pygmtsar

PyGMTSAR (Python InSAR): Powerful and Accessible Satellite Interferometry
http://insar.dev/

[Bug]: File path issues using Dask Distributed system in sbas.geocode() #124

Open · SteffanDavies opened this issue 6 months ago

SteffanDavies commented 6 months ago

I am attempting to set up a distributed network of workers for Dask computation.

Host A shares an NFS volume with Host B; the PyGMTSAR project is located on this volume.

When running sbas.compute_geocode(1), it looks for the file at '/home/steffan/raw_sarez2017_desc/trans_1.grd'; however, this file is actually located in '/export/dask/raw_sarez2017_desc', which is mounted on both hosts as '/mnt/dask/raw_sarez2017_desc'.


```
KeyError                                  Traceback (most recent call last)
File ~/miniconda3/envs/pygmtsar2/lib/python3.10/site-packages/xarray/backends/file_manager.py:211, in _acquire_with_cache_info()
    210 try:
--> 211     file = self._cache[self._key]
    212 except KeyError:

File ~/miniconda3/envs/pygmtsar2/lib/python3.10/site-packages/xarray/backends/lru_cache.py:56, in __getitem__()
     55 with self._lock:
---> 56     value = self._cache[key]
     57     self._cache.move_to_end(key)

KeyError: [<class 'h5netcdf.core.File'>, ('/home/steffan/raw_sarez2017_desc/trans_1.grd',), 'r', (('decode_vlen_strings', True), ('driver', None), ('invalid_netcdf', None)), '271bb14d-1cc3-4027-a320-f0a321e681ca']

During handling of the above exception, another exception occurred:

FileNotFoundError                         Traceback (most recent call last)
Cell In[33], line 2
      1 # use the original Sentinel-1 resolution (1 pixel spacing)
----> 2 sbas.compute_geocode(1)

File ~/miniconda3/envs/pygmtsar2/lib/python3.10/site-packages/pygmtsar/Stack_geocode.py:46, in Stack_geocode.compute_geocode(self, coarsen)
     43 warnings.filterwarnings('ignore', module='dask')
     44 warnings.filterwarnings('ignore', module='dask.core')
---> 46 self.compute_trans(coarsen=coarsen)
     47 self.compute_trans_inv(coarsen=coarsen)
     48 self.compute_satellite_look_vector()

File ~/miniconda3/envs/pygmtsar2/lib/python3.10/site-packages/pygmtsar/Stack_trans.py:220, in Stack_trans.compute_trans(self, coarsen, dem, interactive)
    218 filenames = [fullname(index[0]) for index in enumerate(np.ndindex(chunks[0], chunks[1]))]
    219 # re-save all the chunk NetCDF files as single NetCDF file
--> 220 trans = xr.open_mfdataset(
    221     np.asarray(filenames).reshape((chunks[0], chunks[1])).tolist(),
    222     engine=self.netcdf_engine,
    223     chunks=self.chunksize,
    224     parallel=True,
    225     concat_dim=['lat','lon'],
    226     combine='nested'
    227 )
    228 # fix geographic coordinates
    229 #print ('lats', np.diff(lats)[:10])
    230 #print ('trans.lat', np.diff(trans.lat)[10])
    231 # add target radar coordinate grid for the user defined spacing (coarsen)
    232 azis, rngs = self.define_trans_grid(coarsen)

File ~/miniconda3/envs/pygmtsar2/lib/python3.10/site-packages/xarray/backends/api.py:1033, in open_mfdataset(paths, chunks, concat_dim, compat, preprocess, engine, data_vars, coords, combine, parallel, join, attrs_file, combine_attrs, **kwargs)
   1028 datasets = [preprocess(ds) for ds in datasets]
   1030 if parallel:
   1031     # calling compute here will return the datasets/file_objs lists,
   1032     # the underlying datasets will still be stored as dask arrays
-> 1033     datasets, closers = dask.compute(datasets, closers)
   1035 # Combine all datasets, closing them in case of a ValueError
   1036 try:

File ~/miniconda3/envs/pygmtsar2/lib/python3.10/site-packages/dask/base.py:661, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    658 postcomputes.append(x.__dask_postcompute__())
    660 with shorten_traceback():
--> 661     results = schedule(dsk, keys, **kwargs)
    663 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/miniconda3/envs/pygmtsar2/lib/python3.10/site-packages/xarray/backends/api.py:572, in open_dataset()
    560 decoders = _resolve_decoders_kwargs(
    561     decode_cf,
    562     open_backend_dataset_parameters=backend.open_dataset_parameters,
   (...)
    568     decode_coords=decode_coords,
    569 )
    571 overwrite_encoded_chunks = kwargs.pop("overwrite_encoded_chunks", None)
--> 572 backend_ds = backend.open_dataset(
    573     filename_or_obj,
    574     drop_variables=drop_variables,
    575     **decoders,
    576     **kwargs,
    577 )
    578 ds = _dataset_from_backend_dataset(
    579     backend_ds,
    580     filename_or_obj,
   (...)
    590     **kwargs,
    591 )
    592 return ds

File ~/miniconda3/envs/pygmtsar2/lib/python3.10/site-packages/xarray/backends/h5netcdf_.py:400, in open_dataset()
    379 def open_dataset(  # type: ignore[override]  # allow LSP violation, not supporting **kwargs
    380     self,
    381     filename_or_obj: str | os.PathLike[Any] | BufferedIOBase | AbstractDataStore,
   (...)
    397     driver_kwds=None,
    398 ) -> Dataset:
    399     filename_or_obj = _normalize_path(filename_or_obj)
--> 400     store = H5NetCDFStore.open(
    401         filename_or_obj,
    402         format=format,
    403         group=group,
    404         lock=lock,
    405         invalid_netcdf=invalid_netcdf,
    406         phony_dims=phony_dims,
    407         decode_vlen_strings=decode_vlen_strings,
    408         driver=driver,
    409         driver_kwds=driver_kwds,
    410     )
    412     store_entrypoint = StoreBackendEntrypoint()
    414     ds = store_entrypoint.open_dataset(
    415         store,
    416         mask_and_scale=mask_and_scale,
   (...)
    422         decode_timedelta=decode_timedelta,
    423     )

File ~/miniconda3/envs/pygmtsar2/lib/python3.10/site-packages/xarray/backends/h5netcdf_.py:180, in open()
    177 lock = combine_locks([HDF5_LOCK, get_write_lock(filename)])
    179 manager = CachingFileManager(h5netcdf.File, filename, mode=mode, kwargs=kwargs)
--> 180 return cls(manager, group=group, mode=mode, lock=lock, autoclose=autoclose)

File ~/miniconda3/envs/pygmtsar2/lib/python3.10/site-packages/xarray/backends/h5netcdf_.py:126, in __init__()
    123 self.format = None
    124 # todo: utilizing find_root_and_group seems a bit clunky
    125 #  making filename available on h5netcdf.Group seems better
--> 126 self._filename = find_root_and_group(self.ds)[0].filename
    127 self.is_remote = is_remote_uri(self._filename)
    128 self.lock = ensure_lock(lock)

File ~/miniconda3/envs/pygmtsar2/lib/python3.10/site-packages/xarray/backends/h5netcdf_.py:191, in ds()
    189 @property
    190 def ds(self):
--> 191     return self._acquire()

File ~/miniconda3/envs/pygmtsar2/lib/python3.10/site-packages/xarray/backends/h5netcdf_.py:183, in _acquire()
    182 def _acquire(self, needs_lock=True):
--> 183     with self._manager.acquire_context(needs_lock) as root:
    184         ds = _nc4_require_group(
    185             root, self._group, self._mode, create_group=_h5netcdf_create_group
    186         )
    187     return ds

File ~/miniconda3/envs/pygmtsar2/lib/python3.10/contextlib.py:135, in __enter__()
    133 del self.args, self.kwds, self.func
    134 try:
--> 135     return next(self.gen)
    136 except StopIteration:
    137     raise RuntimeError("generator didn't yield") from None

File ~/miniconda3/envs/pygmtsar2/lib/python3.10/site-packages/xarray/backends/file_manager.py:199, in acquire_context()
    196 @contextlib.contextmanager
    197 def acquire_context(self, needs_lock=True):
    198     """Context manager for acquiring a file."""
--> 199     file, cached = self._acquire_with_cache_info(needs_lock)
    200     try:
    201         yield file

File ~/miniconda3/envs/pygmtsar2/lib/python3.10/site-packages/xarray/backends/file_manager.py:217, in _acquire_with_cache_info()
    215 kwargs = kwargs.copy()
    216 kwargs["mode"] = self._mode
--> 217 file = self._opener(*self._args, **kwargs)
    218 if self._mode == "w":
    219     # ensure file doesn't get overridden when opened again
    220     self._mode = "a"

File ~/miniconda3/envs/pygmtsar2/lib/python3.10/site-packages/h5netcdf/core.py:1054, in __init__()
   1052 self._preexisting_file = os.path.exists(path) and mode != "w"
   1053 self._h5py = h5py
-> 1054 self._h5file = self._h5py.File(
   1055     path, mode, track_order=track_order, **kwargs
   1056 )
   1057 else:  # file-like object
   1058     self._preexisting_file = mode in {"r", "r+", "a"}

File ~/miniconda3/envs/pygmtsar2/lib/python3.10/site-packages/h5py/_hl/files.py:562, in __init__()
    553 fapl = make_fapl(driver, libver, rdcc_nslots, rdcc_nbytes, rdcc_w0,
    554                  locking, page_buf_size, min_meta_keep, min_raw_keep,
    555                  alignment_threshold=alignment_threshold,
    556                  alignment_interval=alignment_interval,
    557                  meta_block_size=meta_block_size,
    558                  **kwds)
    559 fcpl = make_fcpl(track_order=track_order, fs_strategy=fs_strategy,
    560                  fs_persist=fs_persist, fs_threshold=fs_threshold,
    561                  fs_page_size=fs_page_size)
--> 562 fid = make_fid(name, mode, userblock_size, fapl, fcpl, swmr=swmr)
    564 if isinstance(libver, tuple):
    565     self._libver = libver

File ~/miniconda3/envs/pygmtsar2/lib/python3.10/site-packages/h5py/_hl/files.py:235, in make_fid()
    233 if swmr and swmr_support:
    234     flags |= h5f.ACC_SWMR_READ
--> 235 fid = h5f.open(name, flags, fapl=fapl)
    236 elif mode == 'r+':
    237     fid = h5f.open(name, h5f.ACC_RDWR, fapl=fapl)

File h5py/_objects.pyx:54, in h5py._objects.with_phil.wrapper()

File h5py/_objects.pyx:55, in h5py._objects.with_phil.wrapper()

File h5py/h5f.pyx:102, in h5py.h5f.open()

FileNotFoundError: [Errno 2] Unable to synchronously open file (unable to open file: name = '/home/steffan/raw_sarez2017_desc/trans_1.grd', errno = 2, error message = 'No such file or directory', flags = 0, o_flags = 0)
```
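
For reference, a quick way to check which absolute paths each Dask worker can actually see is Client.run(); a minimal sketch, with a placeholder scheduler address:

```python
import os
from dask.distributed import Client

# placeholder scheduler address; adjust to your cluster
client = Client('tcp://hostA:8786')

# run os.path.exists on every worker for the path PyGMTSAR tries to open;
# returns a dict mapping each worker address to True/False
path = '/home/steffan/raw_sarez2017_desc/trans_1.grd'
print(client.run(os.path.exists, path))
```

A worker reporting False here would explain the FileNotFoundError above even when the file exists on the notebook host.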

AlexeyPechnikov commented 6 months ago

Path management for distributed computing can be tricky. One straightforward solution is to create a symlink from '/home/steffan/raw_sarez2017_desc/' to '/export/dask/raw_sarez2017_desc'.
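
For example, a minimal sketch of that symlink, run once on each host where the expected path differs from the mount point (paths taken from the report above):

```python
import os

# make the path PyGMTSAR expects resolve to the shared NFS mount
os.symlink('/mnt/dask/raw_sarez2017_desc', '/home/steffan/raw_sarez2017_desc')
```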

SteffanDavies commented 6 months ago

Your suggestion solves the pathing issue. However, sbas.compute_ps() throws the following error, which is probably related to simultaneous access:

```python
# use the only selected dates for the pixels stability analysis
sbas.compute_ps()
```

```
KeyError                                  Traceback (most recent call last)
File ~/miniconda3/envs/pygmtsar2/lib/python3.10/site-packages/xarray/backends/file_manager.py:211, in _acquire_with_cache_info()
    210 try:
--> 211     file = self._cache[self._key]
    212 except KeyError:

File ~/miniconda3/envs/pygmtsar2/lib/python3.10/site-packages/xarray/backends/lru_cache.py:56, in __getitem__()
     55 with self._lock:
---> 56     value = self._cache[key]
     57     self._cache.move_to_end(key)

KeyError: [<class 'h5netcdf.core.File'>, ('/home/steffan/raw_sarez2017_desc/S1_20170803_ALL_F2.grd',), 'r', (('decode_vlen_strings', True), ('driver', None), ('invalid_netcdf', None)), '5288aa0f-641f-4616-9ae9-5673fbd67445']

During handling of the above exception, another exception occurred:

OSError                                   Traceback (most recent call last)
Cell In[39], line 2
      1 # use the only selected dates for the pixels stability analysis
----> 2 sbas.compute_ps()

File ~/miniconda3/envs/pygmtsar2/lib/python3.10/site-packages/pygmtsar/Stack_ps.py:41, in Stack_ps.compute_ps(self, dates, data, name, interactive)
     37 warnings.filterwarnings('ignore', module='dask.core')
     39 if isinstance(data, str) and data == 'auto':
     40     # open SLC data as real intensities
---> 41     data = np.square(np.abs(self.open_data(dates=dates)))
     43 # normalize image amplitudes (intensities)
     44 tqdm_dask(mean := dask.persist(data.mean(dim=['y','x'])), desc='Intensity Normalization')

File ~/miniconda3/envs/pygmtsar2/lib/python3.10/site-packages/pygmtsar/IO.py:267, in IO.open_data(self, dates, scale, debug)
    265 filenames = [self.PRM(date).filename[:-4] + '.grd' for date in dates]
    266 #print ('filenames', filenames)
--> 267 ds = xr.open_mfdataset(
    268     filenames,
    269     engine=self.netcdf_engine,
    270     chunks=self.chunksize,
    271     parallel=True,
    272     concat_dim='date',
    273     combine='nested'
    274 ).assign(date=pd.to_datetime(dates)).rename({'a': 'y', 'r': 'x'})
    275 if scale is None:
    276     # there is no complex int16 datatype, so return two variables for real and imag parts
    277     return ds

File ~/miniconda3/envs/pygmtsar2/lib/python3.10/site-packages/xarray/backends/api.py:1033, in open_mfdataset(paths, chunks, concat_dim, compat, preprocess, engine, data_vars, coords, combine, parallel, join, attrs_file, combine_attrs, **kwargs)
   1028 datasets = [preprocess(ds) for ds in datasets]
   1030 if parallel:
   1031     # calling compute here will return the datasets/file_objs lists,
   1032     # the underlying datasets will still be stored as dask arrays
-> 1033     datasets, closers = dask.compute(datasets, closers)
   1035 # Combine all datasets, closing them in case of a ValueError
   1036 try:

File ~/miniconda3/envs/pygmtsar2/lib/python3.10/site-packages/dask/base.py:661, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    658 postcomputes.append(x.__dask_postcompute__())
    660 with shorten_traceback():
--> 661     results = schedule(dsk, keys, **kwargs)
    663 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/miniconda3/envs/pygmtsar2/lib/python3.10/site-packages/xarray/backends/api.py:572, in open_dataset()
    560 decoders = _resolve_decoders_kwargs(
    561     decode_cf,
    562     open_backend_dataset_parameters=backend.open_dataset_parameters,
   (...)
    568     decode_coords=decode_coords,
    569 )
    571 overwrite_encoded_chunks = kwargs.pop("overwrite_encoded_chunks", None)
--> 572 backend_ds = backend.open_dataset(
    573     filename_or_obj,
    574     drop_variables=drop_variables,
    575     **decoders,
    576     **kwargs,
    577 )
    578 ds = _dataset_from_backend_dataset(
    579     backend_ds,
    580     filename_or_obj,
   (...)
    590     **kwargs,
    591 )
    592 return ds

File ~/miniconda3/envs/pygmtsar2/lib/python3.10/site-packages/xarray/backends/h5netcdf_.py:400, in open_dataset()
    379 def open_dataset(  # type: ignore[override]  # allow LSP violation, not supporting **kwargs
    380     self,
    381     filename_or_obj: str | os.PathLike[Any] | BufferedIOBase | AbstractDataStore,
   (...)
    397     driver_kwds=None,
    398 ) -> Dataset:
    399     filename_or_obj = _normalize_path(filename_or_obj)
--> 400     store = H5NetCDFStore.open(
    401         filename_or_obj,
    402         format=format,
    403         group=group,
    404         lock=lock,
    405         invalid_netcdf=invalid_netcdf,
    406         phony_dims=phony_dims,
    407         decode_vlen_strings=decode_vlen_strings,
    408         driver=driver,
    409         driver_kwds=driver_kwds,
    410     )
    412     store_entrypoint = StoreBackendEntrypoint()
    414     ds = store_entrypoint.open_dataset(
    415         store,
    416         mask_and_scale=mask_and_scale,
   (...)
    422         decode_timedelta=decode_timedelta,
    423     )

File ~/miniconda3/envs/pygmtsar2/lib/python3.10/site-packages/xarray/backends/h5netcdf_.py:180, in open()
    177 lock = combine_locks([HDF5_LOCK, get_write_lock(filename)])
    179 manager = CachingFileManager(h5netcdf.File, filename, mode=mode, kwargs=kwargs)
--> 180 return cls(manager, group=group, mode=mode, lock=lock, autoclose=autoclose)

File ~/miniconda3/envs/pygmtsar2/lib/python3.10/site-packages/xarray/backends/h5netcdf_.py:126, in __init__()
    123 self.format = None
    124 # todo: utilizing find_root_and_group seems a bit clunky
    125 #  making filename available on h5netcdf.Group seems better
--> 126 self._filename = find_root_and_group(self.ds)[0].filename
    127 self.is_remote = is_remote_uri(self._filename)
    128 self.lock = ensure_lock(lock)

File ~/miniconda3/envs/pygmtsar2/lib/python3.10/site-packages/xarray/backends/h5netcdf_.py:191, in ds()
    189 @property
    190 def ds(self):
--> 191     return self._acquire()

File ~/miniconda3/envs/pygmtsar2/lib/python3.10/site-packages/xarray/backends/h5netcdf_.py:183, in _acquire()
    182 def _acquire(self, needs_lock=True):
--> 183     with self._manager.acquire_context(needs_lock) as root:
    184         ds = _nc4_require_group(
    185             root, self._group, self._mode, create_group=_h5netcdf_create_group
    186         )
    187     return ds

File ~/miniconda3/envs/pygmtsar2/lib/python3.10/contextlib.py:135, in __enter__()
    133 del self.args, self.kwds, self.func
    134 try:
--> 135     return next(self.gen)
    136 except StopIteration:
    137     raise RuntimeError("generator didn't yield") from None

File ~/miniconda3/envs/pygmtsar2/lib/python3.10/site-packages/xarray/backends/file_manager.py:199, in acquire_context()
    196 @contextlib.contextmanager
    197 def acquire_context(self, needs_lock=True):
    198     """Context manager for acquiring a file."""
--> 199     file, cached = self._acquire_with_cache_info(needs_lock)
    200     try:
    201         yield file

File ~/miniconda3/envs/pygmtsar2/lib/python3.10/site-packages/xarray/backends/file_manager.py:217, in _acquire_with_cache_info()
    215 kwargs = kwargs.copy()
    216 kwargs["mode"] = self._mode
--> 217 file = self._opener(*self._args, **kwargs)
    218 if self._mode == "w":
    219     # ensure file doesn't get overridden when opened again
    220     self._mode = "a"

File ~/miniconda3/envs/pygmtsar2/lib/python3.10/site-packages/h5netcdf/core.py:1054, in __init__()
   1052 self._preexisting_file = os.path.exists(path) and mode != "w"
   1053 self._h5py = h5py
-> 1054 self._h5file = self._h5py.File(
   1055     path, mode, track_order=track_order, **kwargs
   1056 )
   1057 else:  # file-like object
   1058     self._preexisting_file = mode in {"r", "r+", "a"}

File ~/miniconda3/envs/pygmtsar2/lib/python3.10/site-packages/h5py/_hl/files.py:562, in __init__()
    553 fapl = make_fapl(driver, libver, rdcc_nslots, rdcc_nbytes, rdcc_w0,
    554                  locking, page_buf_size, min_meta_keep, min_raw_keep,
    555                  alignment_threshold=alignment_threshold,
    556                  alignment_interval=alignment_interval,
    557                  meta_block_size=meta_block_size,
    558                  **kwds)
    559 fcpl = make_fcpl(track_order=track_order, fs_strategy=fs_strategy,
    560                  fs_persist=fs_persist, fs_threshold=fs_threshold,
    561                  fs_page_size=fs_page_size)
--> 562 fid = make_fid(name, mode, userblock_size, fapl, fcpl, swmr=swmr)
    564 if isinstance(libver, tuple):
    565     self._libver = libver

File ~/miniconda3/envs/pygmtsar2/lib/python3.10/site-packages/h5py/_hl/files.py:235, in make_fid()
    233 if swmr and swmr_support:
    234     flags |= h5f.ACC_SWMR_READ
--> 235 fid = h5f.open(name, flags, fapl=fapl)
    236 elif mode == 'r+':
    237     fid = h5f.open(name, h5f.ACC_RDWR, fapl=fapl)

File h5py/_objects.pyx:54, in h5py._objects.with_phil.wrapper()

File h5py/_objects.pyx:55, in h5py._objects.with_phil.wrapper()

File h5py/h5f.pyx:102, in h5py.h5f.open()

OSError: [Errno 5] Unable to synchronously open file (file read failed: time = Tue Apr 30 16:43:16 2024 , filename = '/home/steffan/raw_sarez2017_desc/S1_20170803_ALL_F2.grd', file descriptor = 64, errno = 5, error message = 'Input/output error', buf = 0x7217f24df4b8, total read size = 8, bytes this sub-read = 8, bytes actually read = 18446744073709551615, offset = 0)
```

SteffanDavies commented 6 months ago

The same happens when computing interferograms:

OSError: [Errno 5] Can't synchronously read data (file read failed: time = Tue Apr 30 18:12:40 2024 , filename = '/home/steffan/raw_sarez2017_desc/S1_20170511_ALL_F2.grd', file descriptor = 18, errno = 5, error message = 'Input/output error', buf = 0x757e7079f720, total read size = 40136, bytes this sub-read = 40136, bytes actually read = 18446744073709551615, offset = 0)

AlexeyPechnikov commented 6 months ago

It seems that you have not configured your Dask cluster for parallel access to the network-shared data.
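
One configuration knob that sometimes helps with Errno 5 I/O errors on HDF5 files stored on NFS is disabling HDF5 file locking; this is an assumption about the cause, not a verified fix for this setup:

```python
import os

# must be set before h5py/h5netcdf open any file,
# and in the environment of every Dask worker
os.environ['HDF5_USE_FILE_LOCKING'] = 'FALSE'
```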

SteffanDavies commented 6 months ago

> It seems that you have not configured your Dask cluster for parallel access to the network-shared data.

Do you have any idea which options those might be? I have asked ChatGPT but found no answer.

AlexeyPechnikov commented 6 months ago

There are many ways to begin; you could start here: https://docs.dask.org/en/stable/deploying.html
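
For instance, a minimal multi-host deployment sketch using dask.distributed's SSHCluster; the hostnames are placeholders, and every worker must see the project data under the same absolute path:

```python
from dask.distributed import SSHCluster, Client

# placeholder hosts: the first entry runs the scheduler, the rest run workers
cluster = SSHCluster(
    ['hostA', 'hostA', 'hostB'],
    worker_options={'nthreads': 4},
)
client = Client(cluster)
```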

SteffanDavies commented 6 months ago

> There are many ways to begin; you could start here: https://docs.dask.org/en/stable/deploying.html

Yes, I followed the Dask docs for setting up the CLI scheduler on A and workers on A and B. There is no mention of any parallel network options, though. I even tried mounting the data from a NAS NFS folder, but the problem persists. The data is definitely accessible from both machines, and it's only the machine running workers but not the notebook that has a problem. (I can work off NFS as long as the machine running the notebook is also running the workers.)

Have you tried running PyGMTSAR functions on a distributed system that isn't a LocalCluster?

AlexeyPechnikov commented 6 months ago

I checked it, but not with the recent versions. Note that most of the functions can return lazy results instead of saving them internally, so you can manage the disk access any way you like. Others can be run on a single cluster node if you run into these issues. But it is better to configure the Dask cluster properly to access your shared dataset (this can be platform-specific and sometimes tricky due to version incompatibilities between the NetCDF, Dask, and xarray libraries).
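
As an illustration of the lazy-results approach: the tracebacks above show that functions such as Stack_trans.compute_trans() accept an interactive flag; a sketch assuming that flag returns a lazy xarray object instead of saving grids from the workers:

```python
# assumption: interactive=True returns the lazy grid (the parameter is
# visible in the compute_trans signature in the traceback above)
trans = sbas.compute_trans(coarsen=1, interactive=True)

# materialize and save from the notebook host, which has
# reliable access to the working directory
trans = trans.compute()
```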