gmr / rabbitpy

A pure python, thread-safe, minimalistic and pythonic RabbitMQ client library
http://rabbitpy.readthedocs.org
BSD 3-Clause "New" or "Revised" License

Infinite 100% CPU loop on connection teardown #103

Closed: fake-name closed 7 years ago

fake-name commented 8 years ago

I'm stress-testing my reconnect logic, and I appear to have managed to get the interface wedged somewhere inside rabbitpy.

Basically, the steps were as follows:

I can get a remote traceback from the wedged thread using pystuck. The thread is doing something, as the traceback is changing, and it's consuming 100% CPU. Here are some of the sniffed tracebacks:

<Thread(Thread-3, started daemon 139660919564032)>
  File "/usr/lib/python3.5/threading.py", line 882, in _bootstrap
    self._bootstrap_inner()
  File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.5/threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "/media/Storage/Scripts/ReadableWebProxy/LocalAmqpConnector/__init__.py", line 526, in run_fetcher
    connection = ConnectorManager(config, runstate, active, tx_q, rx_q)
  File "/media/Storage/Scripts/ReadableWebProxy/LocalAmqpConnector/__init__.py", line 500, in monitor_loop
    self._disconnect()
  File "/media/Storage/Scripts/ReadableWebProxy/LocalAmqpConnector/__init__.py", line 346, in _disconnect
    self.interface.close()
  File "/media/Storage/Scripts/ReadableWebProxy/LocalAmqpConnector/__init__.py", line 127, in close
    func()
  File "/media/Storage/Scripts/ReadableWebProxy/rabbitpy/channel.py", line 141, in close
    super(Channel, self).close()
  File "/media/Storage/Scripts/ReadableWebProxy/rabbitpy/base.py", line 208, in close
    self.rpc(frame_value)
  File "/media/Storage/Scripts/ReadableWebProxy/rabbitpy/base.py", line 227, in rpc
    return self._wait_on_frame(frame_value.valid_responses)
  File "/media/Storage/Scripts/ReadableWebProxy/rabbitpy/base.py", line 455, in _wait_on_frame
    self._read_queue.put(value)
  File "/usr/lib/python3.5/queue.py", line 145, in put
    self.not_empty.notify()
  File "/usr/lib/python3.5/threading.py", line 347, in notify
    return
<Thread(Thread-3, started daemon 139660919564032)>
  File "/usr/lib/python3.5/threading.py", line 882, in _bootstrap
    self._bootstrap_inner()
  File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.5/threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "/media/Storage/Scripts/ReadableWebProxy/LocalAmqpConnector/__init__.py", line 526, in run_fetcher
    connection = ConnectorManager(config, runstate, active, tx_q, rx_q)
  File "/media/Storage/Scripts/ReadableWebProxy/LocalAmqpConnector/__init__.py", line 500, in monitor_loop
    self._disconnect()
  File "/media/Storage/Scripts/ReadableWebProxy/LocalAmqpConnector/__init__.py", line 346, in _disconnect
    self.interface.close()
  File "/media/Storage/Scripts/ReadableWebProxy/LocalAmqpConnector/__init__.py", line 127, in close
    func()
  File "/media/Storage/Scripts/ReadableWebProxy/rabbitpy/channel.py", line 141, in close
    super(Channel, self).close()
  File "/media/Storage/Scripts/ReadableWebProxy/rabbitpy/base.py", line 208, in close
    self.rpc(frame_value)
  File "/media/Storage/Scripts/ReadableWebProxy/rabbitpy/base.py", line 227, in rpc
    return self._wait_on_frame(frame_value.valid_responses)
  File "/media/Storage/Scripts/ReadableWebProxy/rabbitpy/base.py", line 449, in _wait_on_frame
    value = self._read_from_queue()
  File "/usr/lib/python3.5/queue.py", line 145, in put
    self.not_empty.notify()
<Thread(Thread-3, started daemon 139660919564032)>
  File "/usr/lib/python3.5/threading.py", line 882, in _bootstrap
    self._bootstrap_inner()
  File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.5/threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "/media/Storage/Scripts/ReadableWebProxy/LocalAmqpConnector/__init__.py", line 526, in run_fetcher
    connection = ConnectorManager(config, runstate, active, tx_q, rx_q)
  File "/media/Storage/Scripts/ReadableWebProxy/LocalAmqpConnector/__init__.py", line 500, in monitor_loop
    self._disconnect()
  File "/media/Storage/Scripts/ReadableWebProxy/LocalAmqpConnector/__init__.py", line 346, in _disconnect
    self.interface.close()
  File "/media/Storage/Scripts/ReadableWebProxy/LocalAmqpConnector/__init__.py", line 127, in close
    func()
  File "/media/Storage/Scripts/ReadableWebProxy/rabbitpy/channel.py", line 141, in close
    super(Channel, self).close()
  File "/media/Storage/Scripts/ReadableWebProxy/rabbitpy/base.py", line 208, in close
    self.rpc(frame_value)
  File "/media/Storage/Scripts/ReadableWebProxy/rabbitpy/base.py", line 227, in rpc
    return self._wait_on_frame(frame_value.valid_responses)
  File "/media/Storage/Scripts/ReadableWebProxy/rabbitpy/base.py", line 449, in _wait_on_frame
    value = self._read_from_queue()
  File "/usr/lib/python3.5/queue.py", line 145, in put
    self.not_empty.notify()
  File "/usr/lib/python3.5/threading.py", line 241, in __exit__
    return self._lock.__exit__(*args)
<Thread(Thread-3, started daemon 139660919564032)>
  File "/usr/lib/python3.5/threading.py", line 882, in _bootstrap
    self._bootstrap_inner()
  File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.5/threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "/media/Storage/Scripts/ReadableWebProxy/LocalAmqpConnector/__init__.py", line 526, in run_fetcher
    connection = ConnectorManager(config, runstate, active, tx_q, rx_q)
  File "/media/Storage/Scripts/ReadableWebProxy/LocalAmqpConnector/__init__.py", line 500, in monitor_loop
    self._disconnect()
  File "/media/Storage/Scripts/ReadableWebProxy/LocalAmqpConnector/__init__.py", line 346, in _disconnect
    self.interface.close()
  File "/media/Storage/Scripts/ReadableWebProxy/LocalAmqpConnector/__init__.py", line 127, in close
    func()
  File "/media/Storage/Scripts/ReadableWebProxy/rabbitpy/channel.py", line 141, in close
    super(Channel, self).close()
  File "/media/Storage/Scripts/ReadableWebProxy/rabbitpy/base.py", line 208, in close
    self.rpc(frame_value)
  File "/media/Storage/Scripts/ReadableWebProxy/rabbitpy/base.py", line 227, in rpc
    return self._wait_on_frame(frame_value.valid_responses)
  File "/media/Storage/Scripts/ReadableWebProxy/rabbitpy/base.py", line 455, in _wait_on_frame
    self._read_queue.put(value)

Note that this is with a local copy of rabbitpy in my project's directory, because I wanted to be able to poke around and see if I could debug the issues (related to https://github.com/gmr/rabbitpy/issues/101).

fake-name commented 8 years ago

I just managed to replicate the lock-up with 100% CPU again. I think this might be at least marginally repeatable.

gmr commented 7 years ago

Looks like it's waiting on a Channel.CloseOk frame. I'll try to reproduce it.
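The sniffed tracebacks alternate between `_read_from_queue()` and `_read_queue.put(value)`, which is consistent with a wait loop that keeps pulling a frame, finding it is not the expected one, and requeueing it. The following is only a sketch of that pattern under stated assumptions, not rabbitpy's implementation: `wait_on_frame`, `WaitInterrupted`, `closed_event`, and `spin_limit` are hypothetical names, and frames are modeled as plain strings. The `closed_event` check shows one way such a loop could be interrupted when the connection goes away.

```python
import queue
import threading


class WaitInterrupted(Exception):
    """Hypothetical exception for an abandoned wait (not a rabbitpy class)."""


def wait_on_frame(read_queue, wanted, closed_event, spin_limit=1000):
    """Pop frames until `wanted` arrives, requeueing anything else.

    Returns the number of loop iterations used, or None if `spin_limit`
    iterations pass without a match. Raises WaitInterrupted once
    `closed_event` is set. `spin_limit` exists only to keep this demo
    from spinning forever.
    """
    for spins in range(1, spin_limit + 1):
        if closed_event.is_set():
            raise WaitInterrupted("closed while waiting for " + wanted)
        try:
            value = read_queue.get(timeout=0.01)
        except queue.Empty:
            continue
        if value == wanted:
            return spins
        # Not the frame we want: put it back. If this is the only frame
        # that will ever arrive, the loop spins at full speed.
        read_queue.put(value)
    return None


frames = queue.Queue()
frames.put("Basic.Deliver")
closed = threading.Event()

# Only an unrelated frame is queued, so every iteration requeues it.
print(wait_on_frame(frames, "Channel.CloseOk", closed))  # None

# Once the close is signalled, the wait is abandoned instead of spinning.
closed.set()
try:
    wait_on_frame(frames, "Channel.CloseOk", closed)
except WaitInterrupted as exc:
    print("interrupted:", exc)
```

Without the event check and the iteration cap, the first call would never return, because the queue is never empty and the expected frame never arrives.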

gmr commented 7 years ago

Thanks for the example. I am able to replicate the problem with it and am cleaning that up.

gmr commented 7 years ago

Here is what I'm seeing after cleaning up the cross-thread and object synchronization behavior for remote connection closing:

Item:  bytearray(b'test?')
DEBUG:rabbitpy.base:Writing frame: Basic.Ack
DEBUG:rabbitpy.base:Waiting on <class 'pamqp.specification.Basic.Deliver'> frame(s)
DEBUG:rabbitpy.channel0:Received frame: 'Connection.Close'
WARNING:rabbitpy.channel0:RabbitMQ closed the connection (320): b'CONNECTION_FORCED - Closed via management plugin'
DEBUG:rabbitpy.base:Channel0 setting state to 'Closed'
DEBUG:rabbitpy.connection:State: 'Open'
DEBUG:rabbitpy.base:Connection setting state to 'Closing'
DEBUG:rabbitpy.base:IO setting state to 'Closing'
DEBUG:rabbitpy.events:Event is already set: Socket Closed
DEBUG:rabbitpy.base:IO setting state to 'Closed'
DEBUG:rabbitpy.base:Connection setting state to 'Closed'
DEBUG:rabbitpy.io:Exiting due to closed socket
DEBUG:rabbitpy.io:Exiting IOLoop.run
DEBUG:rabbitpy.io:Exiting IO.run
Putting message
DEBUG:rabbitpy.base:Interrupting the wait on frame
Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/local/Cellar/python3/3.5.2_3/Frameworks/Python.framework/Versions/3.5/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "/usr/local/Cellar/python3/3.5.2_3/Frameworks/Python.framework/Versions/3.5/lib/python3.5/threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "example.py", line 60, in fill_queue
    rabbitpy.Message(self.channel, body_value="test?").publish(exchange="test_resp_enchange.e", routing_key="test_resp_queue")
  File "/Users/gmr/Source/rabbitpy/rabbitpy/message.py", line 284, in publish
    self.channel.write_frames(frames)
  File "/Users/gmr/Source/rabbitpy/rabbitpy/base.py", line 261, in write_frames
    if self._can_write():
  File "/Users/gmr/Source/rabbitpy/rabbitpy/base.py", line 283, in _can_write
    raise exceptions.ConnectionNotOpen()
rabbitpy.exceptions.ConnectionNotOpen: The connection is not open

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/local/Cellar/python3/3.5.2_3/Frameworks/Python.framework/Versions/3.5/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "/usr/local/Cellar/python3/3.5.2_3/Frameworks/Python.framework/Versions/3.5/lib/python3.5/threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "example.py", line 51, in get_rx
    for item in self.in_q:
  File "/Users/gmr/Source/rabbitpy/rabbitpy/amqp_queue.py", line 195, in consume
    message = self.channel._consume_message()
  File "/Users/gmr/Source/rabbitpy/rabbitpy/channel.py", line 311, in _consume_message
    frame_value = self._wait_on_frame([spec.Basic.Deliver])
  File "/Users/gmr/Source/rabbitpy/rabbitpy/base.py", line 467, in _wait_on_frame
    self._check_for_exceptions()
  File "/Users/gmr/Source/rabbitpy/rabbitpy/base.py", line 309, in _check_for_exceptions
    raise exception
rabbitpy.exceptions.AMQPConnectionForced: b'CONNECTION_FORCED - Closed via management plugin'

Traceback (most recent call last):
  File "example.py", line 87, in <module>
    test()
  File "example.py", line 81, in test
    tester.interrupt()
  File "example.py", line 64, in interrupt
    self.in_q.stop_consuming()
  File "/Users/gmr/Source/rabbitpy/rabbitpy/amqp_queue.py", line 324, in stop_consuming
    raise exceptions.NotConsumingError()
rabbitpy.exceptions.NotConsumingError: No active consumer to cancel
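In the output above, the remote close no longer hangs the threads. Instead it surfaces as exceptions, including `NotConsumingError` from `stop_consuming()` and `ConnectionNotOpen` from the publisher. A caller's teardown code may want to treat those as expected once the broker has already closed the connection. The sketch below uses local stand-in exception classes named after the ones in the log so that it runs without rabbitpy installed. `safe_teardown` and its `steps` argument are hypothetical and not part of the library.

```python
class NotConsumingError(Exception):
    """Stand-in named after the exception in the log; not the real class."""


class ConnectionNotOpen(Exception):
    """Stand-in named after the exception in the log; not the real class."""


def safe_teardown(steps, expected=(NotConsumingError, ConnectionNotOpen)):
    """Run every teardown step, recording expected errors instead of aborting.

    `steps` is a list of (name, callable) pairs. Errors that are not in
    `expected` still propagate to the caller.
    """
    skipped = []
    for name, step in steps:
        try:
            step()
        except expected as exc:
            skipped.append((name, type(exc).__name__))
    return skipped


def stop_consuming():
    # Simulates cancelling a consumer that the remote close already ended.
    raise NotConsumingError("No active consumer to cancel")


def close_channel():
    # Simulates closing a channel on a connection that is already closed.
    raise ConnectionNotOpen("The connection is not open")


result = safe_teardown([
    ("stop_consuming", stop_consuming),
    ("close_channel", close_channel),
])
print(result)
```

Collecting the skipped steps rather than silently ignoring them keeps the teardown observable, which helps when checking whether a reconnect path is hiding a real problem.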

fake-name commented 7 years ago

I see that periodically on teardown too, though some change in my logic has fixed the wedging.

At this point, I've kind of papered over the issue with my "kill the connection with prejudice" solution, and then just wrapped the entire mess in retry logic. It's not great, but it's a workaround.
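A workaround of this shape might look roughly like the sketch below. It is not the code from the project in question: `run_with_retries`, `connect`, `work`, and `force_close` are hypothetical names, and the forced teardown is whatever the caller supplies.

```python
import time


def run_with_retries(connect, work, force_close, attempts=3, delay=0.0):
    """Call work(connect()), retrying on any error.

    The connection is always torn down with `force_close`, whether the
    attempt succeeded or failed, and errors raised by the teardown itself
    are swallowed so they cannot block the next attempt.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error = None
    for _ in range(attempts):
        conn = None
        try:
            conn = connect()
            return work(conn)
        except Exception as exc:
            last_error = exc
        finally:
            if conn is not None:
                try:
                    force_close(conn)
                except Exception:
                    pass  # teardown failures are deliberately ignored here
        time.sleep(delay)
    raise last_error
```

Swallowing every teardown error is a blunt choice that matches the "with prejudice" description. A more careful version would log what it discards.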


Sorry I wasn't able to provide a minimal example. Everything in my project is kind of messily smeared across a number of files. I really should refactor at some point, before I go fully insane.

gmr commented 7 years ago

I'm going to release 1.0 shortly -- given the age of the ticket, I don't expect you to test this right away. Do you want to test this before I close the ticket?

fake-name commented 7 years ago

I should be able to merge it into my branch this weekend, but I'm not sure if I can really replicate without unwrapping a lot of my additions.

If you're satisfied it's fixed, go ahead and close. I'll just comment if I see anything.

gmr commented 7 years ago

Ok, thanks