nipype / pydra

Pydra Dataflow Engine
https://nipype.github.io/pydra/

Running workflow in pydra tutorial (on Binder) with "serial" plugin raises async error #498

Open · tclose opened this issue 3 years ago

tclose commented 3 years ago

```
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
/tmp/ipykernel_155/1214088087.py in <module>
      1 with pydra.Submitter(plugin="serial") as sub:
----> 2     sub(wf1)
      3 
      4 wf1.result()

/srv/conda/envs/notebook/lib/python3.7/site-packages/pydra/engine/submitter.py in __call__(self, runnable, cache_locations, rerun)
     54                     nd.cache_dir = runnable.cache_dir
     55         if is_workflow(runnable) and runnable.state is None:
---> 56             self.loop.run_until_complete(self.submit_workflow(runnable, rerun=rerun))
     57         else:
     58             self.loop.run_until_complete(self.submit(runnable, wait=True, rerun=rerun))

/srv/conda/envs/notebook/lib/python3.7/site-packages/nest_asyncio.py in run_until_complete(self, future)
     68                 raise RuntimeError(
     69                     'Event loop stopped before Future completed.')
---> 70             return f.result()
     71 
     72     def _run_once(self):

/srv/conda/envs/notebook/lib/python3.7/asyncio/futures.py in result(self)
    179         self.__log_traceback = False
    180         if self._exception is not None:
--> 181             raise self._exception
    182         return self._result
    183 

/srv/conda/envs/notebook/lib/python3.7/asyncio/tasks.py in __step(***failed resolving arguments***)
    247                 # We use the `send` method directly, because coroutines
    248                 # don't have `__iter__` and `__next__` methods.
--> 249                 result = coro.send(None)
    250             else:
    251                 result = coro.throw(exc)

/srv/conda/envs/notebook/lib/python3.7/site-packages/pydra/engine/submitter.py in submit_workflow(self, workflow, rerun)
     69                 await self.worker.run_el(workflow, rerun=rerun)
     70             else:
---> 71                 await workflow._run(self, rerun=rerun)
     72         else:  # could be a tuple with paths to pickle files wiith tasks and inputs
     73             ind, wf_main_pkl, wf_orig = workflow

/srv/conda/envs/notebook/lib/python3.7/site-packages/pydra/engine/core.py in _run(self, submitter, rerun, **kwargs)
   1034             try:
   1035                 self.audit.monitor()
-> 1036                 await self._run_task(submitter, rerun=rerun)
   1037                 result.output = self._collect_outputs()
   1038             except Exception as e:

/srv/conda/envs/notebook/lib/python3.7/site-packages/pydra/engine/core.py in _run_task(self, submitter, rerun)
   1059             raise Exception("Submitter should already be set.")
   1060         # at this point Workflow is stateless so this should be fine
-> 1061         await submitter._run_workflow(self, rerun=rerun)
   1062 
   1063     def set_output(self, connections):

/srv/conda/envs/notebook/lib/python3.7/site-packages/pydra/engine/submitter.py in _run_workflow(self, wf, rerun)
    188                     for fut in await self.submit(task, rerun=rerun):
    189                         task_futures.add(fut)
--> 190             task_futures = await self.worker.fetch_finished(task_futures)
    191             tasks, follow_err = get_runnable_tasks(graph_copy)
    192             # updating tasks_errored

/srv/conda/envs/notebook/lib/python3.7/site-packages/pydra/engine/workers.py in fetch_finished(self, futures)
     50         try:
     51             done, pending = await asyncio.wait(
---> 52                 futures, return_when=asyncio.FIRST_COMPLETED
     53             )
     54         except ValueError:

/srv/conda/envs/notebook/lib/python3.7/asyncio/tasks.py in wait(fs, loop, timeout, return_when)
    385         loop = events.get_event_loop()
    386 
--> 387     fs = {ensure_future(f, loop=loop) for f in set(fs)}
    388 
    389     return await _wait(fs, timeout, return_when, loop)

/srv/conda/envs/notebook/lib/python3.7/asyncio/tasks.py in <setcomp>(.0)
    385         loop = events.get_event_loop()
    386 
--> 387     fs = {ensure_future(f, loop=loop) for f in set(fs)}
    388 
    389     return await _wait(fs, timeout, return_when, loop)

/srv/conda/envs/notebook/lib/python3.7/asyncio/tasks.py in ensure_future(coro_or_future, loop)
    617         return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
    618     else:
--> 619         raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
    620                         'required')
    621 

TypeError: An asyncio.Future, a coroutine or an awaitable is required
```

tclose commented 3 years ago

Python version: 3.9.5

djarecka commented 3 years ago

Thanks for reporting, asyncio and Jupyter interactions are sometimes tricky...
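
For context, the traceback above goes through nest_asyncio, which pydra appears to rely on so that `Submitter.__call__` can call `run_until_complete` even when an event loop is already running, as it is inside Jupyter. A minimal illustration of that pattern, separate from the bug itself:

```python
# Minimal illustration (an assumption based on the traceback, not pydra's
# documented behaviour): nest_asyncio patches the running event loop so that
# a nested run_until_complete call, like the one in Submitter.__call__, does
# not fail with "This event loop is already running" inside Jupyter.
import asyncio
import nest_asyncio

nest_asyncio.apply()  # patch the current loop to allow re-entrant run_until_complete


async def inner():
    return "done"


loop = asyncio.get_event_loop()
print(loop.run_until_complete(inner()))  # works even if a loop is already running
```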

tclose commented 3 years ago

I just exported it to a normal script and got the same error, so it isn't Jupyter-specific. Does the serial plugin use asyncio?
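
For reference, a minimal standalone script along the lines of the tutorial cell hits the same call path; the workflow body below is a placeholder, not the tutorial's actual `wf1`:

```python
# Placeholder workflow standing in for the tutorial's wf1 (assumption: the
# exact tasks don't matter, only the Submitter(plugin="serial") call path).
import pydra


@pydra.mark.task
def add_two(x):
    return x + 2


wf1 = pydra.Workflow(name="wf1", input_spec=["x"], x=1)
wf1.add(add_two(name="add_two", x=wf1.lzin.x))
wf1.set_output([("out", wf1.add_two.lzout.out)])

with pydra.Submitter(plugin="serial") as sub:
    sub(wf1)  # raises the TypeError above with pydra 0.16.2

print(wf1.result())
```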

tclose commented 3 years ago

Are you able to reproduce this error @djarecka?

I just tried running the exported notebook with the cf plugin and got a different error (one that I had thought was specific to my code, but now seems to be related to my package configuration):

```
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/usr/local/Cellar/python@3.9/3.9.5/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/usr/local/Cellar/python@3.9/3.9.5/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 125, in _main
    prepare(preparation_data)
  File "/usr/local/Cellar/python@3.9/3.9.5/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 236, in prepare
    _fixup_main_from_path(data['init_main_from_path'])
  File "/usr/local/Cellar/python@3.9/3.9.5/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 287, in _fixup_main_from_path
    main_content = runpy.run_path(main_path,
  File "/usr/local/Cellar/python@3.9/3.9.5/Frameworks/Python.framework/Versions/3.9/lib/python3.9/runpy.py", line 268, in run_path
    return _run_module_code(code, init_globals, run_name,
  File "/usr/local/Cellar/python@3.9/3.9.5/Frameworks/Python.framework/Versions/3.9/lib/python3.9/runpy.py", line 97, in _run_module_code
    _run_code(code, mod_globals, init_globals,
  File "/usr/local/Cellar/python@3.9/3.9.5/Frameworks/Python.framework/Versions/3.9/lib/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/Users/tclose/git/workflows/pydra-tutorial/notebooks/exported_workflow.py", line 61, in <module>
    sub(wf1)
  File "/Users/tclose/git/workflows/pydra/pydra/engine/submitter.py", line 56, in __call__
    self.loop.run_until_complete(self.submit_workflow(runnable, rerun=rerun))
  File "/usr/local/lib/python3.9/site-packages/nest_asyncio.py", line 70, in run_until_complete
    return f.result()
  File "/usr/local/Cellar/python@3.9/3.9.5/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/futures.py", line 201, in result
    raise self._exception
  File "/usr/local/Cellar/python@3.9/3.9.5/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py", line 256, in __step
    result = coro.send(None)
  File "/Users/tclose/git/workflows/pydra/pydra/engine/submitter.py", line 71, in submit_workflow
    await workflow._run(self, rerun=rerun)
  File "/Users/tclose/git/workflows/pydra/pydra/engine/core.py", line 1042, in _run
    result.output = self._collect_outputs()
  File "/Users/tclose/git/workflows/pydra/pydra/engine/core.py", line 1129, in _collect_outputs
    val_out = val.get_value(self)
  File "/Users/tclose/git/workflows/pydra/pydra/engine/specs.py", line 800, in get_value
    if result.errored:
AttributeError: 'NoneType' object has no attribute 'errored'
Task exception was never retrieved
future: <Task finished name='Task-2' coro=<ConcurrentFuturesWorker.exec_as_coro() done, defined at /Users/tclose/git/workflows/pydra/pydra/engine/workers.py:161> exception=RuntimeError('\n        An attempt has been made to start a new process before the\n        current process has finished its bootstrapping phase.\n\n        This probably means that you are not using fork to start your\n        child processes and you have forgotten to use the proper idiom\n        in the main module:\n\n            if __name__ == \'__main__\':\n                freeze_support()\n                ...\n\n        The "freeze_support()" line can be omitted if the program\n        is not going to be frozen to produce an executable.')>
Traceback (most recent call last):
  File "/usr/local/Cellar/python@3.9/3.9.5/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py", line 256, in __step
    result = coro.send(None)
  File "/Users/tclose/git/workflows/pydra/pydra/engine/workers.py", line 164, in exec_as_coro
    res = await self.loop.run_in_executor(self.pool, runnable._run, rerun)
  File "/usr/local/Cellar/python@3.9/3.9.5/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/base_events.py", line 814, in run_in_executor
    executor.submit(func, *args), loop=self)
  File "/usr/local/Cellar/python@3.9/3.9.5/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/process.py", line 697, in submit
    self._adjust_process_count()
  File "/usr/local/Cellar/python@3.9/3.9.5/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/process.py", line 675, in _adjust_process_count
    p.start()
  File "/usr/local/Cellar/python@3.9/3.9.5/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/usr/local/Cellar/python@3.9/3.9.5/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "/usr/local/Cellar/python@3.9/3.9.5/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/usr/local/Cellar/python@3.9/3.9.5/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/usr/local/Cellar/python@3.9/3.9.5/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 42, in _launch
    prep_data = spawn.get_preparation_data(process_obj._name)
  File "/usr/local/Cellar/python@3.9/3.9.5/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 154, in get_preparation_data
    _check_not_importing_main()
  File "/usr/local/Cellar/python@3.9/3.9.5/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 134, in _check_not_importing_main
    raise RuntimeError('''
RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.
Task exception was never retrieved
future: <Task finished name='Task-2' coro=<ConcurrentFuturesWorker.exec_as_coro() done, defined at /Users/tclose/git/workflows/pydra/pydra/engine/workers.py:161> exception=BrokenProcessPool('A process in the process pool was terminated abruptly while the future was running or pending.')>
Traceback (most recent call last):
  File "/usr/local/Cellar/python@3.9/3.9.5/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py", line 258, in __step
    result = coro.throw(exc)
  File "/Users/tclose/git/workflows/pydra/pydra/engine/workers.py", line 164, in exec_as_coro
    res = await self.loop.run_in_executor(self.pool, runnable._run, rerun)
  File "/usr/local/Cellar/python@3.9/3.9.5/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/futures.py", line 284, in __await__
    yield self  # This tells Task to wait for completion.
  File "/usr/local/Cellar/python@3.9/3.9.5/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py", line 328, in __wakeup
    future.result()
  File "/usr/local/Cellar/python@3.9/3.9.5/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/futures.py", line 201, in result
    raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
Traceback (most recent call last):
  File "/Users/tclose/git/workflows/pydra-tutorial/notebooks/./exported_workflow.py", line 61, in <module>
    sub(wf1)
  File "/Users/tclose/git/workflows/pydra/pydra/engine/submitter.py", line 56, in __call__
    self.loop.run_until_complete(self.submit_workflow(runnable, rerun=rerun))
  File "/usr/local/lib/python3.9/site-packages/nest_asyncio.py", line 70, in run_until_complete
    return f.result()
  File "/usr/local/Cellar/python@3.9/3.9.5/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/futures.py", line 201, in result
    raise self._exception
  File "/usr/local/Cellar/python@3.9/3.9.5/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py", line 256, in __step
    result = coro.send(None)
  File "/Users/tclose/git/workflows/pydra/pydra/engine/submitter.py", line 71, in submit_workflow
    await workflow._run(self, rerun=rerun)
  File "/Users/tclose/git/workflows/pydra/pydra/engine/core.py", line 1042, in _run
    result.output = self._collect_outputs()
  File "/Users/tclose/git/workflows/pydra/pydra/engine/core.py", line 1129, in _collect_outputs
    val_out = val.get_value(self)
  File "/Users/tclose/git/workflows/pydra/pydra/engine/specs.py", line 800, in get_value
    if result.errored:
AttributeError: 'NoneType' object has no attribute 'errored'
```
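
The RuntimeError in the middle of that output is the standard multiprocessing spawn-start-method complaint on macOS (Python 3.8+ defaults to spawn rather than fork), so with the cf plugin the exported script presumably needs its submission wrapped in an `if __name__ == "__main__":` guard. A sketch of what that might look like, with a placeholder workflow rather than the tutorial's actual `wf1`:

```python
# Sketch, assuming the spawn start method is the culprit: keep everything that
# submits work under the main guard so the child processes spawned by the cf
# (concurrent.futures) worker can re-import this module without re-submitting.
import pydra


@pydra.mark.task
def add_two(x):
    return x + 2


def build_workflow():
    # Placeholder standing in for the tutorial's wf1.
    wf = pydra.Workflow(name="wf1", input_spec=["x"], x=1)
    wf.add(add_two(name="add_two", x=wf.lzin.x))
    wf.set_output([("out", wf.add_two.lzout.out)])
    return wf


if __name__ == "__main__":
    wf1 = build_workflow()
    with pydra.Submitter(plugin="cf") as sub:
        sub(wf1)
    print(wf1.result())
```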

My Python 3.9.5 environment is as follows:

```
alabaster==0.7.12
appdirs==1.4.4
attrs==21.2.0
Babel==2.9.1
backports.entry-points-selectable==1.1.0
black==21.4b2
boutiques==0.5.25
cachetools==4.2.4
certifi==2021.5.30
cfgv==3.3.1
charset-normalizer==2.0.6
ci-info==0.2.0
click==8.0.1
cloudpickle==2.0.0
codecov==2.1.12
colorclass==2.2.0
coverage==6.0
distlib==0.3.3
docopt==0.6.2
docutils==0.17.1
etelemetry==0.2.2
execnet==1.9.0
filelock==3.3.0
frozendict==2.0.6
identify==2.3.0
idna==3.2
imagesize==1.2.0
Jinja2==3.0.1
jsonschema==4.0.1
lxml==4.6.3
MarkupSafe==2.0.1
mock==4.0.3
more-itertools==8.10.0
mypy-extensions==0.4.3
nest-asyncio==1.5.1
nexus-sdk==0.3.2
nodeenv==1.6.0
numpy==1.21.2
packaging==21.0
pathspec==0.9.0
pbr==5.6.0
platformdirs==2.4.0
pluggy==0.13.1
pockets==0.9.1
pre-commit==2.15.0
psutil==5.8.0
puremagic==1.11
py==1.10.0
pydra==0.16.2
Pygments==2.10.0
PyLD==2.0.3
Pympler==0.9
pyparsing==2.4.7
pyrsistent==0.18.0
pytest==5.4.3
pytest-cov==3.0.0
pytest-env==0.6.2
pytest-forked==1.3.0
pytest-rerunfailures==10.2
pytest-timeout==1.4.2
pytest-xdist==1.34.0
python-dateutil==2.8.2
pytz==2021.3
PyYAML==5.4.1
regex==2021.9.30
requests==2.26.0
simplejson==3.17.5
six==1.16.0
snowballstemmer==2.1.0
Sphinx==4.2.0
sphinx-rtd-theme==1.0.0
sphinxcontrib-apidoc==0.3.0
sphinxcontrib-applehelp==1.0.2
sphinxcontrib-devhelp==1.0.2
sphinxcontrib-htmlhelp==2.0.0
sphinxcontrib-jsmath==1.0.1
sphinxcontrib-napoleon==0.7
sphinxcontrib-qthelp==1.0.3
sphinxcontrib-serializinghtml==1.1.5
sphinxcontrib-versioning==2.2.1
sseclient==0.0.27
tabulate==0.8.9
termcolor==1.1.0
toml==0.10.2
tomli==1.2.1
tornado==6.1
urllib3==1.26.7
virtualenv==20.8.1
wcwidth==0.2.5
```

tclose commented 3 years ago

After a bit more investigation, it seems that the following method of the `Worker` base class

https://github.com/nipype/pydra/blob/eb70454de653c89d679a5396d6f8bce8258473a4/pydra/engine/workers.py#L35-L59

should be overridden in the `SerialWorker` class. Can it just return an empty list?
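
One possible shape for such an override, as a sketch rather than a tested fix (`PatchedSerialWorker` is a hypothetical name, not part of pydra):

```python
# Hypothetical sketch: sidestep asyncio.wait, which is what raises the
# TypeError when it is handed plain results instead of awaitables.
import asyncio

from pydra.engine.workers import SerialWorker


class PatchedSerialWorker(SerialWorker):
    async def fetch_finished(self, futures):
        # Await anything that is genuinely awaitable; results the serial
        # worker has already computed are simply dropped.
        for fut in futures:
            if asyncio.isfuture(fut) or asyncio.iscoroutine(fut):
                await fut
        # Nothing is left pending in serial execution, so report an empty
        # set back to the submitter's polling loop.
        return set()
```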