mosquito / aiormq

Pure python AMQP 0.9.1 asynchronous client library

Queue declare fails for specific values of `x-expires` #97

Open sphuber opened 3 years ago

sphuber commented 3 years ago

Consider the following simple script:

#!/usr/bin/env python
import asyncio
import aiormq

X_EXPIRES = 32767

async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost:5672/")

    # Creating a channel
    channel = await connection.channel()
    queue = await channel.queue_declare('test2', arguments={'x-expires': X_EXPIRES})

    # Sending the message
    await channel.basic_publish(b'Hello World!', routing_key='hello')
    print(" [x] Sent 'Hello World!'")

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

With the value X_EXPIRES = 32767 this works just fine. However, when increasing it by one to X_EXPIRES = 32768, the code raises an exception with the following stack trace:

Traceback (most recent call last):
  File "/home/aiida/.virtualenvs/aiida_dev/lib/python3.7/site-packages/aiormq/base.py", line 25, in __inner
    return await self.task
concurrent.futures._base.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "./test_rmq.py", line 20, in <module>
    loop.run_until_complete(main())
  File "/usr/lib/python3.7/asyncio/base_events.py", line 587, in run_until_complete
    return future.result()
  File "./test_rmq.py", line 12, in main
    queue = await channel.queue_declare('test2', arguments={'x-expires': 32768})
  File "/home/aiida/.virtualenvs/aiida_dev/lib/python3.7/site-packages/aiormq/channel.py", line 713, in queue_declare
    timeout=timeout,
  File "/home/aiida/.virtualenvs/aiida_dev/lib/python3.7/site-packages/aiormq/base.py", line 168, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "/home/aiida/.virtualenvs/aiida_dev/lib/python3.7/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/usr/lib/python3.7/asyncio/tasks.py", line 630, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/home/aiida/.virtualenvs/aiida_dev/lib/python3.7/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/usr/lib/python3.7/asyncio/tasks.py", line 630, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/home/aiida/.virtualenvs/aiida_dev/lib/python3.7/site-packages/aiormq/base.py", line 27, in __inner
    raise self.exception from e
  File "/home/aiida/.virtualenvs/aiida_dev/lib/python3.7/site-packages/aiormq/connection.py", line 423, in __close_writer
    await writer.wait_closed()
  File "/usr/lib/python3.7/asyncio/streams.py", line 323, in wait_closed
    await self._protocol._closed
  File "/home/aiida/.virtualenvs/aiida_dev/lib/python3.7/site-packages/aiormq/connection.py", line 375, in __reader
    weight, channel, frame = await self.__receive_frame()
  File "/home/aiida/.virtualenvs/aiida_dev/lib/python3.7/site-packages/aiormq/connection.py", line 327, in __receive_frame
    frame_header = await self.reader.readexactly(1)
  File "/usr/lib/python3.7/asyncio/streams.py", line 679, in readexactly
    await self._wait_for_data('readexactly')
  File "/usr/lib/python3.7/asyncio/streams.py", line 473, in _wait_for_data
    await self._waiter
  File "/usr/lib/python3.7/asyncio/selector_events.py", line 814, in _read_ready__data_received
    data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer

It looks like this may be related to how the integer value is interpreted or cast, because the limit corresponds to 2^15 == 32768, which suggests the value is being treated as a signed two-byte integer. The behavior changes again at 2^16 == 65536: X_EXPIRES = 65535 fails but X_EXPIRES = 65536 works again. So it seems that the range [32768, 65535], i.e. 2^15 <= X_EXPIRES < 2^16, causes the exception.
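The boundary arithmetic can be checked with the standard library alone. The sketch below is not aiormq or pamqp code: `narrowest_int_format` is a hypothetical helper that only reports the narrowest fixed-width integer each value fits into. It assumes the failing range coincides with values that need an unsigned 16-bit field, which is a guess rather than a confirmed diagnosis.

```python
import struct


def narrowest_int_format(value):
    """Return the narrowest big-endian struct format that can hold `value`.

    Hypothetical helper for illustration only; it tries signed 16-bit,
    then unsigned 16-bit, then signed 32-bit.
    """
    for fmt in (">h", ">H", ">i"):
        try:
            struct.pack(fmt, value)
            return fmt
        except struct.error:
            # Value is out of range for this width; try the next one.
            continue
    raise OverflowError(f"{value} does not fit in 32 bits")


# Values on either side of the two boundaries mentioned above.
for value in (32767, 32768, 65535, 65536):
    print(value, narrowest_int_format(value))
```

Only the values reported as `>H` (unsigned 16-bit) fall in the range that fails here, which would be consistent with the server rejecting that particular field type.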

Note that this only seems to happen with RabbitMQ 3.5. I cannot reproduce this with RabbitMQ 3.6. Have you got an idea why this might be happening, and would it be possible to create a patch for this?

mosquito commented 3 years ago

Please provide your RabbitMQ version and try a newer one. A similar problem has been found in pamqp before.

sphuber commented 3 years ago

My RabbitMQ version is 3.5.7. As I mentioned, I can confirm that this bug is not present in combination with RMQ 3.6. I realize that this version has been EOL as of 31 October 2016, but it is still the default version of Ubuntu 16.04, which is an LTS and which has support until 2024.

sphuber commented 3 years ago

Update: adding the following at the top of the script will make things work:

import pamqp.encode
pamqp.encode.support_deprecated_rabbitmq()

This suggestion came from a comment on a related issue on aio-pika by @gmr.

By the way, this made me realize that aiormq does not explicitly declare pamqp as a direct dependency in setup.py, even though it uses it extensively. Do you want me to open another issue for this? Edit: I must have misread setup.py, because it seems it already specifies an explicit dependency on pamqp.