mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0

Aio_pika (with aiormq) is now incompatible with RabbitMQ 3.3 #197

Closed: zarybnicky closed this issue 5 years ago

zarybnicky commented 5 years ago

After upgrading to the latest aio_pika version, our code communicating with RabbitMQ 3.3.5 (don't ask why...) stopped working. We've traced the problem to a queue_declare call with arguments that include x-message-ttl; without it, the code works OK. I thought the problem was bad serialization, but any type other than int gets an expected error from RabbitMQ.

Testing with RabbitMQ 3.7 works without a problem, so we're now downgrading aio_pika to 4.* indefinitely, as we are unable to upgrade our Rabbit instance.

For further reference, the relevant stack trace:

[2019-02-21 17:24:48,805] aiormq.connection - DEBUG - Can not read bytes from server:
Traceback (most recent call last):
  File "/var/lib/mundoscaldis/mundoscaldis-monoprocessing/.venv/lib/python3.6/site-packages/aiormq/connection.py", line 295, in __reader
    weight, channel, frame = await self.__receive_frame()
  File "/var/lib/mundoscaldis/mundoscaldis-monoprocessing/.venv/lib/python3.6/site-packages/aiormq/base.py", line 161, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "/var/lib/mundoscaldis/mundoscaldis-monoprocessing/.venv/lib/python3.6/site-packages/aiormq/base.py", line 22, in __await__
    return (yield from self.task.__await__(*args, **kwargs))
  File "/var/lib/mundoscaldis/mundoscaldis-monoprocessing/.venv/lib/python3.6/site-packages/aiormq/connection.py", line 246, in __receive_frame
    frame_header = await self.reader.readexactly(1)
  File "/usr/lib64/python3.6/asyncio/streams.py", line 672, in readexactly
    raise IncompleteReadError(incomplete, n)
asyncio.streams.IncompleteReadError: 0 bytes read on a total of 1 expected bytes

[2019-02-21 17:24:48,809] asyncio - ERROR - Task exception was never retrieved
future: <Task finished coro=<_wrap_awaitable() done, defined at /usr/lib64/python3.6/asyncio/tasks.py:530> exception=ConnectionError('0 bytes read on a total of 1 expected bytes',)>
Traceback (most recent call last):
  File "/var/lib/mundoscaldis/mundoscaldis-monoprocessing/.venv/lib/python3.6/site-packages/aiormq/base.py", line 22, in __await__
    return (yield from self.task.__await__(*args, **kwargs))
concurrent.futures._base.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "main.py", line 4, in <module>
    main()
  [...]
  File "/var/lib/mundoscaldis/mundoscaldis-monoprocessing/.venv/lib/python3.6/site-packages/mundoscaldis/services/rabbitmq_service.py", line 102, in _declare_queue
    que.real_queue = await self.chan.declare_queue(que.name, **que.opts)
  File "/var/lib/mundoscaldis/mundoscaldis-monoprocessing/.venv/lib/python3.6/site-packages/aio_pika/robust_channel.py", line 127, in declare_queue
    arguments=arguments, timeout=timeout,
  File "/var/lib/mundoscaldis/mundoscaldis-monoprocessing/.venv/lib/python3.6/site-packages/aio_pika/channel.py", line 239, in declare_queue
    await queue.declare(timeout=timeout)
  File "/var/lib/mundoscaldis/mundoscaldis-monoprocessing/.venv/lib/python3.6/site-packages/aio_pika/queue.py", line 86, in declare
    ), timeout=timeout, loop=self.loop
  File "/usr/lib64/python3.6/asyncio/tasks.py", line 339, in wait_for
    return (yield from fut)
  File "/var/lib/mundoscaldis/mundoscaldis-monoprocessing/.venv/lib/python3.6/site-packages/aiormq/channel.py", line 505, in queue_declare
    arguments=arguments,
  File "/var/lib/mundoscaldis/mundoscaldis-monoprocessing/.venv/lib/python3.6/site-packages/aiormq/base.py", line 161, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "/var/lib/mundoscaldis/mundoscaldis-monoprocessing/.venv/lib/python3.6/site-packages/aiormq/base.py", line 24, in __await__
    raise self.exception from e
  File "/usr/lib64/python3.6/asyncio/tasks.py", line 537, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/var/lib/mundoscaldis/mundoscaldis-monoprocessing/.venv/lib/python3.6/site-packages/aiormq/base.py", line 24, in __await__
    raise self.exception from e
  File "/usr/lib64/python3.6/asyncio/tasks.py", line 537, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/var/lib/mundoscaldis/mundoscaldis-monoprocessing/.venv/lib/python3.6/site-packages/aiormq/base.py", line 24, in __await__
    raise self.exception from e
ConnectionError: 0 bytes read on a total of 1 expected bytes
mosquito commented 5 years ago

My quick test with rabbitmq:3.4.2-management passed fine. Could you check the RPC example in your environment?
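
For reference, the RPC pattern being referred to looks roughly like the following (a sketch based on the aio-pika patterns documentation; the broker URL and the multiply method are illustrative placeholders, not part of the original report):

import asyncio
from aio_pika import connect_robust
from aio_pika.patterns import RPC

async def multiply(*, x):
    # Remote method executed on the consumer side
    return x * 2

async def main():
    connection = await connect_robust('amqp://guest:guest@127.0.0.1/')
    channel = await connection.channel()

    rpc = await RPC.create(channel)
    await rpc.register('multiply', multiply, auto_delete=True)

    # Call the registered method through the broker and await the result
    print(await rpc.call('multiply', kwargs={'x': 21}))  # -> 42

    await connection.close()

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())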

Darkless012 commented 5 years ago

The RPC example passed fine on Rabbit 3.3.5. We're putting together a minimal non-working example.

zarybnicky commented 5 years ago

While the RPC example works, the following minimal example of our usage doesn't. The problematic call is declare_queue with x-message-ttl. When we change the type of that value to e.g. a string, we get the expected error aiormq.exceptions.ChannelPreconditionFailed: PRECONDITION_FAILED - invalid arg 'x-message-ttl' for queue 'queue' in vhost 'mundosalsadev': {unacceptable_type,longstr}. So declare_exchange works and the declare_queue request is sent successfully, but something goes wrong while receiving the response...

import asyncio
import aio_pika

async def main():
    url = 'amqp://10.0.224.30:5672/mundosalsadev'
    conn = await aio_pika.connect_robust(url)
    chan = await conn.channel()
    await chan.set_qos(prefetch_count=1)

    exchange = await chan.declare_exchange(
        'test_incoming',
        type=aio_pika.ExchangeType.DIRECT,
        durable=True,
    )
    dlx_exchange = await chan.declare_exchange(
        'dlx_incoming',
        type=aio_pika.ExchangeType.DIRECT,
        durable=True,
    )
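    # Problematic call per the report above: declare_queue with an integer x-message-ttl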
    queue = await chan.declare_queue(
        'queue',
        durable=True,
        arguments={
            'x-message-ttl': 60000,
            'x-dead-letter-exchange': 'dlx_incoming',
        },
    )
    await queue.bind(exchange)

    await exchange.publish(aio_pika.Message(b'{}'))

    async with queue.iterator() as queue_iter:
        async for message in queue_iter:
            async with message.process():
                print(message.body)

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())

The exception is the same:

Task exception was never retrieved
future: <Task finished coro=<_wrap_awaitable() done, defined at /usr/lib64/python3.6/asyncio/tasks.py:530> exception=ConnectionError('0 bytes read on a total of 1 expected bytes',)>
Traceback (most recent call last):
  File "/var/lib/mundoscaldis/mundoscaldis-monoprocessing/.venv/lib/python3.6/site-packages/aiormq/base.py", line 22, in __await__
    return (yield from self.task.__await__(*args, **kwargs))
concurrent.futures._base.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "test_ttl.py", line 26, in main
    'x-dead-letter-exchange': 'dlx_incoming',
  File "/var/lib/mundoscaldis/mundoscaldis-monoprocessing/.venv/lib/python3.6/site-packages/aio_pika/robust_channel.py", line 127, in declare_queue
    arguments=arguments, timeout=timeout,
  File "/var/lib/mundoscaldis/mundoscaldis-monoprocessing/.venv/lib/python3.6/site-packages/aio_pika/channel.py", line 239, in declare_queue
    await queue.declare(timeout=timeout)
  File "/var/lib/mundoscaldis/mundoscaldis-monoprocessing/.venv/lib/python3.6/site-packages/aio_pika/queue.py", line 86, in declare
    ), timeout=timeout, loop=self.loop
  File "/usr/lib64/python3.6/asyncio/tasks.py", line 339, in wait_for
    return (yield from fut)
  File "/var/lib/mundoscaldis/mundoscaldis-monoprocessing/.venv/lib/python3.6/site-packages/aiormq/channel.py", line 505, in queue_declare
    arguments=arguments,
  File "/var/lib/mundoscaldis/mundoscaldis-monoprocessing/.venv/lib/python3.6/site-packages/aiormq/base.py", line 161, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "/var/lib/mundoscaldis/mundoscaldis-monoprocessing/.venv/lib/python3.6/site-packages/aiormq/base.py", line 24, in __await__
    raise self.exception from e
  File "/usr/lib64/python3.6/asyncio/tasks.py", line 537, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/var/lib/mundoscaldis/mundoscaldis-monoprocessing/.venv/lib/python3.6/site-packages/aiormq/base.py", line 24, in __await__
    raise self.exception from e
  File "/usr/lib64/python3.6/asyncio/tasks.py", line 537, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/var/lib/mundoscaldis/mundoscaldis-monoprocessing/.venv/lib/python3.6/site-packages/aiormq/base.py", line 24, in __await__
    raise self.exception from e
ConnectionError: 0 bytes read on a total of 1 expected bytes
zarybnicky commented 5 years ago

I've actually encountered the same problem when trying to connect to a 3.5.0-management RabbitMQ in Docker, but there the problematic call was actually connect_robust when connecting to a non-existent vhost.

mosquito commented 5 years ago

@zarybnicky ConnectionError: 0 bytes read on a total of 1 expected bytes means the server closed the connection and the reader task was cancelled while reading the header of an AMQP frame. This exception is logged on every unexpected connection close.

@zarybnicky I changed your example and it works fine with RabbitMQ 3.4+:

import asyncio
import aio_pika

async def main():
    conn = await aio_pika.connect_robust()
    chan = await conn.channel()
    await chan.set_qos(prefetch_count=1)

    exchange = await chan.declare_exchange(
        'test_incoming',
        type=aio_pika.ExchangeType.DIRECT,
        durable=True,
    )
    dlx_exchange = await chan.declare_exchange(
        'dlx_incoming',
        type=aio_pika.ExchangeType.DIRECT,
        durable=True,
    )
    queue = await chan.declare_queue(
        'queue',
        durable=True,
        arguments={
            # CHANGES HERE        ↓    😀    
            'x-message-ttl': 600000,
            'x-dead-letter-exchange': dlx_exchange.name,
        },
    )
    await queue.bind(exchange)

    await exchange.publish(aio_pika.Message(b'{}'))

    async with queue.iterator() as queue_iter:
        async for message in queue_iter:
            async with message.process():
                print(message.body)

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())

When RabbitMQ receives a short uint for the argument, the connection is closed from the server side, but when you pass a long integer value (e.g. 65536) it works fine.

Queue arguments are encoded as a "field table" (it's like a dict inside the AMQP protocol). This is part of the AMQP protocol, and the table is packed as compactly as possible, which means integers are packed as integers and strings as strings. My conjecture is that the RabbitMQ server expects only long integers, not short integers; when you pass a short integer value, the server treats the connection as corrupted.

I'm trying to avoid this behaviour.

zarybnicky commented 5 years ago

I tried to change the argument to a string before and got an aiormq.exceptions.ChannelPreconditionFailed: PRECONDITION_FAILED - invalid arg 'x-message-ttl' for queue 'queue' in vhost 'mundosalsadev': {unacceptable_type,longstr} in response. That's an expected exception, I'd say, as the documentation says that "The argument can be of AMQP 0-9-1 type short-short-int, short-int, long-int, or long-long-int." (https://www.rabbitmq.com/ttl.html).

Maybe the behavior changed between 3.3 and 3.4?

mosquito commented 5 years ago

Maybe the behavior changed between 3.3 and 3.4?

I guess yes.

zarybnicky commented 5 years ago

Well, I must've been doing something wrong because now it just works... Just before downgrading to 4.* again I thought I'd try again, and it worked without a problem...

zarybnicky commented 5 years ago

Alright, I think I've gotten to the root of the problem. Despite the test program I posted earlier now working, our application still crashed. On a hunch, I tried varying the value of x-message-ttl, because it is 600000 in the test but 60000 in the application. Lo and behold, it works for values bigger than 65535 but fails with an asyncio.streams.IncompleteReadError for smaller values.

The documentation says x-message-ttl can be short-short-int, short-int, long-int, or long-long-int, so it seems to me that this is a bug in the implementation of pamqp. AFAIK, the user cannot specify whether the argument is supposed to be encoded as an int or a uint, so they need to rely on pamqp's table_integer to choose the correct type.
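
One way to see which AMQP field type pamqp picks for a given value is to call table_integer directly and inspect the first byte of the result, which is the type marker (a diagnostic sketch, assuming a pamqp 2.x install where table_integer returns bytes):

import pamqp.encode

# On an unpatched pamqp, 60000 fits into a 16-bit encoding while 600000
# does not, so the two values get different field-table type markers.
for value in (60000, 65535, 65536, 600000):
    encoded = pamqp.encode.table_integer(value)
    print(value, encoded[:1])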

zarybnicky commented 5 years ago

I tried passing bytes encoded directly via pamqp.encode, but they were re-encoded, as I'd expected. So I don't know how a user of aio_pika or aiormq would specify the type of an argument's value.

gmr commented 5 years ago

The main issue here is a contradiction in the AMQP spec.

RabbitMQ started supporting the full grammar of the AMQP spec for field-tables as of RabbitMQ 3.6. pamqp implements the full grammar.

However, the grammar definition is contradicted by a single paragraph that describes field tables as being much more limited in scope than the grammar does.

I've crafted an email to the RabbitMQ users list soliciting feedback. https://groups.google.com/forum/#!topic/rabbitmq-users/_oRFbRQ6l-Y

gmr commented 5 years ago

One thing you could do is monkey-patch pamqp in your code to support older RabbitMQ versions:

In your code:

import pamqp.encode

def table_integer(value):
    """Determines the best type of numeric type to encode value as, preferring
    the smallest data size first.

    :param int value: Value to encode
    :rtype: bytes
    :raises: TypeError

    """
    # Send the appropriately sized data value
    if 0 <= value <= 255:
        return b'b' + pamqp.encode.octet(value)
    elif -32768 <= value <= 32767:
        return b's' + pamqp.encode.short_int(value)
    elif -2147483648 <= value <= 2147483647:
        return b'I' + pamqp.encode.long_int(value)
    elif -9223372036854775808 <= value <= 9223372036854775807:
        return b'l' + pamqp.encode.long_long_int(value)
    raise TypeError('Unsupported numeric value: {}'.format(value))

pamqp.encode.table_integer = table_integer

I'm inclined to keep support for the full grammar of the spec in pamqp, and I'm considering how I could give users an explicit opt-in for RabbitMQ versions < 3.6.

gmr commented 5 years ago

After posting that suggestion, it occurred to me that I could build similar functionality into pamqp itself. pamqp v2.3.0 has a new function, pamqp.encode.support_deprecated_rabbitmq(), which limits the encoding types available when building field tables.
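
A minimal usage sketch, assuming pamqp >= 2.3.0 is installed: call the switch once at startup, before declaring any queues, so field-table integers are encoded with the restricted set of types.

import pamqp.encode

# Restrict field-table integer encodings to the signed types accepted by
# RabbitMQ < 3.6; call once, before any queue/exchange declarations.
pamqp.encode.support_deprecated_rabbitmq()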

mosquito commented 5 years ago

Will this solution work when multiple connections are established to servers with different versions?

gmr commented 5 years ago

It should. It just limits the field-table integer types to signed integers instead of trying to use unsigned integers to optimize for frame size. It should be 100% compatible with all RabbitMQ versions, but is only required for versions < 3.6.

zarybnicky commented 5 years ago

@gmr Thank you for the investigation. The fact that the error is on the side of RabbitMQ didn't even occur to me.

Although I did consider patching table_integer, we've worked around it in the meantime in a rather silly way: instead of a message TTL of 60 s, it was enough to increase it to 66 s (66000 ms is greater than 65535, so it is encoded correctly). I'll try bumping pamqp and get back to you.
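
That interim workaround amounts to keeping the TTL argument above the 16-bit boundary, roughly:

# Workaround while staying on the old broker: keep the TTL above 65535 ms
# so it is not packed into the short unsigned encoding the server rejects.
arguments = {
    'x-message-ttl': 66000,  # 66 s instead of 60 s
    'x-dead-letter-exchange': 'dlx_incoming',
}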

zarybnicky commented 5 years ago

Using the pamqp BC switch works well, thanks!

mosquito commented 4 years ago

@Alveona it's probably because the declared arguments differ from the arguments of the existing queue. I found that out telepathically, since you didn't provide any details 😀.