Polyconseil / aioamqp

AMQP implementation using asyncio

RabbitMQ heartbeat issue ([Errno 104] Connection reset by peer) #96

Closed comzyh closed 8 years ago

comzyh commented 8 years ago

I'm using aioamqp 0.7 in a Python 3.5 environment.

About 180 seconds after I call basic_consume:

await self.channel.basic_consume(queue_name='worker_heartbeat', callback=self.handle_heartbeat, no_ack=True)

the following error message appears in my terminal:

[2016-05-07 18:39:39,461] asyncio:ERROR: Task exception was never retrieved
future: <Task finished coro=<disconnected() done, defined at /home/vagrant/Project/foo/bar/core/mq_connection.py:31> exception=AmqpClosedConnection()>
Traceback (most recent call last):
  File "/home/vagrant/venv/foo/lib/python3.5/site-packages/aioamqp/frame.py", line 413, in read_frame
    data = yield from self.reader.readexactly(7)
  File "/usr/lib/python3.5/asyncio/streams.py", line 659, in readexactly
    block = yield from self.read(n)
  File "/usr/lib/python3.5/asyncio/streams.py", line 617, in read
    yield from self._wait_for_data('read')
  File "/usr/lib/python3.5/asyncio/streams.py", line 451, in _wait_for_data
    yield from self._waiter
  File "/usr/lib/python3.5/asyncio/futures.py", line 361, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/usr/lib/python3.5/asyncio/tasks.py", line 297, in _wakeup
    future.result()
  File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/usr/lib/python3.5/asyncio/tasks.py", line 240, in _step
    result = coro.send(None)
  File "/home/vagrant/Project/foo/bar/core/mq_connection.py", line 37, in disconnected
    raise exception
  File "/usr/lib/python3.5/asyncio/selector_events.py", line 663, in _read_ready
    data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/lib/python3.5/asyncio/tasks.py", line 240, in _step
    result = coro.send(None)
  File "/home/vagrant/Project/foo/bar/core/mq_connection.py", line 37, in disconnected
    raise exception
  File "/home/vagrant/venv/foo/lib/python3.5/site-packages/aioamqp/protocol.py", line 262, in run
    yield from self.dispatch_frame()
  File "/home/vagrant/venv/foo/lib/python3.5/site-packages/aioamqp/protocol.py", line 204, in dispatch_frame
    frame = yield from self.get_frame()
  File "/home/vagrant/venv/foo/lib/python3.5/site-packages/aioamqp/protocol.py", line 189, in get_frame
    yield from frame.read_frame()
  File "/home/vagrant/venv/foo/lib/python3.5/site-packages/aioamqp/frame.py", line 415, in read_frame
    raise exceptions.AmqpClosedConnection() from ex
aioamqp.exceptions.AmqpClosedConnection

At the same time, I got an error message in the RabbitMQ log:

=INFO REPORT==== 7-May-2016::18:36:39 ===
accepting AMQP connection <0.5469.0> (192.168.50.4:53416 -> 192.168.50.4:5672)

=ERROR REPORT==== 7-May-2016::18:39:39 ===
closing AMQP connection <0.5469.0> (192.168.50.4:53416 -> 192.168.50.4:5672):
Missed heartbeats from client, timeout: 60s

It seems that after three missed heartbeats, the RabbitMQ server closes the connection.
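
A quick check of the two RabbitMQ log timestamps above supports this reading: the connection lived for exactly three times the reported 60 s timeout. The sketch below only does that arithmetic; the variable names are illustrative, and whether the server counts this as exactly three missed heartbeats is an inference, not something the log states.

```python
from datetime import datetime

# Timestamps copied from the RabbitMQ log entries above.
LOG_FORMAT = "%d-%b-%Y::%H:%M:%S"
accepted = datetime.strptime("7-May-2016::18:36:39", LOG_FORMAT)
closed = datetime.strptime("7-May-2016::18:39:39", LOG_FORMAT)

# From "Missed heartbeats from client, timeout: 60s".
heartbeat_timeout = 60

lifetime = (closed - accepted).total_seconds()
intervals = lifetime / heartbeat_timeout
print("connection lived {:.0f}s = {:.0f} heartbeat timeouts".format(lifetime, intervals))
```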

I also noticed that if nothing is published to the queue I consume from, the connection is not closed.

In the rest of my project I create channels and consume messages the same way, and nothing seems to go wrong.

My question is:

Please forgive my poor English.

comzyh commented 8 years ago

After comparing with pika, I think this is a bug in aioamqp.

comzyh commented 8 years ago

Here is a reproduction: https://gist.github.com/comzyh/0262f159f764a748a163f9f13b26578b

Run

python aioamqp_0.7_heartbeat_bug_reproduce.py sender

Then run in another terminal

python aioamqp_0.7_heartbeat_bug_reproduce.py receiver

You will see the error log from the receiver about 30 seconds later.

comzyh commented 8 years ago

If you run my test code like

python aioamqp_0.7_heartbeat_bug_reproduce.py pika_receiver

you may see

[2016-05-08 03:41:25,413] pika.heartbeat:DEBUG: Received heartbeat frame
[2016-05-08 03:41:26,505] pika.heartbeat:DEBUG: Received 155 heartbeat frames, sent 170
[2016-05-08 03:41:26,505] pika.heartbeat:DEBUG: Sending heartbeat frame
[2016-05-08 03:41:35,321] pika.heartbeat:DEBUG: Received heartbeat frame
[2016-05-08 03:41:36,507] pika.heartbeat:DEBUG: Received 156 heartbeat frames, sent 171
[2016-05-08 03:41:36,508] pika.heartbeat:DEBUG: Sending heartbeat frame
[2016-05-08 03:41:45,230] pika.heartbeat:DEBUG: Received heartbeat frame
[2016-05-08 03:41:46,512] pika.heartbeat:DEBUG: Received 157 heartbeat frames, sent 172
[2016-05-08 03:41:46,513] pika.heartbeat:DEBUG: Sending heartbeat frame

Please pay attention to the timestamps above, such as 03:41:25,413.

At least for pika and rabbitmq-server, heartbeats from the server and heartbeats from the client are independent of each other.

But after reading the source code in /aioamqp/protocol.py

https://github.com/Polyconseil/aioamqp/blob/4feafba21d48e4b507c0bb2fee80c9f9c98e3ad5/aioamqp/protocol.py#L205-L207

I think aioamqp sends a heartbeat to the server when, and only when, the client receives a heartbeat from the server (unless the user calls the heartbeat method manually).
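
A toy model can illustrate why replying only to server heartbeats would break exactly when messages are flowing. This is a sketch, not RabbitMQ's or aioamqp's real logic. It assumes that the server emits a heartbeat frame only after it has sent nothing else for half the timeout, and that it drops a client it has not heard from for three timeouts (mirroring the 180 s seen in the log). The function and parameter names are hypothetical.

```python
def simulate(client_mode, duration=300, timeout=60, message_every=10):
    """Return the second at which the server drops the client, or None.

    Toy model under the assumptions stated above; one loop step is one second.
    client_mode is "echo" (reply only to server heartbeats) or "periodic".
    message_every=0 means no messages are ever delivered.
    """
    last_server_send = 0  # last time the server wrote anything
    last_client_send = 0  # last time the client wrote anything
    for now in range(1, duration + 1):
        server_heartbeat = False
        if message_every and now % message_every == 0:
            last_server_send = now   # a delivery counts as server traffic
        elif now - last_server_send >= timeout // 2:
            last_server_send = now
            server_heartbeat = True  # idle link, so the server heartbeats

        if client_mode == "echo" and server_heartbeat:
            last_client_send = now   # reply only to server heartbeats
        elif client_mode == "periodic" and now - last_client_send >= timeout // 2:
            last_client_send = now   # send on the client's own timer

        if now - last_client_send >= 3 * timeout:
            return now               # server gives up on the client
    return None


print(simulate("echo"))                   # 180: busy queue, client stays silent
print(simulate("echo", message_every=0))  # None: idle queue, echoes keep it alive
print(simulate("periodic"))               # None: the client's own timer keeps it alive
```

Under these assumptions the model matches both observations reported earlier in the thread: the drop after about 180 seconds on a busy queue, and no drop when nothing is published.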

According to https://www.rabbitmq.com/heartbeats.html, I think the heartbeat protocol may have been misunderstood.

I believe the client should send heartbeats periodically, just as pika does:

https://github.com/pika/pika/blob/502aa0e6fdb57274aa1583138081eefc6b6e8f62/pika/heartbeat.py#L103
https://github.com/pika/pika/blob/502aa0e6fdb57274aa1583138081eefc6b6e8f62/pika/heartbeat.py#L159

What do you think?
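
A minimal sketch of a sender that runs on its own timer, independent of any frames received from the server. `send_heartbeat` is a hypothetical coroutine standing in for whatever writes a heartbeat frame; it is not an aioamqp or pika API. Sending at half the negotiated timeout follows the RabbitMQ heartbeat guide and is an assumption here, not something taken from either library. The demo uses `asyncio.run`, which requires Python 3.7 or later; on 3.5 the equivalent would be `loop.run_until_complete`.

```python
import asyncio


async def heartbeat_sender(send_heartbeat, timeout):
    """Call `send_heartbeat` every `timeout / 2` seconds until cancelled."""
    interval = timeout / 2
    while True:
        await send_heartbeat()
        await asyncio.sleep(interval)


async def demo():
    sent = []

    async def fake_send():
        # A real client would write a heartbeat frame to the socket here.
        sent.append("heartbeat")

    # Tiny timeout so the demo finishes quickly; real values are in seconds.
    task = asyncio.ensure_future(heartbeat_sender(fake_send, timeout=0.02))
    await asyncio.sleep(0.1)

    # Stop the sender the same way a disconnect handler would.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return len(sent)


count = asyncio.run(demo())
print("heartbeats sent:", count)
```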

comzyh commented 8 years ago

If anyone is suffering from this issue, here is some code that may help you get a stable connection.

For aioamqp 0.7 only

# -*- coding: utf-8 -*-
import asyncio

import aioamqp

connection = None
protocol = None
__aioamqp_heartbeat_patch_timer = None


async def disconnected(exception):
    """Reset the module state and stop the heartbeat task."""
    global connection, protocol
    global __aioamqp_heartbeat_patch_timer
    connection = None
    protocol = None
    # The task does not exist yet if the initial connect failed.
    if __aioamqp_heartbeat_patch_timer is not None:
        __aioamqp_heartbeat_patch_timer.cancel()
        __aioamqp_heartbeat_patch_timer = None
    print(exception)


async def __aioamqp_heartbeat_patch():
    """Send a heartbeat on a timer, independently of the server's heartbeats."""
    while True:
        print('sending heartbeat to rabbitmq server.')
        await protocol.heartbeat()
        await asyncio.sleep(protocol.server_heartbeat)


async def get_channel():
    """Connect on first use, start the heartbeat task, and open a channel."""
    global connection, protocol
    global __aioamqp_heartbeat_patch_timer
    if not connection or not protocol:
        try:
            connection, protocol = await aioamqp.connect(
                host='yourhost',
                on_error=disconnected,
            )
            __aioamqp_heartbeat_patch_timer = asyncio.ensure_future(__aioamqp_heartbeat_patch())
        except aioamqp.AmqpClosedConnection as e:
            await disconnected(e)
            raise
    channel = await protocol.channel()
    return channel

RemiCardona commented 8 years ago

Yes, the heartbeat handling is busted, I noticed this a while ago but didn't get around to fixing it. This is pretty high on the todo list. I hope to get the time to tackle that in the coming weeks, though I can't make any promises.

Without wanting to sound cliché, any patch would be most welcome.

Cheers

comzyh commented 8 years ago

Thanks for your reply. I have already submitted a patch: #97.

Let me explain my suspicion as follows:

PS: Have you run my reproduction code? Can you reproduce the bug in your environment?

mpaolini commented 8 years ago

I tested patch #97 and it works here. I also added a small enhancement that logs a warning if the server does not reply to our own heartbeats.
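
The enhancement itself is not shown in this thread. As a sketch of the general idea only, a watchdog can record when the server was last heard from and log a warning once the silence exceeds the timeout. `HeartbeatWatchdog` and its methods are hypothetical names, and time is passed in explicitly so the behaviour is easy to check.

```python
import logging

logger = logging.getLogger("heartbeat_watchdog")


class HeartbeatWatchdog:
    """Track when the server was last heard from (hypothetical helper)."""

    def __init__(self, timeout, now):
        self.timeout = timeout
        self.last_seen = now

    def frame_received(self, now):
        # Any frame from the server proves the link is still alive.
        self.last_seen = now

    def check(self, now):
        """Log a warning and return True if the server has been silent too long."""
        silent_for = now - self.last_seen
        if silent_for > self.timeout:
            logger.warning("no frame from server for %.0fs (timeout %.0fs)",
                           silent_for, self.timeout)
            return True
        return False


watchdog = HeartbeatWatchdog(timeout=60, now=0)
watchdog.frame_received(now=30)
print(watchdog.check(now=80))   # False: 50s of silence
print(watchdog.check(now=100))  # True: 70s of silence, warning logged
```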

RemiCardona commented 8 years ago

Hi folks, I'm working on making tests pass with this branch but the feature seems to work with manual testing.

I took some time to decipher what the spec says about heartbeating and I ended up trashing the entire heartbeat code. Now it's completely transparent. The heartbeat() coroutine is still there for compatibility purposes, but I'm still thinking about trashing it completely.

Please give it a try and let me know if you find any issues with it.

Thanks again for your patience.

comzyh commented 8 years ago

@RemiCardona I have tested your patch using my reproduction: https://gist.github.com/comzyh/0262f159f764a748a163f9f13b26578b It works!

RemiCardona commented 8 years ago

Sweet! Thanks for testing!

I'll rework a few things and push this PR to master.