nubs01 / blechpy

Python package for extraction, spike clustering and processing of Intan recorded neural data for Katz Lab
MIT License
11 stars 5 forks source link

Parallel process runtime error on dat.detect_spikes() #14

Closed danielsvedberg closed 3 years ago

danielsvedberg commented 3 years ago

Process hangs after starting and never makes progress

In [9]: dat.detect_spikes()                                                     
Running Spike Detection...
  0%|                                                    | 0/64 [00:00<?, ?it/s]\

^CProcess SpawnPoolWorker-72:
Process SpawnPoolWorker-69:
Process SpawnPoolWorker-73:
Process SpawnPoolWorker-71:
Process SpawnPoolWorker-68:
Process SpawnPoolWorker-75:
Process SpawnPoolWorker-67:
Process SpawnPoolWorker-66:
Process SpawnPoolWorker-74:
Process SpawnPoolWorker-70:
Process SpawnPoolWorker-65:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
Traceback (most recent call last):
Traceback (most recent call last):
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
Traceback (most recent call last):
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
Traceback (most recent call last):
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/queues.py", line 351, in get
    with self._rlock:
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
Traceback (most recent call last):
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/queues.py", line 351, in get
    with self._rlock:
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/queues.py", line 351, in get
    with self._rlock:
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
KeyboardInterrupt
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/queues.py", line 351, in get
    with self._rlock:
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/queues.py", line 351, in get
    with self._rlock:
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/queues.py", line 351, in get
    with self._rlock:
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/queues.py", line 351, in get
    with self._rlock:
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/queues.py", line 351, in get
    with self._rlock:
KeyboardInterrupt
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/queues.py", line 351, in get
    with self._rlock:
KeyboardInterrupt
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/queues.py", line 351, in get
    with self._rlock:
KeyboardInterrupt
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
KeyboardInterrupt
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
KeyboardInterrupt
KeyboardInterrupt
KeyboardInterrupt
Traceback (most recent call last):
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/queues.py", line 352, in get
    res = self._reader.recv_bytes()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt
Traceback (most recent call last):
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3331, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-9-12a7080dd97a>", line 1, in <module>
    dat.detect_spikes()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/blechpy/utils/decorators.py", line 18, in wrapper
    func(*args, **kwargs)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/blechpy/datastructures/dataset.py", line 778, in detect_spikes
    pool.join()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/pool.py", line 556, in join
    self._worker_handler.join()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/threading.py", line 1044, in join
    self._wait_for_tstate_lock()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/threading.py", line 1060, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3254, in run_ast_nodes
    if (await self.run_code(code, result,  async_=asy)):
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3348, in run_code
    self.showtraceback(running_compiled_code=True)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 2049, in showtraceback
    self._showtraceback(etype, value, stb)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 2067, in _showtraceback
    print(self.InteractiveTB.stb2text(stb))
ValueError: I/O operation on closed file.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 2886, in _run_cell
    return runner(coro)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/IPython/core/async_helpers.py", line 68, in _pseudo_sync_runner
    coro.send(None)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3063, in run_cell_async
    interactivity=interactivity, compiler=compiler, result=result)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3273, in run_ast_nodes
    self.showtraceback()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 2049, in showtraceback
    self._showtraceback(etype, value, stb)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 2067, in _showtraceback
    print(self.InteractiveTB.stb2text(stb))
ValueError: I/O operation on closed file.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/IPython/terminal/interactiveshell.py", line 558, in mainloop
    self.interact()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/IPython/terminal/interactiveshell.py", line 549, in interact
    self.run_cell(code, store_history=True)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 2858, in run_cell
    raw_cell, store_history, silent, shell_futures)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 2891, in _run_cell
    self.showtraceback(running_compiled_code=True)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 2049, in showtraceback
    self._showtraceback(etype, value, stb)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 2067, in _showtraceback
    print(self.InteractiveTB.stb2text(stb))
ValueError: I/O operation on closed file.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/dsvedberg/anaconda3/envs/blechpy/bin/ipython", line 8, in <module>
    sys.exit(start_ipython())
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/IPython/__init__.py", line 126, in start_ipython
    return launch_new_instance(argv=argv, **kwargs)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/traitlets/config/application.py", line 664, in launch_instance
    app.start()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/IPython/terminal/ipapp.py", line 356, in start
    self.shell.mainloop()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/IPython/terminal/interactiveshell.py", line 570, in mainloop
    self.restore_term_title()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/IPython/terminal/interactiveshell.py", line 271, in restore_term_title
    restore_term_title()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/IPython/utils/terminal.py", line 119, in restore_term_title
    _restore_term_title()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/IPython/utils/terminal.py", line 73, in _restore_term_title_xterm
    sys.stdout.write('\033[23;0t')
ValueError: I/O operation on closed file.

If you suspect this is an IPython 7.13.0 bug, please report it at:
    https://github.com/ipython/ipython/issues
or send an email to the mailing list at ipython-dev@python.org

You can print a more detailed traceback right now with "%tb", or use "%debug"
to interactively debug it.

Extra-detailed tracebacks for bug-reporting purposes can be enabled via:
    %config Application.verbose_crash=True
nubs01 commented 3 years ago

Sorry it took me so long to get back to this. Can you run it with "multi_process = False" so we can get a look where it hangs. It's impossible to debug with this output. Also are you running this locally or on jetstream?

danielsvedberg commented 3 years ago

Sorry it took me so long to get back to this. Can you run it with "multi_process = False" so we can get a look where it hangs. It's impossible to debug with this output. Also are you running this locally or on jetstream?

I'm running locally on Ubuntu 20.04 on an environment copied from your .yml file.

dat.detect_spikes(multi_process=False) does not throw any error.

nubs01 commented 3 years ago

No error, but does it hang with multiprocess=False, or does it run through just fine? If the latter, I'm guessing it's the multiprocess package that doesn't play nice with Ubuntu 20. I'll see if I can switch it to use joblib.

On Tue, Aug 3, 2021, 3:26 PM Daniel Svedberg @.***> wrote:

Sorry it took me so long to get back to this. Can you run it with "multi_process = False" so we can get a look where it hangs. It's impossible to debug with this output. Also are you running this locally or on jetstream?

I'm running locally on Ubuntu 20.04 on an environment copied from your .yml file.

dat.detect_spikes(multi_process=False) does not throw any error.

— You are receiving this because you commented. Reply to this email directly, view it on GitHub https://github.com/nubs01/blechpy/issues/14#issuecomment-892104787, or unsubscribe https://github.com/notifications/unsubscribe-auth/AD4ZIKXAX7IYJP6EN4NOFZTT3A7IDANCNFSM5AWDJY2A . Triage notifications on the go with GitHub Mobile for iOS https://apps.apple.com/app/apple-store/id1477376905?ct=notification-email&mt=8&pt=524675 or Android https://play.google.com/store/apps/details?id=com.github.android&utm_campaign=notification-email .

danielsvedberg commented 3 years ago

I turned on verbose crash reporting for ipython and re-ran detect_spikes under the same conditions that produce the hanging bug. I then ctrl+c'd the process, and of the output, I found the following potentially informative:

KeyboardInterruptPython 3.7.6: /home/dsvedberg/anaconda3/envs/blechpy/bin/python
                                                   Tue Aug  3 16:08:49 2021
A problem occurred executing Python code.  Here is the sequence of function
calls leading up to the error, with the most recent (innermost) call last.
~/anaconda3/envs/blechpy/lib/python3.7/site-packages/IPython/core/interactiveshell.py in run_code(self=<IPython.terminal.interactiveshell.TerminalInteractiveShell object>, code_obj=<code object <module> at 0x7f7373c68c90, file "<ipython-input-8-12a7080dd97a>", line 1>, result=<ExecutionResult object at 7f737445afd0, executi...rue silent=False shell_futures=True> result=None>, async_=False)
   3330                 else:
-> 3331                     exec(code_obj, self.user_global_ns, self.user_ns)
        global exec = undefined
        code_obj = <code object <module> at 0x7f7373c68c90, file "<ipython-input-8-12a7080dd97a>", line 1>
        self.user_global_ns = {'__name__': '__main__', '__doc__': 'Automatically created module for IPython interactive environment', '__package__': None, '__loader__': None, '__spec__': None, '__builtin__': <module 'builtins' (built-in)>, '__builtins__': <module 'builtins' (built-in)>, '_ih': ['', "get_ipython().run_line_magic('condif', 'Application.verbose_crash=True')", "get_ipython().run_line_magic('cofig', ' Application.verbose_crash=True')", "get_ipython().run_line_magic('config', 'Application.verbose_crash=True')", 'import datashader', 'import blechpy', "rec_dir = '/media/dsvedberg/Ubuntu Disk/taste_experience/DS33/DS33_spont_4tastes_200305_153504'", 'dat = blechpy.load_dataset(rec_dir)', 'dat.detect_spikes()'], '_oh': {}, '_dh': ['/home/dsvedberg'], 'In': ['', "get_ipython().run_line_magic('condif', 'Application.verbose_crash=True')", "get_ipython().run_line_magic('cofig', ' Application.verbose_crash=True')", "get_ipython().run_line_magic('config', 'Application.verbose_crash=True')", 'import datashader', 'import blechpy', "rec_dir = '/media/dsvedberg/Ubuntu Disk/taste_experience/DS33/DS33_spont_4tastes_200305_153504'", 'dat = blechpy.load_dataset(rec_dir)', 'dat.detect_spikes()'], 'Out': {}, 'get_ipython': <bound method InteractiveShell.get_ipython of <IPython.terminal.interactiveshell.TerminalInteractiveShell object at 0x7f73db9dc250>>, 'exit': <IPython.core.autocall.ExitAutocall object at 0x7f73d9c88190>, 'quit': <IPython.core.autocall.ExitAutocall object at 0x7f73d9c88190>, '_': '', '__': '', '___': '', '_i': 'dat = blechpy.load_dataset(rec_dir)', '_ii': "rec_dir = '/media/dsvedberg/Ubuntu Disk/taste_experience/DS33/DS33_spont_4tastes_200305_153504'", '_iii': 'import blechpy', '_i1': '%condif Application.verbose_crash=True', '_i2': '%cofig  Application.verbose_crash=True', '_i3': '%config Application.verbose_crash=True', '_i4': 'import datashader', 'datashader': <module 'datashader' from 
'/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/datashader/__init__.py'>, '_i5': 'import blechpy', 'blechpy': <module 'blechpy' from '/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/blechpy/__init__.py'>, '_i6': "rec_dir = '/media/dsvedberg/Ubuntu Disk/taste_experience/DS33/DS33_spont_4tastes_200305_153504'", 'rec_dir': '/media/dsvedberg/Ubuntu Disk/taste_experience/DS33/DS33_spont_4tastes_200305_153504', '_i7': 'dat = blechpy.load_dataset(rec_dir)', 'dat': <blechpy.datastructures.dataset.dataset object at 0x7f73d9128c50>, '_i8': 'dat.detect_spikes()'}
        self.user_ns = {'__name__': '__main__', '__doc__': 'Automatically created module for IPython interactive environment', '__package__': None, '__loader__': None, '__spec__': None, '__builtin__': <module 'builtins' (built-in)>, '__builtins__': <module 'builtins' (built-in)>, '_ih': ['', "get_ipython().run_line_magic('condif', 'Application.verbose_crash=True')", "get_ipython().run_line_magic('cofig', ' Application.verbose_crash=True')", "get_ipython().run_line_magic('config', 'Application.verbose_crash=True')", 'import datashader', 'import blechpy', "rec_dir = '/media/dsvedberg/Ubuntu Disk/taste_experience/DS33/DS33_spont_4tastes_200305_153504'", 'dat = blechpy.load_dataset(rec_dir)', 'dat.detect_spikes()'], '_oh': {}, '_dh': ['/home/dsvedberg'], 'In': ['', "get_ipython().run_line_magic('condif', 'Application.verbose_crash=True')", "get_ipython().run_line_magic('cofig', ' Application.verbose_crash=True')", "get_ipython().run_line_magic('config', 'Application.verbose_crash=True')", 'import datashader', 'import blechpy', "rec_dir = '/media/dsvedberg/Ubuntu Disk/taste_experience/DS33/DS33_spont_4tastes_200305_153504'", 'dat = blechpy.load_dataset(rec_dir)', 'dat.detect_spikes()'], 'Out': {}, 'get_ipython': <bound method InteractiveShell.get_ipython of <IPython.terminal.interactiveshell.TerminalInteractiveShell object at 0x7f73db9dc250>>, 'exit': <IPython.core.autocall.ExitAutocall object at 0x7f73d9c88190>, 'quit': <IPython.core.autocall.ExitAutocall object at 0x7f73d9c88190>, '_': '', '__': '', '___': '', '_i': 'dat = blechpy.load_dataset(rec_dir)', '_ii': "rec_dir = '/media/dsvedberg/Ubuntu Disk/taste_experience/DS33/DS33_spont_4tastes_200305_153504'", '_iii': 'import blechpy', '_i1': '%condif Application.verbose_crash=True', '_i2': '%cofig  Application.verbose_crash=True', '_i3': '%config Application.verbose_crash=True', '_i4': 'import datashader', 'datashader': <module 'datashader' from 
'/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/datashader/__init__.py'>, '_i5': 'import blechpy', 'blechpy': <module 'blechpy' from '/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/blechpy/__init__.py'>, '_i6': "rec_dir = '/media/dsvedberg/Ubuntu Disk/taste_experience/DS33/DS33_spont_4tastes_200305_153504'", 'rec_dir': '/media/dsvedberg/Ubuntu Disk/taste_experience/DS33/DS33_spont_4tastes_200305_153504', '_i7': 'dat = blechpy.load_dataset(rec_dir)', 'dat': <blechpy.datastructures.dataset.dataset object at 0x7f73d9128c50>, '_i8': 'dat.detect_spikes()'}
   3332             finally:

<ipython-input-8-12a7080dd97a> in <module>
----> 1 dat.detect_spikes()
        global dat.detect_spikes = <bound method dataset.detect_spikes of <blechpy.datastructures.dataset.dataset object at 0x7f73d9128c50>>

~/anaconda3/envs/blechpy/lib/python3.7/site-packages/blechpy/utils/decorators.py in wrapper(*args=(<blechpy.datastructures.dataset.dataset object>,), **kwargs={})
     17                     try:
---> 18                         func(*args, **kwargs)
        global func = undefined
        args = (<blechpy.datastructures.dataset.dataset object at 0x7f73d9128c50>,)
        kwargs = {}
     19                         fail = False

~/anaconda3/envs/blechpy/lib/python3.7/site-packages/blechpy/datastructures/dataset.py in detect_spikes(self=<blechpy.datastructures.dataset.dataset object>, data_quality=None, multi_process=True, n_cores=11)
    777             pool.close()
--> 778             pool.join()
        pool.join = <bound method Pool.join of <multiprocessing.pool.Pool object at 0x7f7391c9ccd0>>
    779         else:

~/anaconda3/envs/blechpy/lib/python3.7/multiprocessing/pool.py in join(self=<multiprocessing.pool.Pool object>)
    555             raise ValueError("In unknown state")
--> 556         self._worker_handler.join()
        self._worker_handler.join = <bound method Thread.join of <Thread(Thread-108, started daemon 140134191494912)>>
    557         self._task_handler.join()

~/anaconda3/envs/blechpy/lib/python3.7/threading.py in join(self=<Thread(Thread-108, started daemon 140134191494912)>, timeout=None)
   1043         if timeout is None:
-> 1044             self._wait_for_tstate_lock()
        self._wait_for_tstate_lock = <bound method Thread._wait_for_tstate_lock of <Thread(Thread-108, started daemon 140134191494912)>>
   1045         else:

~/anaconda3/envs/blechpy/lib/python3.7/threading.py in _wait_for_tstate_lock(self=<Thread(Thread-108, started daemon 140134191494912)>, block=True, timeout=-1)
   1059             assert self._is_stopped
-> 1060         elif lock.acquire(block, timeout):
        lock.acquire = <built-in method acquire of _thread.lock object at 0x7f73743815d0>
        block = True
        timeout = -1
   1061             lock.release()

Looks like the process is hanging at pool.join() on line 778. I'll do a quick check to see if there's any obvious reason why the individual processes might not be closing out, but Roshan, if you think there's an obvious cause, let me know.

danielsvedberg commented 3 years ago

No error, but does it hang with multiprocess=False, or does it run through just fine? If the latter, I'm guessing it's the multiprocess package that doesn't play nice with Ubuntu 20. I'll see if I can switch it to use joblib.

function runs through fine when multi_process=False

nubs01 commented 3 years ago

Yeah this issue is definitely with multiprocess, so I'll change it to joblib hopefully tonight and let you know.

On Tue, Aug 3, 2021, 4:17 PM Daniel Svedberg @.***> wrote:

No error, but does it hang with multiprocess=False, or does it run through just fine? If the later I'm guessing it's the multiprocess package that doesn't play nice with Ubuntu 20. I'll see if I can switch it to use joblib. … <#m5037082807227993299> On Tue, Aug 3, 2021, 3:26 PM Daniel Svedberg @.***> wrote: Sorry it took me so long to get back to this. Can you run it with "multi_process = False" so we can get a look where it hangs. It's impossible to debug with this output. Also are you running this locally or on jetstream? I'm running locally on Ubuntu 20.04 on an environment copied from your .yml file. dat.detect_spikes(multi_process=False) does not throw any error. — You are receiving this because you commented. Reply to this email directly, view it on GitHub <#14 (comment) https://github.com/nubs01/blechpy/issues/14#issuecomment-892104787>, or unsubscribe https://github.com/notifications/unsubscribe-auth/AD4ZIKXAX7IYJP6EN4NOFZTT3A7IDANCNFSM5AWDJY2A . Triage notifications on the go with GitHub Mobile for iOS https://apps.apple.com/app/apple-store/id1477376905?ct=notification-email&mt=8&pt=524675 or Android https://play.google.com/store/apps/details?id=com.github.android&utm_campaign=notification-email .

function runs through fine when multi_process=False

— You are receiving this because you commented. Reply to this email directly, view it on GitHub https://github.com/nubs01/blechpy/issues/14#issuecomment-892136894, or unsubscribe https://github.com/notifications/unsubscribe-auth/AD4ZIKVWU43TXZL7GLJIR4LT3BFGXANCNFSM5AWDJY2A . Triage notifications on the go with GitHub Mobile for iOS https://apps.apple.com/app/apple-store/id1477376905?ct=notification-email&mt=8&pt=524675 or Android https://play.google.com/store/apps/details?id=com.github.android&utm_campaign=notification-email .

nubs01 commented 3 years ago

Just pushed a possible fix to this (2.0.73). Please test and let me know.

danielsvedberg commented 3 years ago

Ok, hear me out. So it's about this program called blechpy. It's a program that turns our data into a pickle. Funniest shit I've ever seen.

Looks like we're in a pickle

Runs just fine when multi_process=False

In [7]: dat.detect_spikes()                                                     
Running Spike Detection...
  0%|                                                    | 0/48 [00:00<?, ?it/s]

Exception in blechpy.datastructures.dataset.detect_spikes

---------------------------------------------------------------------------
_RemoteTraceback                          Traceback (most recent call last)
_RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/joblib/externals/loky/backend/queues.py", line 150, in _feed
    obj_ = dumps(obj, reducers=reducers)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/joblib/externals/loky/backend/reduction.py", line 247, in dumps
    dump(obj, buf, reducers=reducers, protocol=protocol)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/joblib/externals/loky/backend/reduction.py", line 240, in dump
    _LokyPickler(file, reducers=reducers, protocol=protocol).dump(obj)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/joblib/externals/cloudpickle/cloudpickle.py", line 482, in dump
    return Pickler.dump(self, obj)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/pickle.py", line 437, in dump
    self.save(obj)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/pickle.py", line 662, in save_reduce
    save(state)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/pickle.py", line 662, in save_reduce
    save(state)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/pickle.py", line 890, in _batch_setitems
    save(v)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/pickle.py", line 662, in save_reduce
    save(state)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/pickle.py", line 819, in save_list
    self._batch_appends(obj)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/pickle.py", line 846, in _batch_appends
    save(tmp[0])
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/pickle.py", line 774, in save_tuple
    save(element)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/pickle.py", line 774, in save_tuple
    save(element)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/joblib/externals/cloudpickle/cloudpickle.py", line 556, in save_function
    return self.save_function_tuple(obj)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/joblib/externals/cloudpickle/cloudpickle.py", line 758, in save_function_tuple
    save(state)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/pickle.py", line 819, in save_list
    self._batch_appends(obj)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/pickle.py", line 843, in _batch_appends
    save(x)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/pickle.py", line 662, in save_reduce
    save(state)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/pickle.py", line 524, in save
    rv = reduce(self.proto)
TypeError: cannot serialize '_io.TextIOWrapper' object
"""

The above exception was the direct cause of the following exception:

PicklingError                             Traceback (most recent call last)
<ipython-input-7-12a7080dd97a> in <module>
----> 1 dat.detect_spikes()

~/anaconda3/envs/blechpy/lib/python3.7/site-packages/blechpy/utils/decorators.py in wrapper(*args, **kwargs)
     24                     sys.stdout = old_out
     25                     if fail is not False:
---> 26                         raise fail
     27 
     28             else:

~/anaconda3/envs/blechpy/lib/python3.7/site-packages/blechpy/utils/decorators.py in wrapper(*args, **kwargs)
     16                     sys.stdout = f
     17                     try:
---> 18                         func(*args, **kwargs)
     19                         fail = False
     20                     except Exception as e:

~/anaconda3/envs/blechpy/lib/python3.7/site-packages/blechpy/datastructures/dataset.py in detect_spikes(self, data_quality, multi_process, n_cores)
    777 
    778             results = Parallel(n_jobs=n_cores)(delayed(run)(sd, update_pbar)
--> 779                                                for sd in spike_detectors)
    780         else:
    781             for sd in spike_detectors:

~/anaconda3/envs/blechpy/lib/python3.7/site-packages/joblib/parallel.py in __call__(self, iterable)
   1015 
   1016             with self._backend.retrieval_context():
-> 1017                 self.retrieve()
   1018             # Make sure that we get a last message telling us we are done
   1019             elapsed_time = time.time() - self._start_time

~/anaconda3/envs/blechpy/lib/python3.7/site-packages/joblib/parallel.py in retrieve(self)
    907             try:
    908                 if getattr(self._backend, 'supports_timeout', False):
--> 909                     self._output.extend(job.get(timeout=self.timeout))
    910                 else:
    911                     self._output.extend(job.get())

~/anaconda3/envs/blechpy/lib/python3.7/site-packages/joblib/_parallel_backends.py in wrap_future_result(future, timeout)
    560         AsyncResults.get from multiprocessing."""
    561         try:
--> 562             return future.result(timeout=timeout)
    563         except LokyTimeoutError:
    564             raise TimeoutError()

~/anaconda3/envs/blechpy/lib/python3.7/concurrent/futures/_base.py in result(self, timeout)
    433                 raise CancelledError()
    434             elif self._state == FINISHED:
--> 435                 return self.__get_result()
    436             else:
    437                 raise TimeoutError()

~/anaconda3/envs/blechpy/lib/python3.7/concurrent/futures/_base.py in __get_result(self)
    382     def __get_result(self):
    383         if self._exception:
--> 384             raise self._exception
    385         else:
    386             return self._result

PicklingError: Could not pickle the task to send it to the workers.
nubs01 commented 3 years ago

Yup, forgot that joblib requires not using functions defined inside functions. Should be fixed in 2.0.74. Give it a try.

danielsvedberg commented 3 years ago

New error:

In [5]: dat.detect_spikes()                                                                                                                       
Running Spike Detection...
exception calling callback for <Future at 0x7f67ca39c110 state=finished raised TerminatedWorkerError>
Traceback (most recent call last):
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/joblib/externals/loky/_base.py", line 625, in _invoke_callbacks
    callback(self)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/joblib/parallel.py", line 340, in __call__
    self.parallel.dispatch_next()
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/joblib/parallel.py", line 769, in dispatch_next
    if not self.dispatch_one_batch(self._original_iterator):
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/joblib/parallel.py", line 835, in dispatch_one_batch
    self._dispatch(tasks)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/joblib/parallel.py", line 754, in _dispatch
    job = self._backend.apply_async(batch, callback=cb)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/joblib/_parallel_backends.py", line 551, in apply_async
    future = self._workers.submit(SafeFunction(func))
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/joblib/externals/loky/reusable_executor.py", line 160, in submit
    fn, *args, **kwargs)
  File "/home/dsvedberg/anaconda3/envs/blechpy/lib/python3.7/site-packages/joblib/externals/loky/process_executor.py", line 1027, in submit
    raise self._flags.broken
joblib.externals.loky.process_executor.TerminatedWorkerError: A worker process managed by the executor was unexpectedly terminated. This could be caused by a segmentation fault while calling the function or by an excessive memory usage causing the Operating System to kill the worker. The exit codes of the workers are {SIGSEGV(-11)}

Exception in blechpy.datastructures.dataset.detect_spikes

---------------------------------------------------------------------------
TerminatedWorkerError                     Traceback (most recent call last)
<ipython-input-5-12a7080dd97a> in <module>
----> 1 dat.detect_spikes()

~/anaconda3/envs/blechpy/lib/python3.7/site-packages/blechpy/utils/decorators.py in wrapper(*args, **kwargs)
     24                     sys.stdout = old_out
     25                     if fail is not False:
---> 26                         raise fail
     27 
     28             else:

~/anaconda3/envs/blechpy/lib/python3.7/site-packages/blechpy/utils/decorators.py in wrapper(*args, **kwargs)
     16                     sys.stdout = f
     17                     try:
---> 18                         func(*args, **kwargs)
     19                         fail = False
     20                     except Exception as e:

~/anaconda3/envs/blechpy/lib/python3.7/site-packages/blechpy/datastructures/dataset.py in detect_spikes(self, data_quality, multi_process, n_cores)
    764 
    765             results = Parallel(n_jobs=n_cores)(delayed(run_joblib_process)(sd)
--> 766                                                for sd in spike_detectors)
    767             results = zip(*results)
    768         else:

~/anaconda3/envs/blechpy/lib/python3.7/site-packages/joblib/parallel.py in __call__(self, iterable)
   1015 
   1016             with self._backend.retrieval_context():
-> 1017                 self.retrieve()
   1018             # Make sure that we get a last message telling us we are done
   1019             elapsed_time = time.time() - self._start_time

~/anaconda3/envs/blechpy/lib/python3.7/site-packages/joblib/parallel.py in retrieve(self)
    907             try:
    908                 if getattr(self._backend, 'supports_timeout', False):
--> 909                     self._output.extend(job.get(timeout=self.timeout))
    910                 else:
    911                     self._output.extend(job.get())

~/anaconda3/envs/blechpy/lib/python3.7/site-packages/joblib/_parallel_backends.py in wrap_future_result(future, timeout)
    560         AsyncResults.get from multiprocessing."""
    561         try:
--> 562             return future.result(timeout=timeout)
    563         except LokyTimeoutError:
    564             raise TimeoutError()

~/anaconda3/envs/blechpy/lib/python3.7/concurrent/futures/_base.py in result(self, timeout)
    433                 raise CancelledError()
    434             elif self._state == FINISHED:
--> 435                 return self.__get_result()
    436             else:
    437                 raise TimeoutError()

~/anaconda3/envs/blechpy/lib/python3.7/concurrent/futures/_base.py in __get_result(self)
    382     def __get_result(self):
    383         if self._exception:
--> 384             raise self._exception
    385         else:
    386             return self._result

~/anaconda3/envs/blechpy/lib/python3.7/site-packages/joblib/externals/loky/_base.py in _invoke_callbacks(self)
    623         for callback in self._done_callbacks:
    624             try:
--> 625                 callback(self)
    626             except BaseException:
    627                 LOGGER.exception('exception calling callback for %r', self)

~/anaconda3/envs/blechpy/lib/python3.7/site-packages/joblib/parallel.py in __call__(self, out)
    338         with self.parallel._lock:
    339             if self.parallel._original_iterator is not None:
--> 340                 self.parallel.dispatch_next()
    341 
    342 

~/anaconda3/envs/blechpy/lib/python3.7/site-packages/joblib/parallel.py in dispatch_next(self)
    767 
    768         """
--> 769         if not self.dispatch_one_batch(self._original_iterator):
    770             self._iterating = False
    771             self._original_iterator = None

~/anaconda3/envs/blechpy/lib/python3.7/site-packages/joblib/parallel.py in dispatch_one_batch(self, iterator)
    833                 return False
    834             else:
--> 835                 self._dispatch(tasks)
    836                 return True
    837 

~/anaconda3/envs/blechpy/lib/python3.7/site-packages/joblib/parallel.py in _dispatch(self, batch)
    752         with self._lock:
    753             job_idx = len(self._jobs)
--> 754             job = self._backend.apply_async(batch, callback=cb)
    755             # A job can complete so quickly than its callback is
    756             # called before we get here, causing self._jobs to

~/anaconda3/envs/blechpy/lib/python3.7/site-packages/joblib/_parallel_backends.py in apply_async(self, func, callback)
    549     def apply_async(self, func, callback=None):
    550         """Schedule a func to be run"""
--> 551         future = self._workers.submit(SafeFunction(func))
    552         future.get = functools.partial(self.wrap_future_result, future)
    553         if callback is not None:

~/anaconda3/envs/blechpy/lib/python3.7/site-packages/joblib/externals/loky/reusable_executor.py in submit(self, fn, *args, **kwargs)
    158         with self._submit_resize_lock:
    159             return super(_ReusablePoolExecutor, self).submit(
--> 160                 fn, *args, **kwargs)
    161 
    162     def _resize(self, max_workers):

~/anaconda3/envs/blechpy/lib/python3.7/site-packages/joblib/externals/loky/process_executor.py in submit(self, fn, *args, **kwargs)
   1025         with self._flags.shutdown_lock:
   1026             if self._flags.broken is not None:
-> 1027                 raise self._flags.broken
   1028             if self._flags.shutdown:
   1029                 raise ShutdownExecutorError(

TerminatedWorkerError: A worker process managed by the executor was unexpectedly terminated. This could be caused by a segmentation fault while calling the function or by an excessive memory usage causing the Operating System to kill the worker. The exit codes of the workers are {SIGSEGV(-11)}
danielsvedberg commented 3 years ago

fixed with blechpy update 2.1.0