gmr / rabbitpy

A pure python, thread-safe, minimalistic and pythonic RabbitMQ client library
http://rabbitpy.readthedocs.org
BSD 3-Clause "New" or "Revised" License
242 stars 58 forks source link

Uncaght exception in rabbitpy.IO.io class / channel0 when closing connection without stoping consumers #129

Open cmlara opened 3 years ago

cmlara commented 3 years ago

Using version 2.0.1

Scenario: Create a single rabbitpy.connection() and pass it to a thread which will use the connection to create a new channel and consumer.

In the main thread call connection.close() to close the connection.

As one would expect connections begin breaking down at this point and spawned threads throw various exceptions depending on what they were doing at the time, these can be caught in a try block and handled as one would normally do.

The daemon IO thread however while this is ongoing attempts to process a message and triggers an exception. Since the is in its own thread it can not be caught with a try() block anywhere in code.

I'll note in case it is relevant that the message channel0 is attempting to process when this happens appears routinely be a "504): CHANNEL_ERROR - expected 'channel.open'" caused by two attempts to close a single channel.

Sample code can be made to duplicate this by using the code provided in the Multi-threaded Use Notes with the minimal modifications of removing the producer to keep the receiver idle and after the consumer_thread.start() dd a short sleep followed by a connection.close()

It appears this can be avoided by tracking all the queue's globaly in the main thread and calling stop_consuming() on each before the connection.close() call

cmlara@dewey:~/TestCode$ python3 src/TestFullThreadCrash.py
Exception in thread 0x7f4e38853310-io:
Traceback (most recent call last):
  File "/usr/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/home/cmlara/.local/lib/python3.8/site-packages/rabbitpy/io.py", line 411, in run
    self._loop.run()
  File "/home/cmlara/.local/lib/python3.8/site-packages/rabbitpy/io.py", line 207, in run
    self._poll()
  File "/home/cmlara/.local/lib/python3.8/site-packages/rabbitpy/io.py", line 269, in _poll
    self._read()
  File "/home/cmlara/.local/lib/python3.8/site-packages/rabbitpy/io.py", line 283, in _read
    self._data.read_callback(self._data.fd.recv(MAX_READ))
  File "/home/cmlara/.local/lib/python3.8/site-packages/rabbitpy/io.py", line 460, in on_read
    self._channels[0][0].on_frame(value[1])
  File "/home/cmlara/.local/lib/python3.8/site-packages/rabbitpy/channel0.py", line 108, in on_frame
    self._connection.close()
  File "/home/cmlara/.local/lib/python3.8/site-packages/rabbitpy/connection.py", line 212, in close
    raise exceptions.ConnectionClosed()
rabbitpy.exceptions.ConnectionClosed: The connection is closed

Debug log:

cmlara@dewey:~/TestException$ python3 TestFullThreadCrashsimple.py
DEBUG:rabbitpy.base:Connection setting state to 'Opening'
DEBUG:rabbitpy.base:IO setting state to 'Opening'
DEBUG:rabbitpy.events:Waiting for 3 seconds on event: Socket Connected
DEBUG:rabbitpy.io:Connecting to ('127.0.0.1', 5672)
DEBUG:rabbitpy.base:IO setting state to 'Open'
DEBUG:rabbitpy.io:Socket connected
DEBUG:rabbitpy.io:Returning PollPoller
DEBUG:rabbitpy.connection:Adding channel 0 to io
DEBUG:rabbitpy.base:Channel0 setting state to 'Opening'
DEBUG:rabbitpy.base:Writing frame: ProtocolHeader
DEBUG:rabbitpy.channel0:Received frame: 'Connection.Start'
DEBUG:rabbitpy.channel0:Server supports publisher_confirms: True
DEBUG:rabbitpy.channel0:Server supports exchange_exchange_bindings: True
DEBUG:rabbitpy.channel0:Server supports basic.nack: True
DEBUG:rabbitpy.channel0:Server supports consumer_cancel_notify: True
DEBUG:rabbitpy.channel0:Server supports connection.blocked: True
DEBUG:rabbitpy.channel0:Server supports consumer_priorities: True
DEBUG:rabbitpy.channel0:Server supports authentication_failure_close: True
DEBUG:rabbitpy.channel0:Server supports per_consumer_qos: True
DEBUG:rabbitpy.channel0:Server supports direct_reply_to: True
DEBUG:rabbitpy.channel0:Server cluster_name: b'rabbit@dewey'
DEBUG:rabbitpy.channel0:Server copyright: b'Copyright (c) 2007-2019 Pivotal Software, Inc.'
DEBUG:rabbitpy.channel0:Server information: b'Licensed under the MPL 1.1. Website: https://rabbitmq.com'
DEBUG:rabbitpy.channel0:Server platform: b'Erlang/OTP 22.2.7'
DEBUG:rabbitpy.channel0:Server product: b'RabbitMQ'
DEBUG:rabbitpy.channel0:Server version: b'3.8.2'
DEBUG:rabbitpy.base:Writing frame: Connection.StartOk
DEBUG:rabbitpy.channel0:Received frame: 'Connection.Tune'
DEBUG:rabbitpy.channel0:Heartbeat interval (server/client): 60/60
DEBUG:rabbitpy.base:Writing frame: Connection.TuneOk
DEBUG:rabbitpy.base:Writing frame: Connection.Open
DEBUG:rabbitpy.channel0:Received frame: 'Connection.OpenOk'
DEBUG:rabbitpy.channel0:Connection opened
DEBUG:rabbitpy.base:Channel0 setting state to 'Open'
DEBUG:rabbitpy.heartbeat:Heartbeat started, ensuring data is written at least every 30.00 seconds
DEBUG:rabbitpy.base:Connection setting state to 'Open'
DEBUG:rabbitpy.connection:Adding channel 1 to io
DEBUG:rabbitpy.base:Channel setting state to 'Opening'
DEBUG:rabbitpy.base:Writing frame: Channel.Open
DEBUG:rabbitpy.base:Waiting on <class 'pamqp.specification.Channel.OpenOk'> frame(s)
DEBUG:rabbitpy.channel:Checking for RPC request: <pamqp.specification.Channel.OpenOk object at 0x7f560c898fd0>
DEBUG:rabbitpy.base:Channel setting state to 'Open'
DEBUG:rabbitpy.channel:Channel #1 open
DEBUG:rabbitpy.base:Issuing RPC to RabbitMQ: <pamqp.specification.Exchange.Declare object at 0x7f560c899190>
DEBUG:rabbitpy.base:Sending 'Exchange.Declare'
DEBUG:rabbitpy.base:Writing frame: Exchange.Declare
DEBUG:rabbitpy.base:Waiting on 'Exchange.DeclareOk' frame(s)
DEBUG:rabbitpy.channel:Checking for RPC request: <pamqp.specification.Exchange.DeclareOk object at 0x7f560c8c0070>
DEBUG:rabbitpy.amqp_queue:Declaring Queue threading_queue, durable=False, passive=False, exclusive=False, auto_delete=False, arguments={}
DEBUG:rabbitpy.base:Issuing RPC to RabbitMQ: <pamqp.specification.Queue.Declare object at 0x7f560c8b9220>
DEBUG:rabbitpy.base:Sending 'Queue.Declare'
DEBUG:rabbitpy.base:Writing frame: Queue.Declare
DEBUG:rabbitpy.base:Waiting on 'Queue.DeclareOk' frame(s)
DEBUG:rabbitpy.channel:Checking for RPC request: <pamqp.specification.Queue.DeclareOk object at 0x7f560c8b8ac0>
DEBUG:rabbitpy.base:Issuing RPC to RabbitMQ: <pamqp.specification.Queue.Bind object at 0x7f560c8c1ef0>
DEBUG:rabbitpy.base:Sending 'Queue.Bind'
DEBUG:rabbitpy.base:Writing frame: Queue.Bind
DEBUG:rabbitpy.base:Waiting on 'Queue.BindOk' frame(s)
DEBUG:rabbitpy.channel:Checking for RPC request: <pamqp.specification.Queue.BindOk object at 0x7f560c8c30a0>
DEBUG:rabbitpy.base:Channel setting state to 'Closing'
DEBUG:rabbitpy.base:Channel 1 close invoked while Closing
DEBUG:rabbitpy.base:Channel 1 Waiting for a valid response for Channel.Close
DEBUG:rabbitpy.base:Sending 'Channel.Close'
DEBUG:rabbitpy.base:Writing frame: Channel.Close
DEBUG:rabbitpy.base:Waiting on 'Channel.CloseOk' frame(s)
DEBUG:rabbitpy.channel:Checking for RPC request: <pamqp.specification.Channel.CloseOk object at 0x7f560c8c3130>
DEBUG:rabbitpy.base:Channel setting state to 'Closed'
DEBUG:rabbitpy.base:Channel #1 closed
DEBUG:rabbitpy.connection:Adding channel 2 to io
DEBUG:rabbitpy.base:Channel setting state to 'Opening'
DEBUG:rabbitpy.base:Writing frame: Channel.Open
DEBUG:rabbitpy.base:Waiting on <class 'pamqp.specification.Channel.OpenOk'> frame(s)
DEBUG:rabbitpy.channel:Checking for RPC request: <pamqp.specification.Channel.OpenOk object at 0x7f560c8c3250>
DEBUG:rabbitpy.base:Channel setting state to 'Open'
DEBUG:rabbitpy.channel:Channel #2 open
DEBUG:rabbitpy.base:Sending 'Basic.Qos'
DEBUG:rabbitpy.base:Writing frame: Basic.Qos
DEBUG:rabbitpy.base:Waiting on 'Basic.QosOk' frame(s)
DEBUG:rabbitpy.channel:Checking for RPC request: <pamqp.specification.Basic.QosOk object at 0x7f560c8c3250>
DEBUG:rabbitpy.base:Sending 'Basic.Consume'
DEBUG:rabbitpy.base:Writing frame: Basic.Consume
DEBUG:rabbitpy.base:Waiting on 'Basic.ConsumeOk' frame(s)
DEBUG:rabbitpy.channel:Checking for RPC request: <pamqp.specification.Basic.ConsumeOk object at 0x7f560c8c3640>
DEBUG:rabbitpy.base:Waiting on <class 'pamqp.specification.Basic.Deliver'> frame(s)

(starting to shutdown the connection)

DEBUG:rabbitpy.base:Connection setting state to 'Closing'
DEBUG:rabbitpy.base:Channel setting state to 'Closing'
DEBUG:rabbitpy.base:Interrupting the wait on frame
DEBUG:rabbitpy.channel:Channel 2 will nack messages for rabbitpy.2.140007554430384
DEBUG:rabbitpy.base:Channel 2 close invoked while Closing
DEBUG:rabbitpy.channel:Cancelling consumer while 'Closing' ('Closing')
DEBUG:rabbitpy.base:Sending 'Basic.Cancel'
DEBUG:rabbitpy.base:Writing frame: Basic.Cancel
DEBUG:rabbitpy.base:Waiting on 'Basic.CancelOk' frame(s)
DEBUG:rabbitpy.channel:Checking for RPC request: <pamqp.specification.Basic.CancelOk object at 0x7f560c898be0>
DEBUG:rabbitpy.base:Read frame while shutting down: <pamqp.specification.Basic.CancelOk object at 0x7f560c898be0>
DEBUG:rabbitpy.base:Channel 2 Waiting for a valid response for Channel.Close
DEBUG:rabbitpy.base:Sending 'Channel.Close'
DEBUG:rabbitpy.base:Writing frame: Channel.Close
DEBUG:rabbitpy.base:Waiting on 'Channel.CloseOk' frame(s)
DEBUG:rabbitpy.channel:Checking for RPC request: <pamqp.specification.Channel.CloseOk object at 0x7f560c898bb0>
DEBUG:rabbitpy.channel:Cancelling consumer while 'Closing' ('Closing')
DEBUG:rabbitpy.channel:Checking for RPC request: <pamqp.specification.Channel.CloseOk object at 0x7f560c898bb0>
DEBUG:rabbitpy.base:Sending 'Basic.Cancel'
DEBUG:rabbitpy.base:Channel setting state to 'Closed'
DEBUG:rabbitpy.base:Writing frame: Basic.Cancel
DEBUG:rabbitpy.base:Channel #2 closed
DEBUG:rabbitpy.base:Channel0 setting state to 'Closing'
DEBUG:rabbitpy.base:Sending 'Connection.Close'
DEBUG:rabbitpy.base:Writing frame: Connection.Close
DEBUG:rabbitpy.base:Waiting on 'Connection.CloseOk' frame(s)
DEBUG:rabbitpy.base:Waiting on 'Basic.CancelOk' frame(s)
DEBUG:rabbitpy.channel:Waited on frame, got None
DEBUG:rabbitpy.base:No need to interrupt wait
DEBUG:rabbitpy.channel:Channel 2 close invoked when already closed
DEBUG:rabbitpy.channel0:Received frame: 'Connection.Close'
WARNING:rabbitpy.channel0:RabbitMQ closed the connection (504): CHANNEL_ERROR - expected 'channel.open'
DEBUG:rabbitpy.base:Channel0 setting state to 'Closed'
Exception in thread 0x7f560d9319d0-io:
Traceback (most recent call last):
  File "/usr/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/home/cmlara/.local/lib/python3.8/site-packages/rabbitpy/io.py", line 411, in run
    self._loop.run()
  File "/home/cmlara/.local/lib/python3.8/site-packages/rabbitpy/io.py", line 207, in run
    self._poll()
  File "/home/cmlara/.local/lib/python3.8/site-packages/rabbitpy/io.py", line 269, in _poll
    self._read()
  File "/home/cmlara/.local/lib/python3.8/site-packages/rabbitpy/io.py", line 283, in _read
    self._data.read_callback(self._data.fd.recv(MAX_READ))
  File "/home/cmlara/.local/lib/python3.8/site-packages/rabbitpy/io.py", line 460, in on_read
    self._channels[0][0].on_frame(value[1])
  File "/home/cmlara/.local/lib/python3.8/site-packages/rabbitpy/channel0.py", line 108, in on_frame
    self._connection.close()
  File "/home/cmlara/.local/lib/python3.8/site-packages/rabbitpy/connection.py", line 212, in close
    raise exceptions.ConnectionClosed()
rabbitpy.exceptions.ConnectionClosed: The connection is closed
DEBUG:rabbitpy.base:IO setting state to 'Closing'
DEBUG:rabbitpy.base:IO setting state to 'Closed'
DEBUG:rabbitpy.connection:Setting to closed
DEBUG:rabbitpy.base:Connection setting state to 'Closed'
(Note: there is another ConnectionClosed exception that triggers on the following thread.join() attempt using the simple test codes but it is both expected and catchable)
twodayslate commented 3 years ago

This may work for you. join the thread then close the connection