scipp / essimaging

Imaging data reduction for the European Spallation Source
BSD 3-Clause "New" or "Revised" License

Investigate file locking issue on VISA. #47

Closed YooSunYoung closed 1 day ago

YooSunYoung commented 2 days ago

We tried using locking=False to get around the permission error on the read-only file system mounted on VISA.

It raises an error saying that the flag doesn't match.

I don't understand why...

https://github.com/scipp/essimaging/blob/42bf7bf551d7cd2e9b47d8c1f974fb5f782a2826/src/ess/imaging/io.py#L81-L99

Error Message

Code Snippet to Reproduce the Error

    from ess.imaging.data import get_ymir_images_path
    from ess.imaging.io import FilePath, FileLock
    from ess.imaging.normalize import NormalizedSampleImages
    from ess.imaging.workflow import (
        YmirWorkflow,
        ImageDetectorName,
        RotationMotionSensorName,
    )

    wf = YmirWorkflow()
    wf[FilePath] = (
        get_ymir_images_path()
    )  # Replace with the path to your images in nexus file.
    wf[FilePath] = "/ess/data/ymir/2024/597001/raw/597001_00011360.hdf"
    wf[FileLock] = False
    wf[ImageDetectorName] = 'orca'
    wf[RotationMotionSensorName] = 'motion_cabinet_2'
    normalized = wf.compute(NormalizedSampleImages)
    normalized

Full Error Message

    ---------------------------------------------------------------------------
    OSError                                   Traceback (most recent call last)
    Cell In[3], line 18
         16 wf[ImageDetectorName] = 'orca'
         17 wf[RotationMotionSensorName] = 'motion_cabinet_2'
    ---> 18 normalized = wf.compute(NormalizedSampleImages)
         19 normalized

    File ~/.local/lib/python3.12/site-packages/sciline/pipeline.py:90, in Pipeline.compute(self, tp, **kwargs)
         76 def compute(self, tp: type | Iterable[type] | UnionType, **kwargs: Any) -> Any:
         77     """
         78     Compute result for the given keys.
         79
       (...)
         88         Keyword arguments passed to the ``.get()`` method.
         89     """
    ---> 90     return self.get(tp, **kwargs).compute()

    File ~/.local/lib/python3.12/site-packages/sciline/task_graph.py:113, in TaskGraph.compute(self, targets)
        111     return dict(zip(targets, results, strict=True))
        112 else:
    --> 113     return self._scheduler.get(self._graph, [targets])[0]

    File ~/.local/lib/python3.12/site-packages/sciline/scheduler.py:105, in DaskScheduler.get(self, graph, keys)
         92 dsk = {
         93     _to_dask_key(tp): (
         94         apply,
       (...)
        102     for tp, provider in graph.items()
        103 }
        104 try:
    --> 105     return self._dask_get(dsk, list(map(_to_dask_key, keys)))
        106 except RuntimeError as e:
        107     if str(e).startswith("Cycle detected"):

    File ~/.local/lib/python3.12/site-packages/dask/threaded.py:91, in get(dsk, keys, cache, num_workers, pool, **kwargs)
         88     elif isinstance(pool, multiprocessing.pool.Pool):
         89         pool = MultiprocessingPoolExecutor(pool)
    ---> 91 results = get_async(
         92     pool.submit,
         93     pool._max_workers,
         94     dsk,
         95     keys,
         96     cache=cache,
         97     get_id=_thread_get_id,
         98     pack_exception=pack_exception,
         99     **kwargs,
        100 )
        102 # Cleanup pools associated to dead threads
        103 with pools_lock:

    File ~/.local/lib/python3.12/site-packages/dask/local.py:513, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
        511         _execute_task(task, data)  # Re-execute locally
        512     else:
    --> 513         raise_exception(exc, tb)
        514 res, worker_id = loads(res_info)
        515 state["cache"][key] = res

    File ~/.local/lib/python3.12/site-packages/dask/local.py:321, in reraise(exc, tb)
        319 if exc.__traceback__ is not tb:
        320     raise exc.with_traceback(tb)
    --> 321 raise exc

    File ~/.local/lib/python3.12/site-packages/dask/local.py:226, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
        224 try:
        225     task, data = loads(task_info)
    --> 226     result = _execute_task(task, data)
        227     id = get_id()
        228     result = dumps((result, id))

    File ~/.local/lib/python3.12/site-packages/dask/core.py:133, in _execute_task(arg, cache, dsk)
        129     func, args = arg[0], arg[1:]
        130     # Note: Don't assign the subtask results to a variable. numpy detects
        131     # temporaries by their reference count and can execute certain
        132     # operations in-place.
    --> 133     return func(*(_execute_task(a, cache) for a in args))
        134 elif not ishashable(arg):
        135     return arg

    File ~/.local/lib/python3.12/site-packages/dask/utils.py:80, in apply(func, args, kwargs)
         78     return func(*args, **kwargs)
         79 else:
    ---> 80     return func(*args)

    File ~/Desktop/myProposals/essimaging/src/ess/imaging/io.py:153, in load_nexus_rotation_logs(file_path, motion_sensor_name)
        148 def load_nexus_rotation_logs(
        149     file_path: FilePath,
        150     motion_sensor_name: RotationMotionSensorName,
        151 ) -> RotationLogs:
        152     log_path = f"entry/instrument/{motion_sensor_name}/rotation_stage_readback"
    --> 153     with snx.File(file_path, mode="r") as f:
        154         return RotationLogs(f[log_path][()]['value'])

    File ~/.local/lib/python3.12/site-packages/scippnexus/file.py:37, in File.__init__(self, definitions, *args, **kwargs)
         35 if definitions is DefaultDefinitions:
         36     definitions = base_definitions()
    ---> 37 self._file = h5py.File(*args, **kwargs)
         38 super().__init__(self._file, definitions=definitions)

    File /opt/miniconda/lib/python3.12/site-packages/h5py/_hl/files.py:562, in File.__init__(self, name, mode, driver, libver, userblock_size, swmr, rdcc_nslots, rdcc_nbytes, rdcc_w0, track_order, fs_strategy, fs_persist, fs_threshold, fs_page_size, page_buf_size, min_meta_keep, min_raw_keep, locking, alignment_threshold, alignment_interval, meta_block_size, **kwds)
        553     fapl = make_fapl(driver, libver, rdcc_nslots, rdcc_nbytes, rdcc_w0,
        554                      locking, page_buf_size, min_meta_keep, min_raw_keep,
        555                      alignment_threshold=alignment_threshold,
        556                      alignment_interval=alignment_interval,
        557                      meta_block_size=meta_block_size,
        558                      **kwds)
        559     fcpl = make_fcpl(track_order=track_order, fs_strategy=fs_strategy,
        560                      fs_persist=fs_persist, fs_threshold=fs_threshold,
        561                      fs_page_size=fs_page_size)
    --> 562     fid = make_fid(name, mode, userblock_size, fapl, fcpl, swmr=swmr)
        564     if isinstance(libver, tuple):
        565         self._libver = libver

    File /opt/miniconda/lib/python3.12/site-packages/h5py/_hl/files.py:235, in make_fid(name, mode, userblock_size, fapl, fcpl, swmr)
        233     if swmr and swmr_support:
        234         flags |= h5f.ACC_SWMR_READ
    --> 235     fid = h5f.open(name, flags, fapl=fapl)
        236 elif mode == 'r+':
        237     fid = h5f.open(name, h5f.ACC_RDWR, fapl=fapl)

    File h5py/_objects.pyx:54, in h5py._objects.with_phil.wrapper()

    File h5py/_objects.pyx:55, in h5py._objects.with_phil.wrapper()

    File h5py/h5f.pyx:102, in h5py.h5f.open()

    OSError: Unable to synchronously open file (file locking flag values don't match)

jl-wynen commented 2 days ago

Can you paste the error message here?

YooSunYoung commented 2 days ago

I updated the description with the error message

jl-wynen commented 2 days ago

This error message comes from a different call to snx.File than the code at the top. And this call does not have locking=False. Are you opening the same file multiple times?

YooSunYoung commented 1 day ago

> This error message comes from a different call to snx.File than the code at the top. And this call does not have locking=False. Are you opening the same file multiple times?

You're right... I saw the same error when we tried it in the VISA so I thought it was the same error but the flag is different. I'll update it with the right one.

It is opening files multiple places but only with a context manager so it should close it every time...?

jl-wynen commented 1 day ago

> It is opening files multiple places but only with a context manager so it should close it every time...?

Unless you open one context within another.

YooSunYoung commented 1 day ago

> > It is opening files multiple places but only with a context manager so it should close it every time...?
>
> Unless you open one context within another.

I just checked all the file opening part, which is

I made sure they always exit before opening another one.... I think...?

YooSunYoung commented 1 day ago

And I updated the error message and the code snippet to reproduce the error message.

jl-wynen commented 1 day ago

There is still a line that doesn't set locking=False in the backtrace:

--> 153 with snx.File(file_path, mode="r") as f:

jl-wynen commented 1 day ago

This is running with dask. So does it use multi threading and multiple threads access a file at the same time?
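
To illustrate the question: a context manager guarantees that each handle gets closed, but it does not stop two worker threads from holding a handle to the same file at the same moment. Below is a minimal, stdlib-only sketch of that situation. `OpenCounter`, `loader` and `run_two_loaders` are hypothetical names made up for this example, and no real file, `snx.File` or dask call is involved; a thread pool stands in for the threaded scheduler.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager


class OpenCounter:
    """Hypothetical stand-in for a file opener that tracks overlapping handles."""

    def __init__(self):
        self._lock = threading.Lock()
        self.open_now = 0
        self.max_open = 0

    @contextmanager
    def open(self, path):
        with self._lock:
            self.open_now += 1
            self.max_open = max(self.max_open, self.open_now)
        try:
            yield path
        finally:
            # The handle is always released on exit, as with a real context manager.
            with self._lock:
                self.open_now -= 1


def loader(counter, path, barrier):
    with counter.open(path):
        # Wait until the other loader is inside its own ``with`` block too.
        barrier.wait(timeout=5)
    return path


def run_two_loaders(path="example.hdf"):
    counter = OpenCounter()
    barrier = threading.Barrier(2)
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(loader, counter, path, barrier) for _ in range(2)]
        for future in futures:
            future.result()
    return counter.max_open, counter.open_now


max_open, still_open = run_two_loaders()
print(max_open, still_open)  # two handles overlapped; none left open afterwards
```

Both loaders exit their context cleanly, yet the file was open twice at the same time. With real HDF5 files, that is the situation in which the two opens need matching locking flags.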

YooSunYoung commented 1 day ago

> There is still a line that doesn't set locking=False in the backtrace:
>
> --> 153 with snx.File(file_path, mode="r") as f:

Ah right. Thanks

> This is running with dask. So does it use multi threading and multiple threads access a file at the same time?

Hmmmm maybe...?

YooSunYoung commented 1 day ago

@jl-wynen It was... because I forgot to pass the locking argument to one of the loaders... 🤡.......

I think it proves that we should rather use an environment variable than pass it to all the loaders...?
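
For reference, a rough sketch of what the environment-variable route could look like. `HDF5_USE_FILE_LOCKING` is the variable that HDF5 documents for this purpose. The helper name `disable_hdf5_file_locking` is made up here, and the sketch assumes that HDF5 reads the variable when the library is initialised, so it has to be set before h5py (or scippnexus) is first imported.

```python
import os
import sys


def disable_hdf5_file_locking() -> bool:
    """Set HDF5_USE_FILE_LOCKING=FALSE for the current process.

    Returns True if h5py had not been imported yet, i.e. the variable was
    set early enough under the assumption that HDF5 reads it at start-up.
    """
    set_in_time = "h5py" not in sys.modules
    os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE"
    return set_in_time


if not disable_hdf5_file_locking():
    print("h5py was already imported; the setting may not take effect.")

# Import h5py / scippnexus only after this point.
```

Note that this switches locking off for every HDF5 file the process opens, including files on writable file systems.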

jl-wynen commented 1 day ago

I think it shows the opposite. As Christian said, the locking feature exists to prevent files from being corrupted. If we open files multiple times concurrently and (accidentally) modify a file, we run this exact risk without locking. When loading a file from ESS read-only storage, that is not a risk. But when the file is on a user's machine or some other writable filesystem, then we do risk corrupting files irreversibly.

So I think we should be careful and only set locking=False if and when we have to.

I don't have a good solution at the moment. Maybe we can do something along the lines of NeXusFileSpec to encode flags. And then we could maybe detect whether locking=False is needed and, importantly, safe in a particular case.
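
One possible shape for such a check, as a sketch only: treat locking=False as acceptable when the current process cannot write to the file at all, and otherwise leave the default alone. `suggested_locking` and its `can_write` parameter are hypothetical and not part of scippnexus, essreduce or h5py. The `os.access` test is a heuristic and may not cover every mount type.

```python
import os
from typing import Callable, Optional


def _default_can_write(path: str) -> bool:
    # os.access reports False for W_OK on a read-only mount or without write permission.
    return os.access(path, os.W_OK)


def suggested_locking(
    path: str, can_write: Callable[[str], bool] = _default_can_write
) -> Optional[bool]:
    """Return False if ``path`` is not writable by this process, else None.

    None means "do not override HDF5's default locking behaviour".
    """
    if not can_write(path):
        # The file cannot be modified from here, so skipping the lock
        # does not add a corruption risk from this process.
        return False
    return None


# Hypothetical use, assuming the keyword is forwarded to h5py.File:
#     snx.File(path, mode="r", locking=suggested_locking(path))
```

The decision is made per file, so writable files on a user's machine keep the default locking.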

YooSunYoung commented 1 day ago

Hmm okay.

First, I'll move this issue to the ~scippnexus instead.~ Sorry I meant essreduce.