gmr / rabbitpy

A pure python, thread-safe, minimalistic and pythonic RabbitMQ client library
http://rabbitpy.readthedocs.org
BSD 3-Clause "New" or "Revised" License
243 stars 58 forks source link

Uncatchable exception #66

Closed kirgene closed 8 years ago

kirgene commented 9 years ago

I have a processing loop with rabbitpy calls inside try-except block:

try:
    ...
    message = queue.get()
    <processing_block>
    tx = rabbitpy.Tx(message.channel)
    tx.select()
    pub_message.publish('my-exchange', 'my-key')
    message.ack()
    tx.commit()
    ...
except (rabbitpy.exceptions.ActionException, rabbitpy.exceptions.ConnectionResetException,
                    rabbitpy.exceptions.ConnectionException) as e:
                print "Error."

Recently I noticed that during execution in <processing_block> some exception rises that is not catched by try-except.

Exception in thread 0x3480ef0-io:
Traceback (most recent call last):
  File "C:\Python34\lib\threading.py", line 920, in _bootstrap_inner
    self.run()
  File "C:\Python34\lib\site-packages\rabbitpy\io.py", line 348, in run
    self._loop.run()
  File "C:\Python34\lib\site-packages\rabbitpy\io.py", line 190, in run
    self._poll()
  File "C:\Python34\lib\site-packages\rabbitpy\io.py", line 235, in _poll
    self._read()
  File "C:\Python34\lib\site-packages\rabbitpy\io.py", line 249, in _read
    self._data.read_callback(self._data.fd.recv(MAX_READ))
  File "C:\Python34\lib\site-packages\rabbitpy\io.py", line 386, in on_read
    self._channels[0][0].on_frame(value[1])
  File "C:\Python34\lib\site-packages\rabbitpy\channel0.py", line 114, in on_frame
    self.write_frame(heartbeat.Heartbeat())
  File "C:\Python34\lib\site-packages\rabbitpy\base.py", line 231, in write_frame
    self._check_for_exceptions()
  File "C:\Python34\lib\site-packages\rabbitpy\base.py", line 285, in _check_for_exceptions
    raise exception
rabbitpy.exceptions.ConnectionResetException: Connection was reset at socket level

This makes execution hang in tx.select() method later.

polaar commented 9 years ago

I was just about to create an issue that might be caused by the same thing. I've noticed that when I try to use Control-C to abort, rabbitpy seems to hang (preventing me from catching the KeyboardInterrupt). Also when the connection is closed by the server (and probably other cases as well where connection gets closed), it seems to hang without an exception. When I turn on debug logging, the last message says "Waiting for interrupt to clear" (in both cases). The problem seems to occur since version 0.26.1, no problem in 0.25.0

gmr commented 9 years ago

The exception is being handled at the connection level, are you using the context manager method of connecting?

with rabbitpy.Connection() as conn:
    with conn.channel() as channel:
        try: ...

or are you doing:

conn = rabbitpy.Connection()
channel = rabbitpy.Channel()
try: ...
kirgene commented 9 years ago

I'm using try-except outside of connection level:

try:
   with rabbitpy.Connection() as conn:
       with conn.channel() as channel:
           ...
           message = queue.get()
           ....
gmr commented 9 years ago

Sorry for the delay in a response. I tested this against master and I am seeing the following behavior:

#!/usr/bin/env python
import rabbitpy
import logging

from rabbitpy import exceptions

logging.basicConfig(level=logging.DEBUG)

try:
    with rabbitpy.Connection('amqp://guest:guest@localhost:5672/%2f?heartbeat=15') as conn:
        with conn.channel() as channel:
            queue = rabbitpy.Queue(channel, 'example')

            # Exit on CTRL-C
            try:
                # Consume the message
                for message in queue.consume():
                    message.pprint()
                    message.ack()
            except KeyboardInterrupt:
                print('Exited consumer')
except rabbitpy.exceptions.ConnectionResetException as error:
    print('Error: %r' % error)

Output

(rabbitpy-2.7)gmr-mbp:rabbitpy gmr$ python consumer.py 
DEBUG:rabbitpy.base:Connection setting state to 'Opening'
DEBUG:rabbitpy.base:IO setting state to 'Opening'
DEBUG:rabbitpy.events:Waiting for 1 seconds on event: Socket Connected
DEBUG:rabbitpy.io:Connecting to ('127.0.0.1', 5672)
DEBUG:rabbitpy.base:IO setting state to 'Open'
DEBUG:rabbitpy.io:Socket connected
DEBUG:rabbitpy.io:Returning KQueuePoller
DEBUG:rabbitpy.connection:Adding channel 0 to io
DEBUG:rabbitpy.base:Channel0 setting state to 'Opening'
DEBUG:rabbitpy.base:Writing frame: ProtocolHeader
DEBUG:rabbitpy.channel0:Received frame: 'Connection.Start'
DEBUG:rabbitpy.channel0:Server information: 'Licensed under the MPL.  See http://www.rabbitmq.com/'
DEBUG:rabbitpy.channel0:Server product: 'RabbitMQ'
DEBUG:rabbitpy.channel0:Server copyright: 'Copyright (C) 2007-2014 GoPivotal, Inc.'
DEBUG:rabbitpy.channel0:Server supports exchange_exchange_bindings: True
DEBUG:rabbitpy.channel0:Server supports connection.blocked: True
DEBUG:rabbitpy.channel0:Server supports authentication_failure_close: True
DEBUG:rabbitpy.channel0:Server supports basic.nack: True
DEBUG:rabbitpy.channel0:Server supports per_consumer_qos: True
DEBUG:rabbitpy.channel0:Server supports consumer_priorities: True
DEBUG:rabbitpy.channel0:Server supports consumer_cancel_notify: True
DEBUG:rabbitpy.channel0:Server supports publisher_confirms: True
DEBUG:rabbitpy.channel0:Server cluster_name: 'rabbit@gmr-mbp'
DEBUG:rabbitpy.channel0:Server platform: 'Erlang/OTP'
DEBUG:rabbitpy.channel0:Server version: '3.5.1'
DEBUG:rabbitpy.base:Writing frame: Connection.StartOk
DEBUG:rabbitpy.channel0:Received frame: 'Connection.Tune'
DEBUG:rabbitpy.channel0:Tuning, client: 15
DEBUG:rabbitpy.base:Writing frame: Connection.TuneOk
DEBUG:rabbitpy.base:Writing frame: Connection.Open
DEBUG:rabbitpy.channel0:Received frame: 'Connection.OpenOk'
DEBUG:rabbitpy.channel0:Connection opened
DEBUG:rabbitpy.base:Channel0 setting state to 'Open'
DEBUG:rabbitpy.heartbeat:Started a heartbeat timer that will fire in 15 sec
DEBUG:rabbitpy.connection:Adding channel 1 to io
DEBUG:rabbitpy.base:Channel setting state to 'Opening'
DEBUG:rabbitpy.base:Writing frame: Channel.Open
DEBUG:rabbitpy.base:Waiting on <class 'pamqp.specification.OpenOk'> frame(s)
DEBUG:rabbitpy.channel:Checking for RPC request: <pamqp.specification.Channel.OpenOk object at 0x100f55830>
DEBUG:rabbitpy.base:Channel setting state to 'Open'
DEBUG:rabbitpy.channel:Channel #1 open
DEBUG:rabbitpy.base:Sending 'Basic.Consume'
DEBUG:rabbitpy.base:Writing frame: Basic.Consume
DEBUG:rabbitpy.base:Waiting on 'Basic.ConsumeOk' frame(s)
DEBUG:rabbitpy.channel:Checking for RPC request: <pamqp.specification.Basic.ConsumeOk object at 0x100f55830>
DEBUG:rabbitpy.base:Waiting on <class 'pamqp.specification.Deliver'> frame(s)
DEBUG:rabbitpy.io:Poll errors: [0L]
DEBUG:rabbitpy.io:Exiting due to closed socket
DEBUG:rabbitpy.io:Exiting IOLoop.run
DEBUG:rabbitpy.io:Exiting IO.run
DEBUG:rabbitpy.base:No need to interrupt wait
DEBUG:rabbitpy.channel:Cancelling consumer
DEBUG:rabbitpy.base:Sending 'Basic.Cancel'
DEBUG:rabbitpy.channel:Exiting due to exception: ConnectionResetException()
DEBUG:rabbitpy.base:Channel setting state to 'Closed'
DEBUG:rabbitpy.base:Connection setting state to 'Closed'
Error: ConnectionResetException()

The waiting for the interrupt to clear issue should be resolved in master and will be released in 0.27.0

shilpabhave commented 9 years ago

gmr - we are using rabbit.py for a long running worker process and it stops consuming because of this uncatchable exception. We have to keep restarting the worker atleast once a day. We are using version 0.26.1. Is this fixed in 0.26.2?

gmr commented 8 years ago

Has long been fixed, sorry for the lingering open ticket.