gmr / rabbitpy

A pure python, thread-safe, minimalistic and pythonic RabbitMQ client library
http://rabbitpy.readthedocs.org
BSD 3-Clause "New" or "Revised" License

rabbitpy hangs after rabbitmq restart #44

Closed DamCour closed 9 years ago

DamCour commented 9 years ago

Hi,

I've been trying to get rabbitpy to resume consuming after a RabbitMQ restart, but there seem to be some problems:

A simple consumer:

import rabbitpy

# Reconnect and resume consuming whenever the connection block exits
while True:
    with rabbitpy.Connection(connection_parameters) as connection:
        with connection.channel() as channel:
            for message in rabbitpy.Queue(channel, 'queue_name'):
                message.ack()
                print(message.json())

After a rabbitmq server restart:

2014-10-22 11:50:09,487:0x1acfe90-io:Socket error: None
None
Exception in thread 0x1acfe90-io:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 811, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/site-packages/rabbitpy/io.py", line 317, in run
    self._loop.run()
  File "/usr/lib/python2.7/site-packages/rabbitpy/io.py", line 164, in run
    self._poll()
  File "/usr/lib/python2.7/site-packages/rabbitpy/io.py", line 200, in _poll
    self._data.error_callback(None)
  File "/usr/lib/python2.7/site-packages/rabbitpy/io.py", line 328, in on_error
    args = [self._args['host'], self._args['port'], exception[1]]
TypeError: 'NoneType' object has no attribute '__getitem__'

And the script hangs... It looks like it is blocked in the channel close process.
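For reference, the TypeError at the bottom of the traceback can be reproduced outside rabbitpy. The sketch below is only a stand-in: `on_error` here is a hypothetical one-line function that mimics the failing expression `exception[1]` from io.py, not the library's actual method, and `reproduce` is a helper invented for the illustration. It shows that passing None, as `_poll` does in the traceback, is enough to raise the error in the IO thread.

```python
# Hypothetical stand-in for the failing expression in rabbitpy/io.py
# (on_error, line 328 in the traceback). The real method also reads
# self._args and self._channels; only the indexing is mimicked here.
def on_error(exception):
    # Indexing None raises TypeError, which is what ends the IO thread.
    return exception[1]


def reproduce():
    """Call on_error(None) the way _poll does and report the error type."""
    try:
        on_error(None)
    except TypeError as error:
        return type(error).__name__
    return None


print(reproduce())
```

With an (errno, message) style tuple the same expression works, which suggests the problem is limited to the case where no socket error details are available.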

The first workaround I found was to catch AMQPConnectionForced (which seems to be the exception rabbitpy raises when the RabbitMQ server restarts) and to mark the channel as closed manually:

while True:
    print('yes')
    with rabbitpy.Connection(connection_parameters) as connection:
        with connection.channel() as channel:
            try:
                for message in rabbitpy.Queue(channel, 'heartbeat'):
                    message.ack()
                    print(message.json())
            except rabbitpy.exceptions.AMQPConnectionForced:
                print('Caught AMQPConnectionForced, closing channel')
                # Mark the channel closed by hand so the context manager exit does not block
                channel._set_state(channel.CLOSED)

This works when AMQPConnectionForced is raised, but sometimes it is not raised, and then the script still hangs.

I'm not very good at Python, so I could definitely be wrong, but I think the on_error function in io.py needs to be changed to something like this:

328,330c328,333
<         args = [self._args['host'], self._args['port'], exception[1]]
<         if self._channels[0][0].open:
<             self._exceptions.put(exceptions.ConnectionResetException(*args))
---
>         if exception:
>             args = [self._args['host'], self._args['port'], exception[1]]
>             if self._channels[0][0].open:
>                 self._exceptions.put(exceptions.ConnectionResetException(*args))
>             else:
>                 self._exceptions.put(exceptions.ConnectionException(*args))
332c335,336
<             self._exceptions.put(exceptions.ConnectionException(*args))
---
>             args = [self._args['host'], self._args['port'], None]
>             self._exceptions.put(exceptions.AMQPConnectionForced(*args))
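Read as plain Python, the diff above amounts to roughly the following decision logic. This is a sketch under assumptions, not the fix that ended up in rabbitpy: the three exception classes are local stand-ins that only share names with the ones in the diff, and `build_error` is a hypothetical helper that returns the exception instead of putting it on `self._exceptions`.

```python
# Local stand-ins named after the rabbitpy exceptions in the diff, so the
# sketch runs without rabbitpy installed. They are NOT the library's classes.
class ConnectionException(Exception):
    pass


class ConnectionResetException(Exception):
    pass


class AMQPConnectionForced(Exception):
    pass


def build_error(host, port, exception, channel_open):
    """Return the exception the patched on_error would queue (hypothetical helper)."""
    if exception:
        # Socket error details are available: keep the original behaviour.
        args = [host, port, exception[1]]
        if channel_open:
            return ConnectionResetException(*args)
        return ConnectionException(*args)
    # No error details were passed (the None case from the traceback).
    return AMQPConnectionForced(host, port, None)


# The None case no longer indexes into None.
error = build_error('localhost', 5672, None, True)
print(type(error).__name__)
```

The point of the guard is only that `exception[1]` is never evaluated when `exception` is None; which exception type is appropriate in that branch is a separate question.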

Am I right, or am I doing something wrong?

Thank you in advance for your responses.

Cheers,

Damien

gmr commented 9 years ago

I'm working through a few related exceptions and will have it updated soon.

gmr commented 9 years ago

Here's what I've been testing with to get things cleanly raising catchable exceptions on disconnect:

import logging
import time

import rabbitpy
from rabbitpy import exceptions

logging.basicConfig(level=logging.DEBUG)

def run():
    logging.info('Connecting')
    try:
        with rabbitpy.Connection() as connection:
            with connection.channel() as channel:
                queue = rabbitpy.Queue(channel, 'test')
                queue.declare()
                logging.info('Consuming')
                for message in queue:
                    message.ack()
                    message.pprint(False)
    except (exceptions.ConnectionResetException,
            exceptions.AMQPConnectionForced) as error:
        logging.error('Connection error: %s', error)

while True:
    run()
    logging.info('After run')
    time.sleep(5)

DamCour commented 9 years ago

Hello,

The fix seems to have solved my problem.

Thank you very much for that and for all the work you put into rabbitpy. I'm having a lot of fun playing with it.

Cheers,

Damien