dapper91 / pjrpc

python json-rpc client/server without boilerplate
https://pjrpc.readthedocs.io
The Unlicense
30 stars 3 forks source link

aio_pika_client@1.6.0: Exception from close() after connect() #91

Open bernhardkaindl opened 1 year ago

bernhardkaindl commented 1 year ago
import asyncio

from pjrpc.client.backend import aio_pika as pjrpc_client

async def main():
    client = pjrpc_client.Client('amqp://guest:guest@localhost:5672/v1', 'jsonrpc')
    await client.connect()
    await client.close()

if __name__ == "__main__":
    asyncio.run(main())

with 1.6.0, this results in:

  File "aio_pika_client-close-test.py", line 9, in main
    await client.close()
  File "/home/autonoma/flowhub/pjrpc/pjrpc/client/backend/aio_pika.py", line 86, in close
    assert self._result_queue is not None, "client is not initialized"
AssertionError: client is not initialized

self._result_queue is always none, unless the Client is costructed to use a fixed result_queue like this:

    kwargs["result_queue_name"] = "jsonrpc-responses"
    kwargs["result_queue_args"] = {
        "exclusive": False,
        "auto_delete": False,
        "durable": True,
        "arguments": None,
    }
    client = xjsonrpc_client.Client(
        URL("amqp://guest:guest@localhost:5672/"),
        "jsonrpc-requests",
        **kwargs,
    )

But for the majority of uses, a fixed result queue is not needed and a temporary result queue is used for each call (which is easier if you want to run more than one RPC client)

bernhardkaindl commented 1 year ago

This assertion was added by #74 to fix a mypy warning: https://github.com/dapper91/pjrpc/blob/27a963085b6724be818a3407df7f523917622f63/pjrpc/client/backend/aio_pika.py#L86

Context: git log -p -U11 |grep -C7 "+.*assert self._result_queue"


     async def close(self) -> None:
         """
         Closes current broker connection.
         """

+        assert self._channel is not None, "client is not initialized"
+        assert self._connection is not None, "client is not initialized"
+        assert self._result_queue is not None, "client is not initialized"
+
         if self._consumer_tag:
             await self._result_queue.cancel(self._consumer_tag)
             self._consumer_tag = None

         await self._channel.close()
         await self._connection.close()
bernhardkaindl commented 1 year ago

Proposed fix:

-        assert self._result_queue is not None, "client is not initialized"

-         if self._consumer_tag:
+         if self._consumer_tag and self._result_queue:
             await self._result_queue.cancel(self._consumer_tag)
             self._consumer_tag = None

         await self._channel.close()
         await self._connection.close()             

Reason: self._result_queue is not initialized by default, only when a fixed result queue is used (see the description).

Also it should be possible to call close the a connection (self._clannel and self._connection), even when one of them is not set.

This could happen in some circumstances, e.g. when handling a KeyboardInterrupt or other terminating while a connection is attempted to be connected or closed and it would be better to handle this by closing whatever is open and finish any futures than to throw an AssertionError. Thus I'll also threat these two cases without throwing an AssertionError. I've opened #92 with these proposed fixes.

bernhardkaindl commented 1 year ago

@dapper91 ping