aiidateam / aiida-core

The official repository for the AiiDA code
https://aiida-core.readthedocs.io

🐛 Engine: Submission failure with `DeliveryError` #6015

Open mbercx opened 1 year ago

mbercx commented 1 year ago

Just ran into this error while trying to submit:

---------------------------------------------------------------------------
DeliveryError                             Traceback (most recent call last)
Cell In[14], line 42
     39 builder.w90_chk_to_ukk_script = w90_script
     41 wc_group, _ = orm.Group.collection.get_or_create('tmp/workchains')
---> 42 wc_node = submit(builder); #wc_group.add_nodes(wc_node)

[...] # Trimmed for brevity, full traceback below

File /opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/futures.py:201, in Future.result(self)
    199 self.__log_traceback = False
    200 if self._exception is not None:
--> 201     raise self._exception
    202 return self._result

DeliveryError: (None, )
Full Traceback

```python
---------------------------------------------------------------------------
DeliveryError                             Traceback (most recent call last)
Cell In[14], line 42
     39 builder.w90_chk_to_ukk_script = w90_script
     41 wc_group, _ = orm.Group.collection.get_or_create('tmp/workchains')
---> 42 wc_node = submit(builder); #wc_group.add_nodes(wc_node)

File ~/project/super/code/aiida-core/aiida/engine/launch.py:103, in submit(process, **inputs)
    100 assert runner.persister is not None, 'runner does not have a persister'
    101 assert runner.controller is not None, 'runner does not have a persister'
--> 103 process_inited = instantiate_process(runner, process, **inputs)
    105 # If a dry run is requested, simply forward to `run`, because it is not compatible with `submit`. We choose for this
    106 # instead of raising, because in this way the user does not have to change the launcher when testing. The same goes
    107 # for if `remote_folder` is present in the inputs, which means we are importing an already completed calculation.
    108 if process_inited.metadata.get('dry_run', False) or 'remote_folder' in inputs:

File ~/project/super/code/aiida-core/aiida/engine/utils.py:64, in instantiate_process(runner, process, **inputs)
     61 else:
     62     raise ValueError(f'invalid process {type(process)}, needs to be Process or ProcessBuilder')
---> 64 process = process_class(runner=runner, inputs=inputs)
     66 return process

File ~/.virtualenvs/super/lib/python3.9/site-packages/plumpy/base/state_machine.py:194, in StateMachineMeta.__call__(cls, *args, **kwargs)
    186 """
    187 Create the state machine and enter the initial state.
    188
   (...)
    191 :return: An instance of the state machine
    192 """
    193 inst = super().__call__(*args, **kwargs)
--> 194 inst.transition_to(inst.create_initial_state())
    195 call_with_super_check(inst.init)
    196 return inst

File ~/.virtualenvs/super/lib/python3.9/site-packages/plumpy/base/state_machine.py:339, in StateMachine.transition_to(self, new_state, *args, **kwargs)
    337         raise
    338     self._transition_failing = True
--> 339     self.transition_failed(initial_state_label, label, *sys.exc_info()[1:])
    340 finally:
    341     self._transition_failing = False

File ~/.virtualenvs/super/lib/python3.9/site-packages/plumpy/processes.py:1003, in Process.transition_failed(self, initial_state, final_state, exception, trace)
    998 def transition_failed(
    999     self, initial_state: Hashable, final_state: Hashable, exception: Exception, trace: TracebackType
   1000 ) -> None:
   1001     # If we are creating, then reraise instead of failing.
   1002     if final_state == process_states.ProcessState.CREATED:
-> 1003         raise exception.with_traceback(trace)
   1005     self.transition_to(process_states.ProcessState.EXCEPTED, exception, trace)

File ~/.virtualenvs/super/lib/python3.9/site-packages/plumpy/base/state_machine.py:324, in StateMachine.transition_to(self, new_state, *args, **kwargs)
    321     self._exit_current_state(new_state)
    323 try:
--> 324     self._enter_next_state(new_state)
    325 except StateEntryFailed as exception:
    326     # Make sure we have a state instance
    327     new_state = self._create_state_instance(exception.state, *exception.args, **exception.kwargs)

File ~/.virtualenvs/super/lib/python3.9/site-packages/plumpy/base/state_machine.py:388, in StateMachine._enter_next_state(self, next_state)
    386 next_state.do_enter()
    387 self._state = next_state
--> 388 self._fire_state_event(StateEventHook.ENTERED_STATE, last_state)

File ~/.virtualenvs/super/lib/python3.9/site-packages/plumpy/base/state_machine.py:300, in StateMachine._fire_state_event(self, hook, state)
    298 def _fire_state_event(self, hook: Hashable, state: Optional[State]) -> None:
    299     for callback in self._event_callbacks.get(hook, []):
--> 300         callback(self, hook, state)

File ~/.virtualenvs/super/lib/python3.9/site-packages/plumpy/processes.py:331, in Process._setup_event_hooks.<locals>.<lambda>(_s, _h, from_state)
    325 def _setup_event_hooks(self) -> None:
    326     """Set the event hooks to process, when it is created or loaded(recreated)."""
    327     event_hooks = {
    328         state_machine.StateEventHook.ENTERING_STATE:
    329         lambda _s, _h, state: self.on_entering(cast(process_states.State, state)),
    330         state_machine.StateEventHook.ENTERED_STATE:
--> 331         lambda _s, _h, from_state: self.on_entered(cast(Optional[process_states.State], from_state)),
    332         state_machine.StateEventHook.EXITING_STATE:
    333         lambda _s, _h, _state: self.on_exiting()
    334     }
    335     for hook, callback in event_hooks.items():
    336         self.add_state_event_callback(hook, callback)

File ~/project/super/code/aiida-core/aiida/engine/processes/process.py:426, in Process.on_entered(self, from_state)
    424     self._save_checkpoint()
    425 set_process_state_change_timestamp(self)
--> 426 super().on_entered(from_state)

File ~/.virtualenvs/super/lib/python3.9/site-packages/plumpy/processes.py:714, in Process.on_entered(self, from_state)
    712 self.logger.info('Process<%s>: Broadcasting state change: %s', self.pid, subject)
    713 try:
--> 714     self._communicator.broadcast_send(body=None, sender=self.pid, subject=subject)
    715 except ConnectionClosed:
    716     message = 'Process<%s>: no connection available to broadcast state change from %s to %s'

File ~/.virtualenvs/super/lib/python3.9/site-packages/plumpy/communications.py:175, in LoopCommunicator.broadcast_send(self, body, sender, subject, correlation_id)
    168 def broadcast_send(
    169     self,
    170     body: Optional[Any],
   (...)
    173     correlation_id: Optional['ID_TYPE'] = None
    174 ) -> futures.Future:
--> 175     return self._communicator.broadcast_send(body, sender, subject, correlation_id)

File ~/.virtualenvs/super/lib/python3.9/site-packages/kiwipy/rmq/threadcomms.py:258, in RmqThreadCommunicator.broadcast_send(self, body, sender, subject, correlation_id)
    256 def broadcast_send(self, body, sender=None, subject=None, correlation_id=None):
    257     self._ensure_open()
--> 258     result = self._loop_scheduler.await_(
    259         self._communicator.broadcast_send(body=body, sender=sender, subject=subject, correlation_id=correlation_id)
    260     )
    261     return isinstance(result, pamqp.specification.Basic.Ack)

File ~/.virtualenvs/super/lib/python3.9/site-packages/pytray/aiothreads.py:164, in LoopScheduler.await_(self, awaitable, name)
    153 """
    154 Await an awaitable on the event loop and return the result. It may take a little time for
    155 the loop to get around to scheduling it, so we use a timeout as set by the TASK_TIMEOUT class
   (...)
    161 :return: the result of running the coroutine
    162 """
    163 try:
--> 164     return self.await_submit(awaitable).result(timeout=self.task_timeout)
    165 except concurrent.futures.TimeoutError as exc:
    166     # Try to get a reasonable name for the awaitable
    167     name = name or getattr(awaitable, "__name__", "Awaitable")

File /opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py:446, in Future.result(self, timeout)
    444     raise CancelledError()
    445 elif self._state == FINISHED:
--> 446     return self.__get_result()
    447 else:
    448     raise TimeoutError()

File /opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py:391, in Future.__get_result(self)
    389 if self._exception:
    390     try:
--> 391         raise self._exception
    392     finally:
    393         # Break a reference cycle with the exception in self._exception
    394         self = None

File /opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py:258, in Task.__step(***failed resolving arguments***)
    256         result = coro.send(None)
    257     else:
--> 258         result = coro.throw(exc)
    259 except StopIteration as exc:
    260     if self._must_cancel:
    261         # Task is cancelled right before coro stops.

File ~/.virtualenvs/super/lib/python3.9/site-packages/pytray/aiothreads.py:178, in LoopScheduler.await_submit.<locals>.coro()
    177 async def coro():
--> 178     res = await awaitable
    179     if asyncio.isfuture(res):
    180         future = ThreadFuture()

File ~/.virtualenvs/super/lib/python3.9/site-packages/kiwipy/rmq/communicator.py:522, in RmqCommunicator.broadcast_send(self, body, sender, subject, correlation_id)
    520 async def broadcast_send(self, body, sender=None, subject=None, correlation_id=None):
    521     publisher = await self.get_message_publisher()
--> 522     result = await publisher.broadcast_send(body, sender, subject, correlation_id)
    523     return result

File ~/.virtualenvs/super/lib/python3.9/site-packages/kiwipy/rmq/communicator.py:66, in RmqPublisher.broadcast_send(self, msg, sender, subject, correlation_id)
     61 message = aio_pika.Message(
     62     body=self._encode(message_dict),
     63     delivery_mode=aio_pika.DeliveryMode.NOT_PERSISTENT,
     64 )
     65 # Send as mandatory=False because we don't expect the message to be routable to anyone
---> 66 return await self.publish(message, routing_key=defaults.BROADCAST_TOPIC, mandatory=False)

File ~/.virtualenvs/super/lib/python3.9/site-packages/kiwipy/rmq/messages.py:209, in BasePublisherWithReplyQueue.publish(self, message, routing_key, mandatory)
    200 async def publish(self, message, routing_key, mandatory=True):
    201     """
    202     Send a fire-and-forget message i.e. no response expected.
    203
   (...)
    207     :return:
    208     """
--> 209     result = await self._exchange.publish(message, routing_key=routing_key, mandatory=mandatory)
    210     return result

File ~/.virtualenvs/super/lib/python3.9/site-packages/aio_pika/exchange.py:233, in Exchange.publish(self, message, routing_key, mandatory, immediate, timeout)
    227 if self.internal:
    228     # Caught on the client side to prevent channel closure
    229     raise ValueError(
    230         "Can not publish to internal exchange: '%s'!" % self.name,
    231     )
--> 233 return await asyncio.wait_for(
    234     self.channel.basic_publish(
    235         exchange=self.name,
    236         routing_key=routing_key,
    237         body=message.body,
    238         properties=message.properties,
    239         mandatory=mandatory,
    240         immediate=immediate,
    241     ),
    242     timeout=timeout,
    243 )

File /opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py:442, in wait_for(fut, timeout, loop)
    437     warnings.warn("The loop argument is deprecated since Python 3.8, "
    438                   "and scheduled for removal in Python 3.10.",
    439                   DeprecationWarning, stacklevel=2)
    441 if timeout is None:
--> 442     return await fut
    444 if timeout <= 0:
    445     fut = ensure_future(fut, loop=loop)

File ~/.virtualenvs/super/lib/python3.9/site-packages/aiormq/channel.py:547, in Channel.basic_publish(self, body, exchange, routing_key, properties, mandatory, immediate, timeout)
    544 if not self.publisher_confirms:
    545     return
--> 547 return await asyncio.wait_for(confirmation, timeout=timeout)

File /opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py:442, in wait_for(fut, timeout, loop)
    437     warnings.warn("The loop argument is deprecated since Python 3.8, "
    438                   "and scheduled for removal in Python 3.10.",
    439                   DeprecationWarning, stacklevel=2)
    441 if timeout is None:
--> 442     return await fut
    444 if timeout <= 0:
    445     fut = ensure_future(fut, loop=loop)

File /opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/futures.py:284, in Future.__await__(self)
    282 if not self.done():
    283     self._asyncio_future_blocking = True
--> 284     yield self  # This tells Task to wait for completion.
    285 if not self.done():
    286     raise RuntimeError("await wasn't used with future")

File /opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py:328, in Task.__wakeup(self, future)
    326 def __wakeup(self, future):
    327     try:
--> 328         future.result()
    329     except BaseException as exc:
    330         # This may also be a cancellation.
    331         self.__step(exc)

File /opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/futures.py:201, in Future.result(self)
    199 self.__log_traceback = False
    200 if self._exception is not None:
--> 201     raise self._exception
    202 return self._result

DeliveryError: (None, )
```

Still figuring out what happened; all my checkmarks are green:

❯ verdi status
 ✔ version:     AiiDA v2.3.0
 ✔ config:      /Users/mbercx/project/super/.aiida
 ✔ profile:     dev
 ✔ storage:     Storage for 'dev' [open] @ postgresql://mbercx:***@localhost:5432/super-dev / DiskObjectStoreRepository: 39319bead002422387f2793c4e406dc6 | /Users/mbercx/project/super/repositories/dev/container
 ✔ rabbitmq:    Connected to RabbitMQ v3.11.13 as amqp://guest:guest@127.0.0.1:5672?heartbeat=600
 ✔ daemon:      Daemon is running with PID 78099

Versions:

mbercx commented 1 year ago

The issue seems to be transient. Turning it off and on again resolved it.

mbercx commented 1 year ago

The processes did get created, but they are not picked up when the daemon is (re)started.

sphuber commented 1 year ago

Try running `verdi devel rabbitmq tasks analyze` to see if there are inconsistencies.

mbercx commented 1 year ago

Indeed

❯ verdi devel rabbitmq tasks analyze
Warning: There are active processes without process task: {531970, 531717, 532038, 531463, 532012, 531984, 532025, 531931, 531998}
Critical: Inconsistencies detected between database and RabbitMQ. Run again with `--fix` to address problems.

This was in a different environment (I already cleaned up the ones above, apparently). Do you need me to check anything else to figure out what caused the problem?

sphuber commented 1 year ago

Not really, I'm afraid. That was just to help with the processes not getting started after submission. That is most likely due to the missing task, which can be fixed with that command. The only hope for debugging this is to be able to reproduce it, and since you said it is transient, that is going to be tricky 😅
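
To make the "missing task" idea concrete: the warning above boils down to a set difference between the processes the database considers active and the processes that have a task in RabbitMQ. Below is a minimal plain-Python sketch of that comparison. It is not AiiDA's actual implementation; the function name `processes_without_task` is hypothetical, and the two PKs that "have a task" are invented for illustration.

```python
def processes_without_task(active_pks, task_pks):
    """Return the sorted PKs of active processes that have no queued task."""
    return sorted(set(active_pks) - set(task_pks))


# The nine PKs reported in the warning above, plus two invented PKs (1 and 2)
# standing in for healthy processes that do have a task.
active = {531970, 531717, 532038, 531463, 532012, 531984, 532025, 531931, 531998, 1, 2}
with_task = {1, 2}

# Only the nine PKs from the warning remain after the comparison.
print(processes_without_task(active, with_task))
```

Any PK in the result is a process that the daemon will never pick up, because there is no task for it to consume.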

mbercx commented 1 year ago

Haha, fair! It did keep on happening before I had to shut down my computer (I was moving location with my Mac mini, so I had to shut it down, unfortunately), so next time maybe we can do some live debugging. ^^

Hmm, searching through the documentation doesn't give any clue about what is going on, or how to fix it with that tasty `verdi devel` command. Maybe we should add it to a suitable "troubleshooting" section?


sphuber commented 1 year ago

For the DeliveryError, I haven't seen it before, so no idea what could be going on.

As for `verdi devel rabbitmq`, I only added that quite recently, and since it was experimental and only to be used in case of problems caused by bugs, we decided to put it under `verdi devel`. But we have used it multiple times now for various users and it seems to be working quite well. I think it may be time to add an entry in the FAQ like "My jobs are stuck in 'Created' state" and advertise the `verdi devel rabbitmq tasks analyze --fix` command, as it will automagically correct things.
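
For anyone who wants to script this check, for example from a notebook, here is a small hedged sketch that wraps the command in Python. The helper names `analyze_command` and `run_analysis` are hypothetical; the only things taken from this thread are the command itself and its `--fix` flag. It assumes `verdi` is on the `PATH` of the active environment.

```python
import subprocess


def analyze_command(fix=False):
    """Build the argument list for the task analysis command."""
    command = ['verdi', 'devel', 'rabbitmq', 'tasks', 'analyze']
    if fix:
        # Only ask for changes when explicitly requested.
        command.append('--fix')
    return command


def run_analysis(fix=False):
    """Run the command and return its exit code and combined output.

    Raises FileNotFoundError if `verdi` cannot be found on the PATH.
    """
    result = subprocess.run(
        analyze_command(fix), capture_output=True, text=True, check=False
    )
    return result.returncode, result.stdout + result.stderr
```

Running it first without `fix=True` mirrors the exchange above: inspect the reported inconsistencies, then rerun with `--fix` once they look like the stuck processes.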

mbercx commented 1 year ago

Another note: Seems the output of a different submission attempt (in the original project) was still captured in my notebook. Here the error trace is different:

Full Traceback

```python
---------------------------------------------------------------------------
ChannelInvalidStateError                  Traceback (most recent call last)
Cell In[5], line 22
     17 builder.base.pw.parallelization = orm.Dict({'npool': 2})
     18 builder.base.pw.metadata.options.resources = {
     19     'num_machines': 2,
     20     'num_mpiprocs_per_machine': 1
     21 }
---> 22 submit(builder)

File ~/project/super/code/aiida-core/aiida/engine/launch.py:103, in submit(process, **inputs)
    100 assert runner.persister is not None, 'runner does not have a persister'
    101 assert runner.controller is not None, 'runner does not have a persister'
--> 103 process_inited = instantiate_process(runner, process, **inputs)
    105 # If a dry run is requested, simply forward to `run`, because it is not compatible with `submit`. We choose for this
    106 # instead of raising, because in this way the user does not have to change the launcher when testing. The same goes
    107 # for if `remote_folder` is present in the inputs, which means we are importing an already completed calculation.
    108 if process_inited.metadata.get('dry_run', False) or 'remote_folder' in inputs:

File ~/project/super/code/aiida-core/aiida/engine/utils.py:64, in instantiate_process(runner, process, **inputs)
     61 else:
     62     raise ValueError(f'invalid process {type(process)}, needs to be Process or ProcessBuilder')
---> 64 process = process_class(runner=runner, inputs=inputs)
     66 return process

File ~/.virtualenvs/super/lib/python3.9/site-packages/plumpy/base/state_machine.py:195, in StateMachineMeta.__call__(cls, *args, **kwargs)
    193 inst = super().__call__(*args, **kwargs)
    194 inst.transition_to(inst.create_initial_state())
--> 195 call_with_super_check(inst.init)
    196 return inst

File ~/.virtualenvs/super/lib/python3.9/site-packages/plumpy/base/utils.py:29, in call_with_super_check(wrapped, *args, **kwargs)
     27 call_count = getattr(self, '_called', 0)
     28 self._called = call_count + 1
---> 29 wrapped(*args, **kwargs)
     30 msg = f"Base '{wrapped.__name__}' was not called from '{self.__class__}'\nHint: Did you forget to call the super?"
     31 assert self._called == call_count, msg

File ~/project/super/code/aiida-core/aiida/engine/processes/process.py:187, in Process.init(self)
    186 def init(self) -> None:
--> 187     super().init()
    188     if self._logger is None:
    189         self.set_logger(self.node.logger)

File ~/.virtualenvs/super/lib/python3.9/site-packages/plumpy/base/utils.py:16, in super_check.<locals>.wrapper(self, *args, **kwargs)
     14 msg = f"The function '{wrapped.__name__}' was not called through call_with_super_check"
     15 assert getattr(self, '_called', 0) >= 1, msg
---> 16 wrapped(self, *args, **kwargs)
     17 self._called -= 1

File ~/.virtualenvs/super/lib/python3.9/site-packages/plumpy/processes.py:303, in Process.init(self)
    301 if self._communicator is not None:
    302     try:
--> 303         identifier = self._communicator.add_rpc_subscriber(self.message_receive, identifier=str(self.pid))
    304         self.add_cleanup(functools.partial(self._communicator.remove_rpc_subscriber, identifier))
    305     except kiwipy.TimeoutError:

File ~/.virtualenvs/super/lib/python3.9/site-packages/plumpy/communications.py:141, in LoopCommunicator.add_rpc_subscriber(self, subscriber, identifier)
    139 def add_rpc_subscriber(self, subscriber: 'RpcSubscriber', identifier: Optional['ID_TYPE'] = None) -> 'ID_TYPE':
    140     converted = convert_to_comm(subscriber, self._loop)
--> 141     return self._communicator.add_rpc_subscriber(converted, identifier)

File ~/.virtualenvs/super/lib/python3.9/site-packages/kiwipy/rmq/threadcomms.py:215, in RmqThreadCommunicator.add_rpc_subscriber(self, subscriber, identifier)
    213 def add_rpc_subscriber(self, subscriber, identifier=None):
    214     self._ensure_open()
--> 215     return self._loop_scheduler.await_(
    216         self._communicator.add_rpc_subscriber(self._wrap_subscriber(subscriber), identifier)
    217     )

File ~/.virtualenvs/super/lib/python3.9/site-packages/pytray/aiothreads.py:164, in LoopScheduler.await_(self, awaitable, name)
    153 """
    154 Await an awaitable on the event loop and return the result. It may take a little time for
    155 the loop to get around to scheduling it, so we use a timeout as set by the TASK_TIMEOUT class
   (...)
    161 :return: the result of running the coroutine
    162 """
    163 try:
--> 164     return self.await_submit(awaitable).result(timeout=self.task_timeout)
    165 except concurrent.futures.TimeoutError as exc:
    166     # Try to get a reasonable name for the awaitable
    167     name = name or getattr(awaitable, "__name__", "Awaitable")

File /opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py:446, in Future.result(self, timeout)
    444     raise CancelledError()
    445 elif self._state == FINISHED:
--> 446     return self.__get_result()
    447 else:
    448     raise TimeoutError()

File /opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py:391, in Future.__get_result(self)
    389 if self._exception:
    390     try:
--> 391         raise self._exception
    392     finally:
    393         # Break a reference cycle with the exception in self._exception
    394         self = None

File /opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py:258, in Task.__step(***failed resolving arguments***)
    256         result = coro.send(None)
    257     else:
--> 258         result = coro.throw(exc)
    259 except StopIteration as exc:
    260     if self._must_cancel:
    261         # Task is cancelled right before coro stops.

File ~/.virtualenvs/super/lib/python3.9/site-packages/pytray/aiothreads.py:178, in LoopScheduler.await_submit.<locals>.coro()
    177 async def coro():
--> 178     res = await awaitable
    179     if asyncio.isfuture(res):
    180         future = ThreadFuture()

File ~/.virtualenvs/super/lib/python3.9/site-packages/kiwipy/rmq/communicator.py:481, in RmqCommunicator.add_rpc_subscriber(self, subscriber, identifier)
    480 async def add_rpc_subscriber(self, subscriber, identifier=None):
--> 481     msg_subscriber = await self.get_message_subscriber()
    482     identifier = await msg_subscriber.add_rpc_subscriber(subscriber, identifier)
    483     return identifier

File ~/.virtualenvs/super/lib/python3.9/site-packages/kiwipy/rmq/communicator.py:427, in RmqCommunicator.get_message_subscriber(self)
    418 if self._message_subscriber is None:
    419     subscriber = RmqSubscriber(
    420         self._connection,
    421         message_exchange=self._message_exchange,
   (...)
    425         testing_mode=self._testing_mode
    426     )
--> 427     await subscriber.connect()
    428     self._message_subscriber = subscriber
    430 return self._message_subscriber

File ~/.virtualenvs/super/lib/python3.9/site-packages/kiwipy/rmq/communicator.py:177, in RmqSubscriber.connect(self)
    174 if self._testing_mode:
    175     exchange_params.setdefault('auto_delete', self._testing_mode)
--> 177 self._channel = await self._connection.channel()
    178 self._exchange = await self._channel.declare_exchange(name=self._exchange_name, **exchange_params)
    180 await self._create_broadcast_queue()

File ~/.virtualenvs/super/lib/python3.9/site-packages/aio_pika/channel.py:127, in Channel.__await__(self)
    126 def __await__(self):
--> 127     yield from self.initialize().__await__()
    128     return self

File ~/.virtualenvs/super/lib/python3.9/site-packages/aio_pika/robust_channel.py:87, in RobustChannel.initialize(self, timeout)
     86 async def initialize(self, timeout: TimeoutType = None) -> None:
---> 87     await super().initialize(timeout)
     88     self.add_close_callback(self._on_channel_close)

File ~/.virtualenvs/super/lib/python3.9/site-packages/aio_pika/channel.py:172, in Channel.initialize(self, timeout)
    169 if self._channel is not None:
    170     raise RuntimeError("Can't initialize channel")
--> 172 self._channel = await asyncio.wait_for(
    173     self._create_channel(), timeout=timeout,
    174 )
    176 self._delivery_tag = 0
    178 if self.default_exchange is None:

File /opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py:442, in wait_for(fut, timeout, loop)
    437     warnings.warn("The loop argument is deprecated since Python 3.8, "
    438                   "and scheduled for removal in Python 3.10.",
    439                   DeprecationWarning, stacklevel=2)
    441 if timeout is None:
--> 442     return await fut
    444 if timeout <= 0:
    445     fut = ensure_future(fut, loop=loop)

File ~/.virtualenvs/super/lib/python3.9/site-packages/aio_pika/channel.py:162, in Channel._create_channel(self)
    159 async def _create_channel(self) -> aiormq.Channel:
    160     await self._connection.ready()
--> 162     return await self._connection.connection.channel(
    163         publisher_confirms=self._publisher_confirms,
    164         on_return_raises=self._on_return_raises,
    165         channel_number=self._channel_number,
    166     )

File ~/.virtualenvs/super/lib/python3.9/site-packages/aiormq/connection.py:527, in Connection.channel(self, channel_number, publisher_confirms, frame_buffer, **kwargs)
    524 self.channels[channel_number] = channel
    526 try:
--> 527     await channel.open()
    528 except Exception:
    529     self.channels[channel_number] = None

File ~/.virtualenvs/super/lib/python3.9/site-packages/aiormq/channel.py:174, in Channel.open(self)
    173 async def open(self):
--> 174     frame = await self.rpc(spec.Channel.Open())
    176     if self.publisher_confirms:
    177         await self.rpc(spec.Confirm.Select())

File ~/.virtualenvs/super/lib/python3.9/site-packages/aiormq/base.py:168, in task.<locals>.wrap(self, *args, **kwargs)
    165 @wraps(func)
    166 async def wrap(self: "Base", *args, **kwargs):
    167     # noinspection PyCallingNonCallable
--> 168     return await self.create_task(func(self, *args, **kwargs))

File ~/.virtualenvs/super/lib/python3.9/site-packages/aiormq/base.py:25, in TaskWrapper.__inner(self)
     23 async def __inner(self):
     24     try:
---> 25         return await self.task
     26     except asyncio.CancelledError as e:
     27         raise self.exception from e

File /opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/futures.py:284, in Future.__await__(self)
    282 if not self.done():
    283     self._asyncio_future_blocking = True
--> 284     yield self  # This tells Task to wait for completion.
285 if not self.done(): 286 raise RuntimeError("await wasn't used with future") File [/opt/homebrew/Cellar/python](https://file+.vscode-resource.vscode-cdn.net/opt/homebrew/Cellar/python)@3.9[/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py:328](https://file+.vscode-resource.vscode-cdn.net/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py:328), in Task.__wakeup(self, future) 326 def __wakeup(self, future): 327 try: --> 328 future.result() 329 except BaseException as exc: 330 # This may also be a cancellation. 331 self.__step(exc) File [/opt/homebrew/Cellar/python](https://file+.vscode-resource.vscode-cdn.net/opt/homebrew/Cellar/python)@3.9[/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/futures.py:201](https://file+.vscode-resource.vscode-cdn.net/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/futures.py:201), in Future.result(self) 199 self.__log_traceback = False 200 if self._exception is not None: --> 201 raise self._exception 202 return self._result File [/opt/homebrew/Cellar/python](https://file+.vscode-resource.vscode-cdn.net/opt/homebrew/Cellar/python)@3.9[/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py:256](https://file+.vscode-resource.vscode-cdn.net/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py:256), in Task.__step(***failed resolving arguments***) 252 try: 253 if exc is None: 254 # We use the `send` method directly, because coroutines 255 # don't have `__iter__` and `__next__` methods. 
--> 256 result = coro.send(None) 257 else: 258 result = coro.throw(exc) File [~/.virtualenvs/super/lib/python3.9/site-packages/aiormq/channel.py:121](https://file+.vscode-resource.vscode-cdn.net/Users/mbercx/project/super/jupyter/~/.virtualenvs/super/lib/python3.9/site-packages/aiormq/channel.py:121), in Channel.rpc(self, frame, timeout) 118 return value 120 if self.writer is None: --> 121 raise ChannelInvalidStateError("writer is None") 123 lock = self.lock 125 try: ChannelInvalidStateError: writer is None ```

Also note that the problem was present across different environments.

sphuber commented 1 year ago

That exception is actually familiar, see #4595. I think it is caused by an instability in the connection with RabbitMQ, which is managed by aio-pika and aiormq. Both have significantly more recent releases with fixes to connection stability, as well as auto-reconnect in case the connection is lost, and I suspect upgrading could help a lot with these problems. I have had a branch open for a very long time (see #5732), but I haven't been able to merge it because one test still fails, namely the one shutting down the communicator. I have been debugging this for hours but couldn't solve it, and neither could @muhrin for the time being. If you'd like, I could rebase it to make it up to date with the latest version, and maybe you can give it a go? All the tests pass except the one that shuts down the daemon, so running should work fine in principle, and it would provide valuable information if you no longer saw these problems when submitting.
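For anyone comparing environments, a quick way to see which versions of the libraries involved are installed is sketched below. The package list (`aio-pika`, `aiormq`, `kiwipy`) and the helper name `installed_versions` are assumptions made for illustration, not something taken from aiida-core; the sketch only relies on the standard library's `importlib.metadata`.

```python
from importlib.metadata import PackageNotFoundError, version

# Assumed list of distributions involved in the RabbitMQ connection.
PACKAGES = ("aio-pika", "aiormq", "kiwipy")


def installed_versions(packages=PACKAGES):
    """Return a mapping of distribution name to installed version (None if absent)."""
    found = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            # The distribution is not installed in this environment.
            found[name] = None
    return found


if __name__ == "__main__":
    for name, ver in installed_versions().items():
        print(f"{name}: {ver or 'not installed'}")
```

Running this in each affected environment and posting the output alongside the traceback should make it easier to tell whether the failures line up with particular library versions.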

mbercx commented 1 year ago

> If you'd like, I could rebase it to make it up to date with the latest version, and maybe you can give it a go?

It would be a good opportunity for me to get more familiar with these tools and the engine, but doing so would most likely take more time than I can commit to at the moment. I'll try and pick this up once I've checked some boxes, if you haven't fixed it by then of course. ^^