project-receptor / python-receptor

Project Receptor is a flexible multi-service relayer with remote execution and orchestration capabilities linking controllers with executors across a mesh of nodes.

Exception raised when starting a receptor node with messages to expire #88

Closed elyezer closed 4 years ago

elyezer commented 4 years ago

While gathering more information about #79, I had ping running with one node missing from the network, so all the messages were being held on the controller node. Today, when I started the controller again without cleaning the data directory, the log showed that the messages left over from yesterday were being expired:

...
INFO 2019-12-20 11:09:37,812 controller file Expiring message 6f628495-04f7-4aa3-a48f-e2ca92b2dc67
INFO 2019-12-20 11:09:37,816 controller file Expiring message 21e3a76f-3170-4cf6-9146-c1a5cae54294

After the last "Expiring message" log line, the following exception appeared in the output:

Task exception was never retrieved
future: <Task finished coro=<Receptor.watch_expire() done, defined at /home/elyezer/code/receptor/receptor/receptor/receptor.py:48> exception=FileNotFoundError(2, 'No such file or directory')>
Traceback (most recent call last):
  File "/home/elyezer/code/receptor/receptor/receptor/receptor.py", line 53, in watch_expire
    await buffer.expire()
  File "/home/elyezer/code/receptor/receptor/receptor/buffers/file.py", line 121, in expire
    await self._loop.run_in_executor(pool, os.remove, self._path_for_ident(ident))
  File "/usr/lib64/python3.6/concurrent/futures/thread.py", line 56, in run
    result = self.fn(*self.args, **self.kwargs)
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/controller/messages/21e3a76f-3170-4cf6-9146-c1a5cae54294'
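
The traceback shows `os.remove` being run in an executor and failing because the message file was already gone. A minimal sketch of a removal step that tolerates a missing file is below. It is not the project's code: `remove_if_present`, `expire_idents`, and `demo` are hypothetical names, and a temporary directory stands in for the real messages directory.

```python
import asyncio
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor


def remove_if_present(path):
    """Delete path; return False instead of raising if it is already gone."""
    try:
        os.remove(path)
        return True
    except FileNotFoundError:
        return False


async def expire_idents(directory, idents, loop):
    """Remove the file of each expired message, skipping missing files."""
    removed = []
    with ThreadPoolExecutor(max_workers=1) as pool:
        for ident in idents:
            path = os.path.join(directory, ident)
            # Same pattern as the traceback: blocking file I/O in an executor.
            if await loop.run_in_executor(pool, remove_if_present, path):
                removed.append(ident)
    return removed


def demo():
    """Expire one existing and one missing message file."""
    loop = asyncio.new_event_loop()
    try:
        with tempfile.TemporaryDirectory() as directory:
            open(os.path.join(directory, "present"), "w").close()
            return loop.run_until_complete(
                expire_idents(directory, ["present", "missing"], loop)
            )
    finally:
        loop.close()
```

With this shape, an identifier whose file has already been deleted is skipped and the loop continues with the remaining identifiers instead of ending the task.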

elyezer commented 4 years ago

After seeing the exception mentioned above, I created a new node and made it a peer of the controller. Next, I tried to ping the newly spawned node and got the following output after the exception from the previous comment:

...
Exception stacktrace... then:

DEBUG 2019-12-20 11:16:15,480 controller base waiting for HI
DEBUG 2019-12-20 11:16:15,481 controller base sending HI
DEBUG 2019-12-20 11:16:15,482 controller base sending routes
DEBUG 2019-12-20 11:16:15,482 controller receptor Emitting Route Advertisements, excluding set()
DEBUG 2019-12-20 11:16:15,484 controller base starting normal loop
DEBUG 2019-12-20 11:16:15,484 controller receptor spawning message_handler
DEBUG 2019-12-20 11:16:15,488 controller receptor message_handler: FramedMessage(msg_id=65270098242076684023547538304219437655, header={'cmd': 'ROUTE', 'id': 'node-a', 'capabilities': [['debug', '0.0.1'], ['debug_http', '0.0.1']], 'groups': [], 'edges': [['controller', 'node-a', 1]], 'seen': ['node-a', 'controller']}, payload=None)
DEBUG 2019-12-20 11:16:15,489 controller receptor Emitting Route Advertisements, excluding {'node-a', 'controller'}
DEBUG 2019-12-20 11:16:37,270 controller base waiting for HI
DEBUG 2019-12-20 11:16:37,272 controller base sending HI
DEBUG 2019-12-20 11:16:37,273 controller base sending routes
DEBUG 2019-12-20 11:16:37,273 controller receptor Emitting Route Advertisements, excluding set()
DEBUG 2019-12-20 11:16:37,278 controller base starting normal loop
DEBUG 2019-12-20 11:16:37,278 controller receptor spawning message_handler
DEBUG 2019-12-20 11:16:37,279 controller receptor message_handler: FramedMessage(msg_id=87095259225122172305872613616857345923, header={'sender': '8b223bc8-bef7-41bd-8f6d-72e464598e81', 'recipient': 'node-c', 'route_list': ['8b223bc8-bef7-41bd-8f6d-72e464598e81', '8b223bc8-bef7-41bd-8f6d-72e464598e81']}, payload=bytearray(b'{"message_id": "95fc11f4-293a-4b25-9b84-21abe5b135ab", "sender": "8b223bc8-bef7-41bd-8f6d-72e464598e81", "recipient": "node-c", "message_type": "directive", "timestamp": "2019-12-20T15:17:03.493965", "raw_payload": "2019-12-20T15:17:03.493965", "directive": "receptor:ping", "in_response_to": null, "ttl": 15, "serial": 1, "code": 0}'))
INFO 2019-12-20 11:16:37,279 controller directive Received ping from 8b223bc8-bef7-41bd-8f6d-72e464598e81
ERROR 2019-12-20 11:16:37,279 controller receptor error in handle_message: Invalid directive -> 'receptor:ping'. Sending failure response back.

This is similar to what I am observing in #79. The ping directive might not be found because of the exception raised when reusing the same data dir. I can't confirm this 100%, but it seems we are getting close to narrowing it down.
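
"Task exception was never retrieved" means a background task ended with an exception that nothing awaited or inspected. One way to make such a failure visible as soon as it happens is a done callback on the task. The sketch below only illustrates the asyncio mechanism: `log_task_failure`, `failing_watcher`, and the logger name are hypothetical, and it does not confirm the suspected link to the invalid directive error.

```python
import asyncio
import logging

logger = logging.getLogger("receptor.sketch")  # hypothetical logger name


def log_task_failure(task):
    """Done callback: report the exception that ended a background task."""
    if task.cancelled():
        return
    exc = task.exception()  # retrieving it also silences the asyncio warning
    if exc is not None:
        logger.error("background task failed: %r", exc)


async def failing_watcher():
    # Stand-in for a periodic job such as the expiry watcher.
    raise FileNotFoundError(2, "No such file or directory")


def demo():
    """Run the failing task and return the exception type names observed."""
    seen = []
    loop = asyncio.new_event_loop()
    try:
        task = loop.create_task(failing_watcher())
        task.add_done_callback(log_task_failure)
        task.add_done_callback(
            lambda t: seen.append(type(t.exception()).__name__)
        )
        loop.run_until_complete(asyncio.wait([task]))
        # One more loop iteration so any pending callbacks have run.
        loop.run_until_complete(asyncio.sleep(0))
    finally:
        loop.close()
    return seen
```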

This is the dump of the stacks of the coroutines on the controller node:

Stack for <Task pending coro=<Receptor.shutdown_handler() running at /home/elyezer/code/receptor/receptor/receptor/receptor.py:141> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f5f386d4558>()]> cb=[_run_until_complete_cb() at /usr/lib64/python3.6/asyncio/base_events.py:196]> (most recent call last):
  File "/home/elyezer/code/receptor/receptor/receptor/receptor.py", line 141, in shutdown_handler
    await asyncio.sleep(1)
Stack for <Task pending coro=<watch_queue() running at /home/elyezer/code/receptor/receptor/receptor/connection/base.py:29> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f5f386d4c48>()]> cb=[<TaskWakeupMethWrapper object at 0x7f5f3b762d08>()]> (most recent call last):
  File "/home/elyezer/code/receptor/receptor/receptor/connection/base.py", line 29, in watch_queue
    msg = await asyncio.wait_for(buf.get(), 5.0)
Stack for <Task pending coro=<serve() running at /home/elyezer/code/receptor/receptor/receptor/connection/sock.py:53> wait_for=<Task pending coro=<watch_queue() running at /home/elyezer/code/receptor/receptor/receptor/connection/base.py:29> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f5f386d4eb8>()]> cb=[<TaskWakeupMethWrapper object at 0x7f5f3b762ca8>()]>> (most recent call last):
  File "/home/elyezer/code/receptor/receptor/receptor/connection/sock.py", line 53, in serve
    await factory().server(t)
Stack for <Task pending coro=<watch_queue() running at /home/elyezer/code/receptor/receptor/receptor/connection/base.py:29> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f5f386d4eb8>()]> cb=[<TaskWakeupMethWrapper object at 0x7f5f3b762ca8>()]> (most recent call last):
  File "/home/elyezer/code/receptor/receptor/receptor/connection/base.py", line 29, in watch_queue
    msg = await asyncio.wait_for(buf.get(), 5.0)
Stack for <Task pending coro=<DurableBuffer.get() running at /home/elyezer/code/receptor/receptor/receptor/buffers/file.py:60> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f5f386d4888>()]> cb=[_release_waiter(<Future pendi...f386d4eb8>()]>)() at /usr/lib64/python3.6/asyncio/tasks.py:316]> (most recent call last):
  File "/home/elyezer/code/receptor/receptor/receptor/buffers/file.py", line 60, in get
    msg = await self.q.get()
Stack for <Task pending coro=<serve() running at /home/elyezer/code/receptor/receptor/receptor/connection/sock.py:53> wait_for=<Task pending coro=<watch_queue() running at /home/elyezer/code/receptor/receptor/receptor/connection/base.py:29> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f5f386d4c48>()]> cb=[<TaskWakeupMethWrapper object at 0x7f5f3b762d08>()]>> (most recent call last):
  File "/home/elyezer/code/receptor/receptor/receptor/connection/sock.py", line 53, in serve
    await factory().server(t)
Stack for <Task pending coro=<DurableBuffer.get() running at /home/elyezer/code/receptor/receptor/receptor/buffers/file.py:60> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f5f386d4828>()]> cb=[_release_waiter(<Future pendi...f386d4c48>()]>)() at /usr/lib64/python3.6/asyncio/tasks.py:316]> (most recent call last):
  File "/home/elyezer/code/receptor/receptor/receptor/buffers/file.py", line 60, in get
    msg = await self.q.get()
Stack for <Task pending coro=<Worker.receive() running at /home/elyezer/code/receptor/receptor/receptor/connection/base.py:65> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f5f386d4d68>()]>> (most recent call last):
  File "/home/elyezer/code/receptor/receptor/receptor/connection/base.py", line 65, in receive
    async for msg in self.conn:
Traceback for <Task finished coro=<Receptor.message_handler() done, defined at /home/elyezer/code/receptor/receptor/receptor/receptor.py:90> exception=UnrouteableError('No route found to 8b223bc8-bef7-41bd-8f6d-72e464598e81',)> (most recent call last):
  File "/home/elyezer/code/receptor/receptor/receptor/receptor.py", line 106, in message_handler
    await self.handle_message(data)
  File "/home/elyezer/code/receptor/receptor/receptor/receptor.py", line 237, in handle_message
    await handlers[inner.message_type](inner)
  File "/home/elyezer/code/receptor/receptor/receptor/receptor.py", line 199, in handle_directive
    await self.router.send(err_resp)
  File "/home/elyezer/code/receptor/receptor/receptor/router.py", line 155, in send
    raise UnrouteableError(f'No route found to {inner_envelope.recipient}')
receptor.exceptions.UnrouteableError: No route found to 8b223bc8-bef7-41bd-8f6d-72e464598e81
Stack for <Task pending coro=<Receptor.message_handler() running at /home/elyezer/code/receptor/receptor/receptor/receptor.py:94> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f5f3b762a38>()]>> (most recent call last):
  File "/home/elyezer/code/receptor/receptor/receptor/receptor.py", line 94, in message_handler
    data = await buf.get()
Stack for <Task pending coro=<Worker.receive() running at /home/elyezer/code/receptor/receptor/receptor/connection/base.py:65> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f5f3b762b58>()]>> (most recent call last):
  File "/home/elyezer/code/receptor/receptor/receptor/connection/base.py", line 65, in receive
    async for msg in self.conn:
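
For anyone who wants to produce a similar dump: the "Stack for <Task ...>" and "Traceback for <Task ...>" headers are the format written by `Task.print_stack()`. A minimal sketch follows, with `dump_task_stacks`, `idle`, and `demo` as hypothetical names. How the dump above was actually produced is not stated in this issue.

```python
import asyncio
import io


def dump_task_stacks(tasks, limit=None):
    """Return the text that Task.print_stack() writes for each task."""
    out = io.StringIO()
    for task in tasks:
        task.print_stack(limit=limit, file=out)
    return out.getvalue()


async def idle():
    # Stand-in for a long-running coroutine suspended at an await.
    await asyncio.sleep(60)


def demo():
    """Start one task, dump its stack while pending, then cancel it."""
    loop = asyncio.new_event_loop()
    try:
        task = loop.create_task(idle())
        # Let the task start and suspend at its await.
        loop.run_until_complete(asyncio.sleep(0.01))
        report = dump_task_stacks([task])
        task.cancel()
        loop.run_until_complete(asyncio.wait([task]))
        return report
    finally:
        loop.close()
```

To cover every task on a running loop, the list can come from `asyncio.all_tasks()` on Python 3.7 and later, or `asyncio.Task.all_tasks()` on Python 3.6, which is the version shown in the tracebacks here.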