Open Jacobh2 opened 5 years ago
Ping @mosquito, would you have any ideas on this issue? I have tried again to connect to a rabbitmq queue using topic
type and still get the problem where the channel gets closed for some strange reason.
@Jacobh2 Could you create traffic dump, and send me it?
I changed the code a bit and was able to get a very interesting error finally:
consumer_1 | Traceback (most recent call last):
consumer_1 | File "consumer2.py", line 58, in main
consumer_1 | timeout=10
consumer_1 | File "/usr/local/lib/python3.7/site-packages/aio_pika/common.py", line 122, in wrap
consumer_1 | return await func(self, *args, **kwargs)
consumer_1 | File "/usr/local/lib/python3.7/site-packages/aio_pika/channel.py", line 318, in declare_queue
consumer_1 | await queue.declare(timeout)
consumer_1 | File "/usr/local/lib/python3.7/site-packages/aio_pika/common.py", line 122, in wrap
consumer_1 | return await func(self, *args, **kwargs)
consumer_1 | File "/usr/local/lib/python3.7/site-packages/aio_pika/queue.py", line 92, in declare
consumer_1 | exclusive=self.exclusive
consumer_1 | File "/usr/local/lib/python3.7/site-packages/aio_pika/pika/channel.py", line 698, in queue_declare
consumer_1 | replies)
consumer_1 | File "/usr/local/lib/python3.7/site-packages/aio_pika/pika/channel.py", line 1139, in _rpc
consumer_1 | self._send_method(method_frame)
consumer_1 | File "/usr/local/lib/python3.7/site-packages/aio_pika/pika/channel.py", line 1150, in _send_method
consumer_1 | self.connection._send_method(self.channel_number, method_frame, content)
consumer_1 | File "/usr/local/lib/python3.7/site-packages/aio_pika/pika/connection.py", line 1569, in _send_method
consumer_1 | self._send_frame(frame.Method(channel_number, method_frame))
consumer_1 | File "/usr/local/lib/python3.7/site-packages/aio_pika/pika/connection.py", line 1550, in _send_frame
consumer_1 | marshaled_frame = frame_value.marshal()
consumer_1 | File "/usr/local/lib/python3.7/site-packages/aio_pika/pika/frame.py", line 74, in marshal
consumer_1 | pieces = self.method.encode()
consumer_1 | File "/usr/local/lib/python3.7/site-packages/aio_pika/pika/spec.py", line 1011, in encode
consumer_1 | data.encode_table(pieces, self.arguments)
consumer_1 | File "/usr/local/lib/python3.7/site-packages/aio_pika/pika/data.py", line 85, in encode_table
consumer_1 | tablesize += encode_value(pieces, value)
consumer_1 | File "/usr/local/lib/python3.7/site-packages/aio_pika/pika/data.py", line 122, in encode_value
consumer_1 | pieces.append(struct.pack('>ci', b'I', value))
consumer_1 | struct.error: 'i' format requires -2147483648 <= number <= 2147483647
consumer_1 | DEBUG:aio_pika.connection:Closing AMQP connection
It looks like the problem is that this lib isn't following the RabbitMQ's AMQP protocol correctly. The doc says The argument can be of AMQP type short-short-int, short-int, long-int, or long-long-int.
And this error shows that the maximum value allowed is 2147483647, which is a 32 bit signed number, and not a 64 bit which is also allowed.
At first, when I had this issue 2 weeks ago, I also tested another lib, which I found had the exact same issue: https://github.com/Polyconseil/aioamqp/issues/180
Actually versions newer then 0.10 are broken. You can find my issues on pika/pika.
Are you referring to this?
If the case is that this lib don't want to upgrade to keep in sync with the parent lib that it is based on, this lib will fall longer behind other bugfixes, for example this one that I found now. It seems like the authors of pika really wants this and it to be in sync and work together which sounds amazing!
Either way, with the current version of Pika that is being used, this lib is not following the AMQP protocol.
I'm also experiencing this issues, probably in connection with #197 .
My configuration: aio-pika 6.4.1, and so nothing to do with the pika
library now that aio-pika doesn't depend on it anymore.
The fault appeas when using rabbitmq 3.5.7 (as provided by Ubuntu 16.04) but not 3.6.10 (as provided by Ubuntu 18.04). MWE:
import logging
from . import BaseTestCase
log = logging.getLogger(__name__)
class TestCase(BaseTestCase):
async def test_declare_queue(self):
connection = await self.create_connection()
channel = await connection.channel()
# Declaring queue
await channel.declare_queue(
'task_queue',
exclusive=True,
arguments={'x-message-ttl': 60000}
)
Producing the following traceback:
_________________________ TestCase.test_declare_queue __________________________
self = <TaskWrapper: <Task cancelled coro=<rpc() done, defined at /home/travis/virtualenv/python3.6.7/lib/python3.6/site-packages/aiormq/channel.py:96>>>
async def __inner(self):
try:
> return await self.task
E concurrent.futures._base.CancelledError
../../../virtualenv/python3.6.7/lib/python3.6/site-packages/aiormq/base.py:25: CancelledError
The above exception was the direct cause of the following exception:
self = <tests.test_amqp.TestCase testMethod=test_declare_queue>
async def test_declare_queue(self):
connection = await self.create_connection()
channel = await connection.channel()
# Declaring queue
await channel.declare_queue(
'task_queue',
exclusive=True,
> arguments={'x-message-ttl': 60000}
)
tests/test_amqp.py:18:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
aio_pika/channel.py:261: in declare_queue
await queue.declare(timeout=timeout)
aio_pika/queue.py:87: in declare
), timeout=timeout
/opt/python/3.6.7/lib/python3.6/asyncio/tasks.py:339: in wait_for
return (yield from fut)
../../../virtualenv/python3.6.7/lib/python3.6/site-packages/aiormq/channel.py:636: in queue_declare
arguments=arguments,
../../../virtualenv/python3.6.7/lib/python3.6/site-packages/aiormq/base.py:168: in wrap
return await self.create_task(func(self, *args, **kwargs))
../../../virtualenv/python3.6.7/lib/python3.6/site-packages/aiormq/base.py:27: in __inner
raise self.exception from e
/opt/python/3.6.7/lib/python3.6/asyncio/tasks.py:537: in _wrap_awaitable
return (yield from awaitable.__await__())
../../../virtualenv/python3.6.7/lib/python3.6/site-packages/aiormq/base.py:27: in __inner
raise self.exception from e
/opt/python/3.6.7/lib/python3.6/asyncio/tasks.py:537: in _wrap_awaitable
return (yield from awaitable.__await__())
../../../virtualenv/python3.6.7/lib/python3.6/site-packages/aiormq/base.py:27: in __inner
raise self.exception from e
../../../virtualenv/python3.6.7/lib/python3.6/site-packages/aiormq/connection.py:377: in __reader
weight, channel, frame = await self.__receive_frame()
../../../virtualenv/python3.6.7/lib/python3.6/site-packages/aiormq/connection.py:329: in __receive_frame
frame_header = await self.reader.readexactly(1)
/opt/python/3.6.7/lib/python3.6/asyncio/streams.py:674: in readexactly
yield from self._wait_for_data('readexactly')
/opt/python/3.6.7/lib/python3.6/asyncio/streams.py:464: in _wait_for_data
yield from self._waiter
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_SelectorSocketTransport closed fd=13>
def _read_ready(self):
if self._conn_lost:
return
try:
> data = self._sock.recv(self.max_size)
E ConnectionResetError: [Errno 104] Connection reset by peer
/opt/python/3.6.7/lib/python3.6/asyncio/selector_events.py:725: ConnectionResetError
----------------------------- Captured stderr call -----------------------------
DEBUG:asyncio:Using selector: EpollSelector
DEBUG:aio_pika.connection:Creating AMQP channel for connection: <Connection: "amqp://guest:******@localhost">
DEBUG:aio_pika.connection:Channel created: <Channel "None#Not initialized channel">
DEBUG:aio_pika.queue:Declaring queue: <Queue(task_queue): auto_delete=False, durable=None, exclusive=True, arguments={'x-message-ttl': 60000}>
DEBUG:aiormq.connection:Reader task exited because:
Traceback (most recent call last):
File "/home/travis/virtualenv/python3.6.7/lib/python3.6/site-packages/aiormq/connection.py", line 377, in __reader
weight, channel, frame = await self.__receive_frame()
File "/home/travis/virtualenv/python3.6.7/lib/python3.6/site-packages/aiormq/connection.py", line 329, in __receive_frame
frame_header = await self.reader.readexactly(1)
File "/opt/python/3.6.7/lib/python3.6/asyncio/streams.py", line 674, in readexactly
yield from self._wait_for_data('readexactly')
File "/opt/python/3.6.7/lib/python3.6/asyncio/streams.py", line 464, in _wait_for_data
yield from self._waiter
File "/opt/python/3.6.7/lib/python3.6/asyncio/selector_events.py", line 725, in _read_ready
data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer
DEBUG:aio_pika.connection:Closing AMQP connection None
------------------------------ Captured log call -------------------------------
selector_events.py 65 DEBUG Using selector: EpollSelector
connection.py 184 DEBUG Creating AMQP channel for connection: <Connection: "amqp://guest:******@localhost">
connection.py 191 DEBUG Channel created: <Channel "None#Not initialized channel">
queue.py 81 DEBUG Declaring queue: <Queue(task_queue): auto_delete=False, durable=None, exclusive=True, arguments={'x-message-ttl': 60000}>
connection.py 412 DEBUG Reader task exited because:
Traceback (most recent call last):
File "/home/travis/virtualenv/python3.6.7/lib/python3.6/site-packages/aiormq/connection.py", line 377, in __reader
weight, channel, frame = await self.__receive_frame()
File "/home/travis/virtualenv/python3.6.7/lib/python3.6/site-packages/aiormq/connection.py", line 329, in __receive_frame
frame_header = await self.reader.readexactly(1)
File "/opt/python/3.6.7/lib/python3.6/asyncio/streams.py", line 674, in readexactly
yield from self._wait_for_data('readexactly')
File "/opt/python/3.6.7/lib/python3.6/asyncio/streams.py", line 464, in _wait_for_data
yield from self._waiter
File "/opt/python/3.6.7/lib/python3.6/asyncio/selector_events.py", line 725, in _read_ready
data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer
connection.py 105 DEBUG Closing AMQP connection None
=========================== 1 failed in 0.45 seconds ===========================
The full travis log can be found here: https://travis-ci.com/muhrin/aio-pika/jobs/268539430
Is there any way to currently to get such an x-message-ttl
configuration working without monkey patching aiormq?
Just checked with 'x-message-ttl': 66000
and it works fine (https://travis-ci.com/muhrin/aio-pika/jobs/268557081) so indeed the same failure mode as #197 but in this case all the way up to RabbitMQ 3.5.
Hey!
I'm unable to create/connect to a queue when providing
arguments
to thechannel.declare_queue
method.If I don't provide any
arguments
, the connection works just fine and I can produce and consume messages. But if I create the queue beforehand with the arguments:and then try to connect without the
arguments
, then I get an error as expected:I then add the needed arguments to my consumer, but it immediately gives me
INFO 2018-11-27 12:52:17,269 aio_pika.pika.channel close 422 : Channel.close(0, Normal Shutdown)
in the logsConsumer method:
which I start with
What am I doing wrong that makes the channel close immediately?
Log: