gmr / rabbitpy

A pure python, thread-safe, minimalistic and pythonic RabbitMQ client library
http://rabbitpy.readthedocs.org
BSD 3-Clause "New" or "Revised" License
243 stars 58 forks source link

Implement cancellation of Consumer #39

Closed cenkalti closed 10 years ago

cenkalti commented 10 years ago

Hello @gmr I have added Consumer.cancel() method and have done some refactoring. I believe that I did not make any changes that breaks backward compatibility. In order to cancel a consuming Consumer I had to inject a new object (base.InternalCommand) into the channel's read queue when a Basic.CancelOk frame is received. It does not support cancellation of multiple concurrent consumers yet because CancelConsumer objects do not have any identifier for consumers (consumer_tag). However, it works perfectly for a single consumer. If you want a change about this, I can modify my pull request.

coveralls commented 10 years ago

Coverage Status

Coverage decreased (-0.42%) when pulling 410aa4e18f91a9320a3b7946485705b1c0e63dc5 on cenkalti:cancel_consumer into 39351e9459374b620d8ff36a17aa05d350b0158d on gmr:master.

coveralls commented 10 years ago

Coverage Status

Coverage decreased (-0.42%) when pulling 410aa4e18f91a9320a3b7946485705b1c0e63dc5 on cenkalti:cancel_consumer into 39351e9459374b620d8ff36a17aa05d350b0158d on gmr:master.

cenkalti commented 10 years ago

Hello @gmr. Did you have a chance to take a look?

cenkalti commented 10 years ago

This PR fixes #38.

gmr commented 10 years ago

I've not, sorry juggling quite a few things. I'll pull it down shortly and see where it leads :)

gmr commented 10 years ago

In testing, this breaks consumer behavior to no longer be blocking with a context manager. Looking to see if I can identify why.

coveralls commented 10 years ago

Coverage Status

Coverage increased (+0.8%) to 68.35% when pulling 89238b93ca44045d516344c29bec59f197133f00 on cenkalti:cancel_consumer into c1b14c1685aa1c74f600f3e894e91dcdedd0341f on gmr:master.

cenkalti commented 10 years ago

@gmr I have fixed the blocking consumer problem.

gmr commented 10 years ago

Ok, I'll review in the next day or two. I'm a bit hesitant to make API changes, but may be able to take what you wrote and massage it a bit to get it where it needs to be.

cenkalti commented 10 years ago

Great, I have squashed my commits into one for more clean history. Thanks!

coveralls commented 10 years ago

Coverage Status

Coverage increased (+0.77%) when pulling 643dc357b9c1fa62780cfa3b5d87cf43a13783d7 on cenkalti:cancel_consumer into 170e4d1b93d0b0a40ecff2cbc35fc6e306c25b36 on gmr:master.

cenkalti commented 10 years ago

This is how I use it:

def run(self):
    with self.queue.consumer() as consumer:
        self.consumer = consumer
        for message in consumer.next_message():
            self.process_message(message)

def handle_sigterm(self, signum, frame):
    self.warm_shutdown()

def warm_shutdown(self):
    """Exit after the last task is finished."""
    if self.consumer:
        self.consumer.cancel()
gmr commented 10 years ago

Hey Cenk,

I really appreciate your pull-request and effort. I decided to implement this in a slightly different way that didn't require as many structural changes. I'm going to close out this PR and we can continue the discussion in #38