kyuupichan / aiorpcX

Generic async RPC implementation, including JSON-RPC
MIT License

session.send_request: Future exception was never retrieved #22

Closed: SomberNight closed this issue 5 years ago

SomberNight commented 5 years ago

Tested on git HEAD https://github.com/kyuupichan/aiorpcX/commit/5e40bde99568af7f3cf01bf9769ab782ffc691d1 with CPython 3.7.3.

In Electrum, with the latest aiorpcX, I sometimes get hundreds of "Future exception was never retrieved" traces dumped in the log when loop.set_debug(1) is set (I have it enabled at the moment because I am debugging another issue).

The issue arises because the future is created at https://github.com/kyuupichan/aiorpcX/blob/5e40bde99568af7f3cf01bf9769ab782ffc691d1/aiorpcx/session.py#L522-L525

and only then does the coroutine wait for the semaphore at https://github.com/kyuupichan/aiorpcX/blob/5e40bde99568af7f3cf01bf9769ab782ffc691d1/aiorpcx/session.py#L490-L492

While hundreds of coroutines are waiting for the semaphore, they can all get cancelled, e.g. because of a disconnect. asyncio then complains because the futures were never awaited, so their exceptions were never retrieved.
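
The sequence described above can be reproduced without aiorpcX. The following is only a minimal sketch in plain asyncio, not the library's code: the names send_request, queue_then_disconnect and demo are hypothetical, the list `pending` stands in for the connection's table of outstanding requests, and ConnectionError stands in for whatever exception the connection sets on outstanding futures when it goes away (the trace further down shows a CancelledError). A custom exception handler collects the messages that asyncio would otherwise write to the log.

```python
import asyncio
import gc


async def send_request(pending, semaphore):
    # Step 1: create and register the response future before any waiting.
    future = asyncio.get_running_loop().create_future()
    pending.append(future)
    # Step 2: wait for a send slot. If the task is cancelled while waiting
    # here, the await on the next line is never reached.
    async with semaphore:
        return await future


async def queue_then_disconnect(num_requests):
    semaphore = asyncio.Semaphore(0)  # no free slots: every sender blocks
    pending = []  # hypothetical stand-in for the connection's pending requests
    tasks = [asyncio.ensure_future(send_request(pending, semaphore))
             for _ in range(num_requests)]
    await asyncio.sleep(0)  # let each task run up to the semaphore

    # Simulated disconnect (assumption): fail the outstanding futures and
    # cancel the senders that are still waiting for a slot.
    for future in pending:
        future.set_exception(ConnectionError("connection lost"))
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)


async def demo(num_requests=3):
    logged = []
    loop = asyncio.get_running_loop()
    # Record what asyncio reports instead of letting it go to the log.
    loop.set_exception_handler(
        lambda _loop, context: logged.append(context["message"]))
    await queue_then_disconnect(num_requests)
    gc.collect()  # make sure the abandoned futures are finalized now
    return logged


if __name__ == "__main__":
    for message in asyncio.run(demo()):
        print(message)
```

Each future gets an exception set on it, but the only code that would have awaited it was cancelled at the semaphore, so the report is emitted when the future is garbage collected.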


Example server code:

import asyncio
from aiorpcx import RPCSession, serve_rs

class MyServerSession(RPCSession):
    num_req = 0
    async def handle_request(self, request):
        self.num_req += 1
        print(f"num_req: {self.num_req!r}")
        await asyncio.sleep(3)

loop = asyncio.get_event_loop()
coro = serve_rs(MyServerSession, 'localhost', 12345, loop=loop)
server = loop.run_until_complete(coro)
loop.run_forever()

Example client code:

import traceback
import asyncio
from aiorpcx import connect_rs, TaskGroup, RPCSession, TaskTimeout

class MyClientSession(RPCSession):
    async def send_request(self, *args, **kwargs):
        try:
            return await super().send_request(*args, **kwargs)
        except TaskTimeout as e:
            raise Exception(e) from None

async def coro():
    async with connect_rs('localhost', 12345, session_factory=MyClientSession) as session:
        session.sent_request_timeout = 1
        try:
            async with TaskGroup() as group:
                for i in range(1000):
                    await group.spawn(session.send_request('echo', []))
                print("all requests queued")
        except Exception:
            traceback.print_exc()
        finally:
            print("coro exiting.")

loop = asyncio.get_event_loop()
loop.set_debug(1)
loop.run_until_complete(coro())

Note that the client calls loop.set_debug(1).

Trace in client:

Future exception was never retrieved
future: <Future finished exception=CancelledError() created at C:\Users\User\AppData\Local\Programs\Python\Python37\lib\asyncio\base_events.py:396>
source_traceback: Object created at (most recent call last):
  File "<stdin>", line 1, in <module>
  File "C:\Users\User\AppData\Local\Programs\Python\Python37\lib\asyncio\base_events.py", line 571, in run_until_complete
    self.run_forever()
  File "C:\Users\User\AppData\Local\Programs\Python\Python37\lib\asyncio\base_events.py", line 539, in run_forever
    self._run_once()
  File "C:\Users\User\AppData\Local\Programs\Python\Python37\lib\asyncio\base_events.py", line 1767, in _run_once
    handle._run()
  File "C:\Users\User\AppData\Local\Programs\Python\Python37\lib\asyncio\events.py", line 88, in _run
    self._context.run(self._callback, *self._args)
  File "<stdin>", line 4, in send_request
  File "...\aiorpcX\aiorpcx\session.py", line 524, in send_request
    message, future = self.connection.send_request(Request(method, args))
  File "...\aiorpcX\aiorpcx\jsonrpc.py", line 698, in send_request
    return message, self._future(request, request_id)
  File "...\aiorpcX\aiorpcx\jsonrpc.py", line 679, in _future
    future = self._create_future()
  File "C:\Users\User\AppData\Local\Programs\Python\Python37\lib\asyncio\base_events.py", line 396, in create_future
    return futures.Future(loop=self)
concurrent.futures._base.CancelledError