gmr / rabbitpy

A pure python, thread-safe, minimalistic and pythonic RabbitMQ client library
http://rabbitpy.readthedocs.org
BSD 3-Clause "New" or "Revised" License
242 stars 58 forks source link

fix a bug in Consumer.cancel when it is called from a different thread #45

Closed cenkalti closed 9 years ago

cenkalti commented 9 years ago

Fixes the issue that I mention at: https://github.com/cenkalti/rabbitpy/commit/ea6a33e982c912db5815653833a77e1e71452ada#commitcomment-8033548

coveralls commented 9 years ago

Coverage Status

Coverage increased (+0.03%) when pulling 1dd18f04f536c7e37cf3c4edfdafa1ee61a1870f on cenkalti:consumer_cancel_thread_fix into 29788e6ad35e1277fcacaa08269a321ee8eeea0e on gmr:master.

gmr commented 9 years ago

This is not the right way to address the issue. I've added a new test that demonstrates that you can stop consuming outside of the thread. I don't see an issue here.


class PublishAndConsumeIteratorStopTest(unittest.TestCase):

    def setUp(self):
        self.connection = rabbitpy.Connection()
        self.channel = self.connection.channel()
        self.exchange = rabbitpy.TopicExchange(self.channel, 'test-pacit')
        self.exchange.declare()
        self.queue = rabbitpy.Queue(self.channel, 'pacit-queue')
        self.queue.declare()
        self.queue.bind(self.exchange, 'test.#')

        self.app_id = 'PublishAndConsumeIteratorTest'
        self.message_body = 'ABC1234567890'
        self.message_type = 'test'

        for iteration in range(0, 10):
            self.msg = rabbitpy.Message(self.channel,
                                        self.message_body,
                                        {'app_id': self.app_id,
                                         'message_id': str(uuid.uuid4()),
                                         'timestamp': int(time.time()),
                                         'message_type': self.message_type})
            self.msg.publish(self.exchange,
                             'test.publish.consume {0}'.format(iteration))

    def stop_consumer(self):
        self.queue.stop_consuming()

    def tearDown(self):
        self.queue.delete()
        self.exchange.delete()

    def test_iterator_exits_on_stop(self):
        LOGGER.info('Starting stop timer')
        timer = threading.Timer(2, self.stop_consumer)
        timer.daemon = True
        timer.start()
        for msg in self.queue:
            if not msg:
                LOGGER.info('Message is empty')
                break
            msg.ack()
        LOGGER.info('Exited iterator')
        self.assertFalse(self.queue.consuming)