Polyconseil / aioamqp

AMQP implementation using asyncio

aioamqp.exceptions.AmqpClosedConnection #164

Open ZeroTworu opened 6 years ago

ZeroTworu commented 6 years ago

RabbitMQ log:

=INFO REPORT==== 29-May-2018::11:23:59 ===
accepting AMQP connection <0.604.0> (127.0.0.1:46488 -> 127.0.0.1:5672)

=INFO REPORT==== 29-May-2018::11:23:59 ===
connection <0.604.0> (127.0.0.1:46488 -> 127.0.0.1:5672): user 'hanyuu' authenticated and granted access to vhost 'test'

=WARNING REPORT==== 29-May-2018::11:24:00 ===
closing AMQP connection <0.604.0> (127.0.0.1:46488 -> 127.0.0.1:5672, vhost: 'test', user: 'hanyuu'):
client unexpectedly closed TCP connection

=INFO REPORT==== 29-May-2018::11:24:18 ===
accepting AMQP connection <0.623.0> (127.0.0.1:46526 -> 127.0.0.1:5672)

=INFO REPORT==== 29-May-2018::11:24:18 ===
connection <0.623.0> (127.0.0.1:46526 -> 127.0.0.1:5672): user 'hanyuu' authenticated and granted access to vhost 'test'

=WARNING REPORT==== 29-May-2018::11:26:18 ===
closing AMQP connection <0.623.0> (127.0.0.1:46526 -> 127.0.0.1:5672, vhost: 'test', user: 'hanyuu'):
client unexpectedly closed TCP connection

Traceback:

Traceback (most recent call last):
  File "/home/hanyuu/projects/worker/main.py", line 79, in rabbit_handler
    await channel.basic_client_ack(delivery_tag=envelope.delivery_tag)
  File "/home/hanyuu/venvs/worker/lib/python3.6/site-packages/aioamqp/channel.py", line 720, in basic_client_ack
    yield from self._write_frame(frame, request)
  File "/home/hanyuu/venvs/worker/lib/python3.6/site-packages/aioamqp/channel.py", line 111, in _write_frame
    yield from self.protocol.ensure_open()
  File "/home/hanyuu/venvs/worker/lib/python3.6/site-packages/aioamqp/protocol.py", line 133, in ensure_open
    raise exceptions.AmqpClosedConnection()
aioamqp.exceptions.AmqpClosedConnection

Code:

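import asyncio
import json
from functools import partial

import aioamqp
from aioamqp.channel import Channel
from aioamqp.envelope import Envelope
from aioamqp.exceptions import AmqpClosedConnection
from aioamqp.properties import Properties

# config, log and handle_message are defined elsewhere in the worker (not shown).

async def rabbit_handler(channel: Channel, body: bytes, envelope: Envelope, properties: Properties, data_channel: Channel):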
    try:
        message = json.loads(body.decode('utf-8'))
        res = await handle_message(message, data_channel)

        await data_channel.basic_publish(payload=json.dumps(res), exchange_name='', routing_key='data')
        await channel.basic_client_ack(delivery_tag=envelope.delivery_tag)
    except AmqpClosedConnection as e:
        log.error(e)
        exit(-1)

async def start_rabbit():
    transport, protocol = await aioamqp.from_url(config['worker']['rabbit']['url'])
    channel = await protocol.channel()
    _channel = await protocol.channel()
    await channel.queue_declare(queue_name=config['worker']['rabbit']['queue_name'])
    await _channel.queue_declare(queue_name='data')
    await channel.basic_consume(partial(rabbit_handler, data_channel=_channel), queue_name=config['worker']['rabbit']['queue_name'])

async def run():
    url = f'{config["worker"]["rabbit"]["url"]}'
    print(f'Started on {url}')
    await start_rabbit()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())
    loop.run_forever()

Connection url: amqp://hanyuu:***@127.0.0.1:5672/test?heartbeat=0&connection_timeout=120000

serg666 commented 3 years ago

Hi, guys!

Any news about this issue?

I have the same problem:


import asyncio
import aioamqp

async def connect():
    transport, protocol = await aioamqp.from_url('amqp://gateway:qazwsx@10.2.0.153:5672/gateway')

    print("connected !")
    await asyncio.sleep(10)

    print("close connection")
    await protocol.close()
    transport.close()

asyncio.get_event_loop().run_until_complete(connect())

When I run this code, I get the following:

python t2.py 
only PLAIN login_method is supported, falling back to AMQPLAIN
connected !
close connection
Traceback (most recent call last):
  File "t2.py", line 14, in <module>
    asyncio.get_event_loop().run_until_complete(connect())
  File "/usr/lib64/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "t2.py", line 11, in connect
    await protocol.close()
  File "/home/mamonov/env-notifier/lib64/python3.8/site-packages/aioamqp/protocol.py", line 160, in close
    await self.ensure_open()
  File "/home/mamonov/env-notifier/lib64/python3.8/site-packages/aioamqp/protocol.py", line 135, in ensure_open
    raise exceptions.AmqpClosedConnection()
aioamqp.exceptions.AmqpClosedConnection

The RabbitMQ logs show the following:

=INFO REPORT==== 7-Aug-2020::23:12:10 ===
accepting AMQP connection <0.32190.3> (10.2.1.10:44634 -> 10.2.0.153:5672)

=WARNING REPORT==== 7-Aug-2020::23:12:14 ===
closing AMQP connection <0.32190.3> (10.2.1.10:44634 -> 10.2.0.153:5672):
connection_closed_abruptly

How can I fix the problem?

alex-67 commented 2 years ago

There is a bug in aioamqp: the code misinterprets a heartbeat value of 0. By default the heartbeat is set to None on the client side, but the server makes the client use a heartbeat of 0 by sending the Connection.Tune method. To work around this, explicitly set the heartbeat to some value other than 0 or None. Do not set it in the URL, though, because the heartbeat parameter is not read from there.

So you can use code like this: self.transport, self.protocol = await aioamqp.from_url(url, heartbeat=60)
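
As a slightly fuller sketch of the same workaround: the helper below replaces a heartbeat of None or 0 with a fallback before connecting. The names effective_heartbeat, connect and DEFAULT_HEARTBEAT are made up for this illustration and are not part of aioamqp; the only library call used is aioamqp.from_url with the heartbeat keyword argument, as in the one-liner above.

```python
DEFAULT_HEARTBEAT = 60  # seconds; illustrative value, same as in the one-liner above


def effective_heartbeat(requested, default=DEFAULT_HEARTBEAT):
    """Return a heartbeat that avoids the problematic values None and 0.

    Hypothetical helper for this sketch, not an aioamqp function.
    """
    if not requested:  # covers both None and 0
        return default
    return requested


async def connect(url, heartbeat=None):
    """Open a connection, passing the heartbeat as a keyword argument.

    The heartbeat is deliberately not placed in the URL query string,
    because it is not read from there.
    """
    # Imported lazily so the helper above can be used without aioamqp installed.
    import aioamqp

    return await aioamqp.from_url(url, heartbeat=effective_heartbeat(heartbeat))
```

Inside a coroutine this would be used as transport, protocol = await connect(url), where url is your own amqp:// connection string. Whether 60 seconds is a sensible interval depends on your broker and network.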

The problem has already been solved in this PR: https://github.com/Polyconseil/aioamqp/pull/209/commits/57d3996a7690f9534b3ea07bfa584d318b55082e