SpikeInterface / spikeinterface

A Python-based module for creating flexible and robust spike sorting pipelines.
https://spikeinterface.readthedocs.io
MIT License

Cannot save `NwbRecordingExtractor` with multiple jobs #969

Closed: khl02007 closed this issue 2 years ago

khl02007 commented 2 years ago

When I try to save an `NwbRecordingExtractor` (after channel and frame slicing), I get the following error:

```
write_binary_recording with n_jobs = 4 and chunk_size = 29959
Exception in initializer:
Traceback (most recent call last):
  File "/home/kyu/miniconda3/envs/spyglass/lib/python3.8/concurrent/futures/process.py", line 226, in _process_worker
    initializer(*initargs)
  File "/home/kyu/repos/spikeinterface/spikeinterface/core/job_tools.py", line 328, in worker_initializer
    _worker_ctx = init_func(*init_args)
  File "/home/kyu/repos/spikeinterface/spikeinterface/core/core_tools.py", line 191, in _init_binary_worker
    worker_ctx['recording'] = load_extractor(recording)
  File "/home/kyu/repos/spikeinterface/spikeinterface/core/base.py", line 897, in load_extractor
    return BaseExtractor.from_dict(file_or_folder_or_dict, base_folder=base_folder)
  File "/home/kyu/repos/spikeinterface/spikeinterface/core/base.py", line 365, in from_dict
    extractor = _load_extractor_from_dict(d)
  File "/home/kyu/repos/spikeinterface/spikeinterface/core/base.py", line 831, in _load_extractor_from_dict
    kwargs[k] = [_load_extractor_from_dict(e) for e in v]
  File "/home/kyu/repos/spikeinterface/spikeinterface/core/base.py", line 831, in <listcomp>
    kwargs[k] = [_load_extractor_from_dict(e) for e in v]
  File "/home/kyu/repos/spikeinterface/spikeinterface/core/base.py", line 825, in _load_extractor_from_dict
    kwargs[k] = _load_extractor_from_dict(v)
  File "/home/kyu/repos/spikeinterface/spikeinterface/core/base.py", line 825, in _load_extractor_from_dict
    kwargs[k] = _load_extractor_from_dict(v)
  File "/home/kyu/repos/spikeinterface/spikeinterface/core/base.py", line 842, in _load_extractor_from_dict
    extractor = cls(**kwargs)
  File "/home/kyu/repos/spikeinterface/spikeinterface/extractors/nwbextractors.py", line 106, in __init__
    self.io = NWBHDF5IO(self._file_path, mode='r', load_namespaces=True, driver=driver)
  File "/home/kyu/miniconda3/envs/spyglass/lib/python3.8/site-packages/hdmf/utils.py", line 645, in func_call
    return func(args[0], **pargs)
  File "/home/kyu/miniconda3/envs/spyglass/lib/python3.8/site-packages/pynwb/__init__.py", line 230, in __init__
    super(NWBHDF5IO, self).load_namespaces(tm, path, file=file_obj, driver=driver)
  File "/home/kyu/miniconda3/envs/spyglass/lib/python3.8/site-packages/hdmf/utils.py", line 645, in func_call
    return func(args[0], **pargs)
  File "/home/kyu/miniconda3/envs/spyglass/lib/python3.8/site-packages/hdmf/backends/hdf5/h5tools.py", line 153, in load_namespaces
    return cls.__load_namespaces(namespace_catalog, namespaces, open_file_obj)
  File "/home/kyu/miniconda3/envs/spyglass/lib/python3.8/site-packages/hdmf/backends/hdf5/h5tools.py", line 177, in __load_namespaces
    for spec_ns in reader.read_namespace(cls.__ns_spec_path):
  File "/home/kyu/miniconda3/envs/spyglass/lib/python3.8/site-packages/hdmf/backends/hdf5/h5_utils.py", line 402, in read_namespace
    self.__cache = self.__read(ns_path)
  File "/home/kyu/miniconda3/envs/spyglass/lib/python3.8/site-packages/hdmf/backends/hdf5/h5_utils.py", line 387, in __read
    s = self.__group[path][()]
  File "h5py/_objects.pyx", line 54, in h5py._objects.with_phil.wrapper
  File "h5py/_objects.pyx", line 55, in h5py._objects.with_phil.wrapper
  File "/home/kyu/miniconda3/envs/spyglass/lib/python3.8/site-packages/h5py/_hl/group.py", line 264, in __getitem__
    oid = h5o.open(self.id, self._e(name), lapl=self._lapl)
  File "h5py/_objects.pyx", line 54, in h5py._objects.with_phil.wrapper
  File "h5py/_objects.pyx", line 55, in h5py._objects.with_phil.wrapper
  File "h5py/h5o.pyx", line 190, in h5py.h5o.open
KeyError: 'Unable to open object (bad object header version number)'
---------------------------------------------------------------------------
BrokenProcessPool                         Traceback (most recent call last)
Input In [5], in <cell line: 7>()
      3     shutil.rmtree(recording_path)
      5 start = time.time()
----> 7 recording.save(folder=recording_path,
      8                            n_jobs=4,chunk_duration='1s')
      9 end = time.time()
     10 print(end - start)

File ~/repos/spikeinterface/spikeinterface/core/base.py:615, in BaseExtractor.save(self, **kwargs)
    613     loaded_extractor = self.save_to_zarr(**kwargs)
    614 else:
--> 615     loaded_extractor = self.save_to_folder(**kwargs)
    616 return loaded_extractor

File ~/repos/spikeinterface/spikeinterface/core/base.py:694, in BaseExtractor.save_to_folder(self, name, folder, verbose, **save_kwargs)
    691 self.save_metadata_to_folder(folder)
    693 # save data (done the subclass)
--> 694 cached = self._save(folder=folder, verbose=verbose, **save_kwargs)
    696 # copy properties/
    697 self.copy_metadata(cached)

File ~/repos/spikeinterface/spikeinterface/core/baserecording.py:224, in BaseRecording._save(self, format, **save_kwargs)
    221     dtype = self.get_dtype()
    223 job_kwargs = {k: save_kwargs[k] for k in job_keys if k in save_kwargs}
--> 224 write_binary_recording(self, file_paths=file_paths, dtype=dtype, **job_kwargs)
    226 from .binaryrecordingextractor import BinaryRecordingExtractor
    227 cached = BinaryRecordingExtractor(file_paths=file_paths, sampling_frequency=self.get_sampling_frequency(),
    228                                   num_chan=self.get_num_channels(), dtype=dtype,
    229                                   t_starts=t_starts, channel_ids=self.get_channel_ids(), time_axis=0,
    230                                   file_offset=0, gain_to_uV=self.get_channel_gains(),
    231                                   offset_to_uV=self.get_channel_offsets())

File ~/repos/spikeinterface/spikeinterface/core/core_tools.py:285, in write_binary_recording(recording, file_paths, dtype, add_file_extension, verbose, byte_offset, auto_cast_uint, **job_kwargs)
    282     init_args = (recording.to_dict(), rec_memmaps_dict, dtype, cast_unsigned)
    283 executor = ChunkRecordingExecutor(recording, func, init_func, init_args, verbose=verbose,
    284                                   job_name='write_binary_recording', **job_kwargs)
--> 285 executor.run()

File ~/repos/spikeinterface/spikeinterface/core/job_tools.py:312, in ChunkRecordingExecutor.run(self)
    310                 returns.append(res)
    311         else:
--> 312             for res in results:
    313                 pass
    315 return returns

File ~/miniconda3/envs/spyglass/lib/python3.8/concurrent/futures/process.py:484, in _chain_from_iterable_of_lists(iterable)
    478 def _chain_from_iterable_of_lists(iterable):
    479     """
    480     Specialized implementation of itertools.chain.from_iterable.
    481     Each item in *iterable* should be a list.  This function is
    482     careful not to keep references to yielded objects.
    483     """
--> 484     for element in iterable:
    485         element.reverse()
    486         while element:

File ~/miniconda3/envs/spyglass/lib/python3.8/concurrent/futures/_base.py:619, in Executor.map.<locals>.result_iterator()
    616 while fs:
    617     # Careful not to keep a reference to the popped future
    618     if timeout is None:
--> 619         yield fs.pop().result()
    620     else:
    621         yield fs.pop().result(end_time - time.monotonic())

File ~/miniconda3/envs/spyglass/lib/python3.8/concurrent/futures/_base.py:444, in Future.result(self, timeout)
    442     raise CancelledError()
    443 elif self._state == FINISHED:
--> 444     return self.__get_result()
    445 else:
    446     raise TimeoutError()

File ~/miniconda3/envs/spyglass/lib/python3.8/concurrent/futures/_base.py:389, in Future.__get_result(self)
    387 if self._exception:
    388     try:
--> 389         raise self._exception
    390     finally:
    391         # Break a reference cycle with the exception in self._exception
    392         self = None

BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
```
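
For reference, a minimal sketch of the call pattern that triggers this. The file path, channel count, and frame bounds below are placeholders; the key point is that with `n_jobs > 1` each worker rebuilds the recording via `load_extractor()`, which re-opens the NWB file through `NWBHDF5IO` (see the traceback above).

```python
import spikeinterface.extractors as se

# placeholder file path and slicing bounds
recording = se.NwbRecordingExtractor(file_path="session.nwb")
recording = recording.channel_slice(channel_ids=recording.get_channel_ids()[:32])
recording = recording.frame_slice(start_frame=0,
                                  end_frame=int(recording.get_sampling_frequency() * 60))

# parallel save: each worker re-opens the NWB file in its initializer
recording.save(folder="recording_saved", n_jobs=4, chunk_duration="1s")
```
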
khl02007 commented 2 years ago

I was able to resolve this issue by upgrading to h5py 3.7.
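
To check which h5py is installed:

```python
import h5py

# the "bad object header version number" KeyError above went away after upgrading to 3.7
print(h5py.__version__)
```
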

samuelgarcia commented 2 years ago

Was it working with `n_jobs=1`?
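
That is, the single-process path, which skips the worker initializer entirely:

```python
# runs in the main process, reusing the already-open NWB handle
recording.save(folder=recording_path, n_jobs=1, chunk_duration="1s")
```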