python-trio / trio-amqp

Asynchronous messaging for snake people
BSD 3-Clause "New" or "Revised" License
10 stars 7 forks source link

AmqpProtocol did not close connection properly #14

Open haolian9 opened 3 years ago

haolian9 commented 3 years ago

reproduce: (just copied from examples/send.py)

env:

import anyio
import async_amqp

async def send():
    async with async_amqp.connect_amqp() as protocol:
        channel = await protocol.channel()

        await channel.queue_declare(queue_name="hello")

        await channel.basic_publish(
            payload=b"Hello World!", exchange_name="", routing_key="hello"
        )

        print(" [x] Sent 'Hello World!'")

# anyio.run(send)
# i do run this script on trio
trio.run(send)

logs at the rabbitmq side:

rabbitmq    | 2021-08-24 08:23:09.194281+00:00 [info] <0.2972.0> accepting AMQP connection <0.2972.0> (172.22.0.1:45794 -> 172.22.0.2:5672)
rabbitmq    | 2021-08-24 08:23:09.196500+00:00 [info] <0.2972.0> connection <0.2972.0> (172.22.0.1:45794 -> 172.22.0.2:5672): user 'guest' authenticated and granted access to vhost '/'
rabbitmq    | 2021-08-24 08:23:09.199222+00:00 [warn] <0.2972.0> closing AMQP connection <0.2972.0> (172.22.0.1:45794 -> 172.22.0.2:5672, vhost: '/', user: 'guest'):
rabbitmq    | 2021-08-24 08:23:09.199222+00:00 [warn] <0.2972.0> client unexpectedly closed TCP connection

rabbitmq complains client unexpectedly closed TCP connection.

as comparison, i tried pika, and here is rabbitmq's logs

rabbitmq    | 2021-08-24 08:50:27.299260+00:00 [info] <0.3927.0> accepting AMQP connection <0.3927.0> (172.22.0.1:45828 -> 172.22.0.2:5672)
rabbitmq    | 2021-08-24 08:50:27.302410+00:00 [info] <0.3927.0> connection <0.3927.0> (172.22.0.1:45828 -> 172.22.0.2:5672): user 'guest' authenticated and granted access to vhost '/'
rabbitmq    | 2021-08-24 08:50:27.310122+00:00 [info] <0.3927.0> closing AMQP connection <0.3927.0> (172.22.0.1:45828 -> 172.22.0.2:5672, vhost: '/', user: 'guest')
haolian9 commented 3 years ago

A client connection can be closed cleanly or abnormally. In the former case the client closes AMQP 0-9-1 (or 1.0, or STOMP, or MQTT) connection gracefully using a dedicated library function (method). In the latter case the client closes TCP connection or TCP connection fails. RabbitMQ will log both cases.

according to the docs of rabbitmq, it seems AmqpProtocol just simply close the underlying socket connection, without sending a proper close signal(?) to rabbitmq first.

Abruptly closed connections can be harmless.

althrough, i think implemeting the "normal close" is still good to have.

smurfix commented 3 years ago

I agree and it should close cleanly (when not crashing). I'll look into it (though it'll be a couple weeks before I can get to it). Pull requests welcome. ;-)

haolian9 commented 3 years ago

according to the amqp 0-9-1 reference, there is a connection.close() command which corresponding to pamqp.commands.Connection.Close().

after dived into the code, and i think it could be caused by the following code:

Protocol.close():

    # see: https://github.com/python-trio/trio-amqp/blob/dd805fc93e668b4ae094609d4afefa4561e64ae4/async_amqp/protocol.py#L228
    await self.connection_closed.set()

    if not got_close:
        # ...

        # see: https://github.com/python-trio/trio-amqp/blob/dd805fc93e668b4ae094609d4afefa4561e64ae4/async_amqp/protocol.py#L248
        await self.wait_closed()
        # which simply call `self.connection_closed.wait()`

since connection_closed is always set before waiting, the later wait() does not work as expected.

Mudiwa66 commented 8 months ago

I agree and it should close cleanly (when not crashing). I'll look into it (though it'll be a couple weeks before I can get to it). Pull requests welcome. ;-)

Was this fixed? Or can I work on it?