Hi Jacopo. The job_kwargs logic is mainly used internally by spikeinterface, e.g. in rec.save(..., **job_kwargs) or detect_peaks(..., **job_kwargs); all external sorters have their own way of handling parallel computing. I am not sure how this can be handled for mountainsort. @magland any idea?
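(A minimal sketch of what that internal usage looks like; the parameter values and the import path for detect_peaks are illustrative and may differ between spikeinterface versions:)

from spikeinterface.sortingcomponents.peak_detection import detect_peaks

# job_kwargs control spikeinterface's own chunked, multi-core processing
job_kwargs = dict(n_jobs=-1, chunk_duration="1s", progress_bar=True)

# parallel write of the preprocessed recording to disk
rec_saved = recording.save(folder="preprocessed", **job_kwargs)

# parallel peak detection
peaks = detect_peaks(recording, method="by_channel", **job_kwargs)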
Hi Sam, thank you for your reply! I thought that the num_workers parameter of the mountainsort method (the second thing I tried) was specifically meant to enable parallel computing. Is it not?
@samuelgarcia Do you think it would be crazy to use ipyparallel to run mountainsort4 in parallel, one core per channel (channels can be sorted independently of one another), by changing the run_mountainsort4 function so that it does not always save the data in the same folder (which would overwrite the results when parallelized)?
@Jacopo-Baldi we already have a mechanism to parallelize over groups! Do you want to run each channel in parallel?
In that case you can do the following (I assume you have 60 cores in your machine):
import numpy as np
import spikeinterface.sorters as ss

# set channel groups: one group per channel
recording.set_channel_groups(np.arange(recording.get_num_channels()))

sorting_by_group = ss.run_sorter_by_property("mountainsort4", recording, grouping_property="group",
                                             working_folder="ms4", engine="joblib",
                                             engine_kwargs={"n_jobs": 60})
The sorting_by_group is a single sorting object, but each unit will have a group property with the channel group on which it was found! Let me know if it works!
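(A small follow-up sketch of how that per-unit group property could be inspected on the result; sorting_by_group is the object returned above and the printed listing is illustrative:)

import numpy as np

groups = np.asarray(sorting_by_group.get_property("group"))   # one entry per unit
unit_ids = np.asarray(sorting_by_group.get_unit_ids())
for g in np.unique(groups):
    print(f"channel group {g}: units {list(unit_ids[groups == g])}")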
@magland Thank you for the suggestion, I did not know that function could be used like that! I tried it just now and, after running for a while (over 30 minutes), it stopped with the error below. I ran the sorting on only 5 channels to check that it works before launching it on the rest. With that method, would it be possible to use the rest of the cores as well? Is there a function that simply makes use of all the available cores (as when I set n_jobs=-1 for detect_peaks), regardless of the number of channels?
sorting_by_group = ss.run_sorter_by_property("mountainsort4", myRec, grouping_property="group", working_folder="ms4", engine="joblib", engine_kwargs={"n_jobs": 64})
Mountainsort4 use the OLD spikeextractors mapped with NewToOldRecording
(many repeated DeprecationWarnings from MEAutility, MEArec and spikeextractors: "distutils Version classes are deprecated. Use packaging.version instead.")

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File ".../python3.8/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File ".../python3.8/multiprocessing/spawn.py", line 125, in _main
    prepare(preparation_data)
  File ".../python3.8/multiprocessing/spawn.py", line 231, in prepare
    set_start_method(data['start_method'], force=True)
  File ".../python3.8/multiprocessing/context.py", line 247, in set_start_method
    self._actual_context = self.get_context(method)
  File ".../python3.8/multiprocessing/context.py", line 239, in get_context
    return super().get_context(method)
  File ".../python3.8/multiprocessing/context.py", line 193, in get_context
    raise ValueError('cannot find context for %r' % method) from None
ValueError: cannot find context for 'loky'

_RemoteTraceback:
"""
Traceback (most recent call last):
  File ".../joblib/externals/loky/process_executor.py", line 436, in _process_worker
    r = call_item()
  File ".../joblib/externals/loky/process_executor.py", line 288, in __call__
    return self.fn(*self.args, **self.kwargs)
  File ".../joblib/_parallel_backends.py", line 595, in __call__
    return self.func(*args, **kwargs)
  File ".../joblib/parallel.py", line 262, in __call__
    return [func(*args, **kwargs)
  File "/home/jbaldi/.local/lib/python3.8/site-packages/spikeinterface/sorters/launcher.py", line 38, in _run_one
    run_sorter(sorter_name, recording, output_folder=output_folder,
  File "/home/jbaldi/.local/lib/python3.8/site-packages/spikeinterface/sorters/runsorter.py", line 143, in run_sorter
    return run_sorter_local(**common_kwargs)
  File "/home/jbaldi/.local/lib/python3.8/site-packages/spikeinterface/sorters/runsorter.py", line 165, in run_sorter_local
    sorting = SorterClass.get_result_from_folder(output_folder)
  File "/home/jbaldi/.local/lib/python3.8/site-packages/spikeinterface/sorters/basesorter.py", line 280, in get_result_from_folder
    raise SpikeSortingError(
spikeinterface.sorters.utils.misc.SpikeSortingError: Spike sorting failed. You can inspect the runtime trace in spikeinterface_log.json
"""

The above exception was the direct cause of the following exception:

SpikeSortingError                          Traceback (most recent call last)
/tmp/ipykernel_23353/102306805.py in <cell line: 1>()
----> 1 sorting_by_group = ss.run_sorter_by_property("mountainsort4", myRec, grouping_property="group", working_folder="ms4", engine="joblib", engine_kwargs={"n_jobs": 64})

~/.local/lib/python3.8/site-packages/spikeinterface/sorters/launcher.py, line 119, in run_sorter_by_property
--> 119     sorting_output = run_sorters([sorter_name], recording_dict, working_folder,

~/.local/lib/python3.8/site-packages/spikeinterface/sorters/launcher.py, line 275, in run_sorters
--> 275     Parallel(n_jobs=n_jobs, backend=backend)(
    276         delayed(_run_one)(task_args) for task_args in task_args_list)

.../joblib/parallel.py, line 1056, in __call__:        self.retrieve()
.../joblib/parallel.py, line 935, in retrieve:         self._output.extend(job.get(timeout=self.timeout))
.../joblib/_parallel_backends.py, line 542, in wrap_future_result:  return future.result(timeout=timeout)
.../python3.8/concurrent/futures/_base.py, line 444, in result:     return self.__get_result()
.../python3.8/concurrent/futures/_base.py, line 389, in __get_result:  raise self._exception

SpikeSortingError: Spike sorting failed. You can inspect the runtime trace in spikeinterface_log.json
@Jacopo-Baldi I'm not @magland :P
Can you try to add the following: engine_kwargs={"n_jobs": 60, "backend": "multiprocessing"}
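(For reference, a sketch of the full call with only the engine_kwargs changed; this matches the command shown in the traceback below:)

sorting_by_group = ss.run_sorter_by_property("mountainsort4", myRec, grouping_property="group", working_folder="ms4", engine="joblib", engine_kwargs={"n_jobs": 60, "backend": "multiprocessing"})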
@alejoe91 Oops, sorry for my oversight :D I tried the modification you suggested; now there is this error instead.
RemoteTraceback:
"""
Traceback (most recent call last):
  File ".../python3.8/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File ".../joblib/_parallel_backends.py", line 595, in __call__
    return self.func(*args, **kwargs)
  File ".../joblib/parallel.py", line 262, in __call__
    return [func(*args, **kwargs)
  File "/home/jbaldi/.local/lib/python3.8/site-packages/spikeinterface/sorters/launcher.py", line 38, in _run_one
    run_sorter(sorter_name, recording, output_folder=output_folder,
  File "/home/jbaldi/.local/lib/python3.8/site-packages/spikeinterface/sorters/runsorter.py", line 143, in run_sorter
    return run_sorter_local(**common_kwargs)
  File "/home/jbaldi/.local/lib/python3.8/site-packages/spikeinterface/sorters/runsorter.py", line 165, in run_sorter_local
    sorting = SorterClass.get_result_from_folder(output_folder)
  File "/home/jbaldi/.local/lib/python3.8/site-packages/spikeinterface/sorters/basesorter.py", line 280, in get_result_from_folder
    raise SpikeSortingError(
spikeinterface.sorters.utils.misc.SpikeSortingError: Spike sorting failed. You can inspect the runtime trace in spikeinterface_log.json
"""

The above exception was the direct cause of the following exception:

SpikeSortingError                          Traceback (most recent call last)
/tmp/ipykernel_23353/3784849406.py in <cell line: 1>()
----> 1 sorting_by_group = ss.run_sorter_by_property("mountainsort4", myRec, grouping_property="group", working_folder="ms4", engine="joblib", engine_kwargs={"n_jobs": 60, "backend": "multiprocessing"})

~/.local/lib/python3.8/site-packages/spikeinterface/sorters/launcher.py, line 119, in run_sorter_by_property
--> 119     sorting_output = run_sorters([sorter_name], recording_dict, working_folder,

~/.local/lib/python3.8/site-packages/spikeinterface/sorters/launcher.py, line 275, in run_sorters
--> 275     Parallel(n_jobs=n_jobs, backend=backend)(
    276         delayed(_run_one)(task_args) for task_args in task_args_list)

.../joblib/parallel.py, line 1056, in __call__:    self.retrieve()
.../joblib/parallel.py, line 935, in retrieve:     self._output.extend(job.get(timeout=self.timeout))
.../python3.8/multiprocessing/pool.py, line 771, in get:  raise self._value

SpikeSortingError: Spike sorting failed. You can inspect the runtime trace in spikeinterface_log.json
And this is the spikeinterface_log.json:
{ "sorter_name": "mountainsort4", "sorter_version": "unknown", "datetime": "2023-04-27T16:34:45.363503", "runtime_trace": [], "error": true, "error_trace": "Traceback (most recent call last):\n File \"/home/jbaldi/.local/lib/python3.8/site-packages/spikeinterface/sorters/basesorter.py\", line 226, in run_from_folder\n SorterClass._run_from_folder(sorter_output_folder, sorter_params, verbose)\n File \"/home/jbaldi/.local/lib/python3.8/site-packages/spikeinterface/sorters/external/mountainsort4.py\", line 133, in _run_from_folder\n old_api_sorting = mountainsort4.mountainsort4(ms4_params)\n File \"/home/common/jbaldi/Scripts/spikeinterface/lib/python3.8/site-packages/mountainsort4/mountainsort4.py\", line 38, in mountainsort4\n MS4.sort()\n File \"/home/common/jbaldi/Scripts/spikeinterface/lib/python3.8/site-packages/mountainsort4/ms4alg.py\", line 811, in sort\n prepare_timeseries_hdf5_from_recording(\n File \"/home/common/jbaldi/Scripts/spikeinterface/lib/python3.8/site-packages/mountainsort4/ms4alg.py\", line 678, in prepare_timeseries_hdf5_from_recording\n f.create_dataset('part-{}-{}'.format(m, j),\n File \"/home/common/jbaldi/Scripts/spikeinterface/lib/python3.8/site-packages/h5py/_hl/group.py\", line 161, in create_dataset\n dsid = dataset.make_new_dset(group, shape, dtype, data, name, kwds)\n File \"/home/common/jbaldi/Scripts/spikeinterface/lib/python3.8/site-packages/h5py/_hl/dataset.py\", line 159, in make_new_dset\n dset_id.write(h5s.ALL, h5s.ALL, data)\n File \"h5py/_objects.pyx\", line 54, in h5py._objects.with_phil.wrapper\n File \"h5py/_objects.pyx\", line 55, in h5py._objects.with_phil.wrapper\n File \"h5py/h5d.pyx\", line 232, in h5py.h5d.DatasetID.write\n File \"h5py/_proxy.pyx\", line 114, in h5py._proxy.dset_rw\nOSError: [Errno 28] Can't write data (file write failed: time = Thu Apr 27 16:35:44 2023\n, filename = '/tmp/tmpv8voyzwv/timeseries.hdf5', file descriptor = 73, errno = 28, error message = 'No space left on device', buf = 0x5557983d94b0, total write size = 4212160, bytes this sub-write = 4212160, bytes actually written = 18446744073709551615, offset = 0)\n", "run_time": null }
Which is quite weird, because I have plenty of space on the device (multiple terabytes).
If sorting by property does not work, do you think it would be too difficult to modify the run_sorter scripts to parallelize them myself? For example, by changing the folder structure they create so that the results are not overwritten while multiple run_sorter calls run on different channels via ipyparallel.
The problem is that ms4 uses the default /tmp folder, which is quite small. You can provide an alternative folder with the tempdir parameter. That should fix it!
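(A sketch of how tempdir could be passed through run_sorter_by_property as an extra sorter parameter; the path is illustrative and assumes a large scratch disk is mounted there:)

sorting_by_group = ss.run_sorter_by_property("mountainsort4", myRec, grouping_property="group", working_folder="ms4", engine="joblib", engine_kwargs={"n_jobs": 60, "backend": "multiprocessing"}, tempdir="/path/to/large_scratch_dir")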
Thank you, that fixed the disk-space issue! Now the function runs for a while (over 30 minutes) and then crashes with this report in the log file. Do you know why that happens?
{ "sorter_name": "mountainsort4", "sorter_version": "unknown", "datetime": "2023-05-05T11:25:03.466603", "runtime_trace": [], "error": true, "error_trace": "Traceback (most recent call last):\n File \"/home/jbaldi/.local/lib/python3.8/site-packages/spikeinterface/sorters/basesorter.py\", line 226, in run_from_folder\n SorterClass._run_from_folder(sorter_output_folder, sorter_params, verbose)\n File \"/home/jbaldi/.local/lib/python3.8/site-packages/spikeinterface/sorters/external/mountainsort4.py\", line 133, in _run_from_folder\n old_api_sorting = mountainsort4.mountainsort4(*ms4_params)\n File \"/home/common/jbaldi/Scripts/spikeinterface/lib/python3.8/site-packages/mountainsort4/mountainsort4.py\", line 38, in mountainsort4\n MS4.sort()\n File \"/home/common/jbaldi/Scripts/spikeinterface/lib/python3.8/site-packages/mountainsort4/ms4alg.py\", line 842, in sort\n dask.compute(dask_list, num_workers=self._num_workers)\n File \"/home/common/jbaldi/Scripts/spikeinterface/lib/python3.8/site-packages/dask/base.py\", line 571, in compute\n results = schedule(dsk, keys, **kwargs)\n File \"/home/common/jbaldi/Scripts/spikeinterface/lib/python3.8/site-packages/dask/multiprocessing.py\", line 219, in get\n result = get_async(\n File \"/home/common/jbaldi/Scripts/spikeinterface/lib/python3.8/site-packages/dask/local.py\", line 495, in get_async\n fire_tasks(chunksize)\n File \"/home/common/jbaldi/Scripts/spikeinterface/lib/python3.8/site-packages/dask/local.py\", line 490, in fire_tasks\n fut = submit(batch_execute_tasks, each_args)\n File \"/home/common/jbaldi/Scripts/spikeinterface/lib/python3.8/concurrent/futures/process.py\", line 645, in submit\n self._start_queue_management_thread()\n File \"/home/common/jbaldi/Scripts/spikeinterface/lib/python3.8/concurrent/futures/process.py\", line 584, in _start_queue_management_thread\n self._adjust_process_count()\n File \"/home/common/jbaldi/Scripts/spikeinterface/lib/python3.8/concurrent/futures/process.py\", line 608, in _adjust_process_count\n p.start()\n File \"/home/common/jbaldi/Scripts/spikeinterface/lib/python3.8/multiprocessing/process.py\", line 118, in start\n assert not _current_process._config.get('daemon'), \\nAssertionError: daemonic processes are not allowed to have children\n", "run_time": null }
I have not found any workaround yet. @alejoe91, do you have any ideas? Is it something in the spikeinterface library that should be changed, or an issue with the way we are using it?
I am still stuck on the same issue; is there anything I can do to fix it?
Hi @Jacopo-Baldi
Not sure... it seems like something is wrong with multiprocessing. Let me include @magland here, who is the author of mountainsort. Maybe he can help!
Hi @Jacopo-Baldi I'm not sure what you can do. My only suggestions (I know it's not very helpful) are to (a) Look through the source code of mountainsort4, or (b) have a look at mountainsort5. Note that with ms5, there is an open PR to incorporate the latest version into spikeinterface.
@Jacopo-Baldi still facing this issue?
I think the move to MS5, coupled with the lack of activity, warrants closing this. @Jacopo-Baldi, if this is still an issue please feel free to reopen it or open a new issue.
Hi everyone, I am working with SpikeInterface to sort data and encountered this issue while trying to run the sorter in parallel. I think that I need to set the parameter n_jobs or num_workers to tell the function to run over multiple cores, and I tried multiple options, among which:

sorting_MNS = ss.run_mountainsort4(recording=myRec, num_workers=60)

and

sorter_params = dict(num_workers=60)
sorting_MNS = ss.run_mountainsort4(recording=myRec, **sorter_params)
None of these seems to have any effect: the sorting runs on just one core, but it does not raise an error.
I also tried the next two, but both give the same error.
sorting_MNS = ss.run_mountainsort4(recording=myRec,**job_kwargs)
sorting_MNS = ss.run_sorter(sorter_name='mountainsort4',recording=myRec,**job_kwargs)
I tried different values for the number of cores, including -1 (which should mean use all the available ones), but with no effect. I also tried another sorter (tridesclous) and the problem seems to be the same. Is there something wrong with the way I am using the parameter for multi-core usage? I am sure that at some point in the past I managed to do it, but I have since lost the script I used, so the environment should be capable of it. I also tried ipyparallel and it works, so I know that the machine supports parallel computing. Any help would be greatly appreciated.
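(Not from the original report: a quick sketch of how to check which parameters a given sorter actually exposes, since num_workers is a mountainsort4 sorter parameter while n_jobs/job_kwargs only apply to spikeinterface-internal functions; both helpers below are from the spikeinterface.sorters module:)

import spikeinterface.sorters as ss

# list the parameters the wrapper forwards to mountainsort4 (includes num_workers)
print(ss.get_default_sorter_params("mountainsort4"))
print(ss.get_sorter_params_description("mountainsort4"))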