gmr / rabbitpy

A pure python, thread-safe, minimalistic and pythonic RabbitMQ client library
http://rabbitpy.readthedocs.org
BSD 3-Clause "New" or "Revised" License

Can I stop consuming gracefully? #38

Closed cenkalti closed 9 years ago

cenkalti commented 10 years ago

I am using the following code to consume messages from a queue:

for message in queue:
    message.pprint(True)
    message.ack()
logger.info("worker has ended")  # cannot drop here

Is there a way to stop consuming messages (in another thread or a signal handler)?

What I want is to stop consuming messages when my program receives a SIGUSR1 signal from the OS: finish the current task (if there is one), ACK the last message, then exit the program.

cenkalti commented 10 years ago

I tried issuing a Basic.Cancel, but it did not work. The channel does not exit from the _consume_message method.

gmr commented 10 years ago

hmm so, if you're not waiting on a message, you could check for a threading.Event to be set and exit your loop:

for message in queue:
    message.pprint(True)
    message.ack()
    if event.is_set():
        break

Right now there is no clean way to interrupt waiting on a message. I'll take a look at how to best implement that for the next release.
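A minimal sketch of the Event-based approach above, wired to a signal handler. It assumes the handler only sets the flag and the loop checks it between messages; `consume`, `request_stop` and `install_handler` are hypothetical names, and a plain list stands in for the queue iterator so the sketch runs without a broker. As noted, it does not interrupt a loop that is blocked waiting on a message.

```python
import signal
import threading

stop_event = threading.Event()


def request_stop(signum, frame):
    """Signal handler: only set the flag; the loop decides when to exit."""
    stop_event.set()


def install_handler():
    """Register the handler. Call this from the main thread only."""
    if hasattr(signal, "SIGUSR1"):  # SIGUSR1 is not available on Windows
        signal.signal(signal.SIGUSR1, request_stop)


def consume(messages, handle):
    """Process messages until the iterable ends or a stop was requested.

    `messages` is a stand-in for the queue iterator (an assumption here).
    """
    processed = 0
    for message in messages:
        handle(message)          # finish the current task (and ack) first
        processed += 1
        if stop_event.is_set():  # checked only between messages
            break
    return processed
```

With a real queue, `handle` would do the work and call `message.ack()`, so the last message is always acknowledged before the loop exits.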

cenkalti commented 10 years ago

Yes, I thought of that, but the loop may be waiting on a message. I am updating our task runner (https://github.com/cenkalti/kuyruk) to use rabbitpy instead of pika. If I accomplish this, ~400 lines of code will be removed from Kuyruk. Most of the abstractions I made in Kuyruk (Connection, Channel, Consumer, Message) are already present in the new rabbitpy API, so doing this will simplify the code.

I need this feature to gracefully stop a consuming worker. Thanks for your help.

gmr commented 9 years ago

Hey @cenkalti,

As I mentioned in #39, I really appreciate your pull-request and effort. I decided to implement this in a slightly different way that didn't require as many structural changes. Do you mind testing against master to see if this addresses your issue?

Given your example, here's how you would use it:

def run(self):
    with self.queue.consumer() as consumer:
        for message in consumer.next_message():
            self.process_message(message)

def handle_sigterm(self, signum, frame):
    self.warm_shutdown()

def warm_shutdown(self):
    """Exit after the last task is finished."""
    if self.queue:
        self.queue.stop_consuming()

gmr commented 9 years ago

Closing this, please re-open if it does not address your needs.

cenkalti commented 9 years ago

I have tested it, but it did not solve my problem. When the consumer is canceled from another thread or a signal handler (while the consumer is already waiting on frames), the loop does not break. I have written a test that reproduces the issue: https://github.com/cenkalti/rabbitpy/commit/ea6a33e982c912db5815653833a77e1e71452ada

ghost commented 9 years ago

I have the same problem (python3.4, rabbitpy-0.25.0): how do you stop consuming while no messages are being received? The example below hangs on stop_consuming(). I can provide a stack trace of all three threads at the point of deadlock, if that helps.

import rabbitpy
from threading import Thread
from time import sleep

def consumerThread(queue):
    for message in queue:
        if message:
            print('Got msg %s' % message.body)
            #message.ack()
        else:
            print('Empty message')
    print('Finished')

with rabbitpy.Connection('amqp://container:p@localhost:5672/test') as connection:
    with connection.channel() as channel:
        queue = rabbitpy.Queue(channel, 'container_q')

        t = Thread(target = consumerThread, args = (queue,))
        t.start()
        sleep(5)
        print('About to stop')
        queue.stop_consuming()
        print('Joining')
        t.join()
        print('End')

denis-sumin commented 9 years ago

+1 to iotic-labs-vtermanis. stop_consuming() call hangs. Python 2.7

denis-sumin commented 9 years ago

I did some research into the problem; here is what happens under the hood.

I have two threads: a master thread and a RabbitMQ consumer thread.

The consumer thread iterates over the queue iterator; while waiting for a new message, it waits for one of the following frames from RabbitMQ: [pamqp.specification.Deliver, pamqp.specification.CancelOk].

Then, from the master thread, I call a stop() function on the consumer thread, which in turn calls stop_consuming(). Under the hood, rabbitpy.channel._cancel_consumer sends specification.Basic.Cancel and then waits for specification.Basic.CancelOk in reply: self._wait_on_frame(specification.Basic.CancelOk).

The reply arrives immediately, but it is picked up by the consumer thread, which is still waiting for either a Deliver or a CancelOk frame. The consumer then receives a None message, as stated in the docs. Meanwhile, _wait_on_frame(CancelOk), called from _cancel_consumer, loops forever, because it keeps waiting for a CancelOk frame that has already been consumed and will never arrive again. This is why stop_consuming() hangs.

The easy fix is to not wait for the CancelOk reply in rabbitpy.channel._cancel_consumer, but I don't know whether that is a correct solution.

Please share your opinion; I want to use rabbitpy in my project, but currently I'm not able to stop the consumer correctly.
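The race described above can be modeled without RabbitMQ or rabbitpy. This is only a toy sketch under stated assumptions: a `queue.Queue` stands in for the channel's incoming frame stream, frames are plain strings, and `consumer_loop` and `cancel_consumer` are hypothetical names, not the library's code. The `join()` call forces the ordering from the description (the consumer thread takes the reply first), and the canceller uses a timeout where the real wait loops indefinitely.

```python
import queue
import threading

frames = queue.Queue()   # stand-in for the channel's incoming frames
consumer_saw = []


def consumer_loop():
    """Consumer thread: waits for either a Deliver or a CancelOk frame."""
    while True:
        frame = frames.get()
        consumer_saw.append(frame)
        if frame == "Basic.CancelOk":
            return  # iteration ends; the frame is gone from the stream


def cancel_consumer(timeout=0.2):
    """Canceller: 'sends' Basic.Cancel, then waits for its own CancelOk."""
    frames.put("Basic.CancelOk")     # the broker's reply arrives right away
    consumer.join()                  # force the order: consumer takes it first
    try:
        frames.get(timeout=timeout)  # the real wait has no timeout
        return True
    except queue.Empty:
        return False                 # nothing left to receive: this is the hang


consumer = threading.Thread(target=consumer_loop)
consumer.start()
got_reply = cancel_consumer()
print(consumer_saw, got_reply)
```

The single CancelOk frame has one reader too many: whichever side reads it first leaves the other waiting for a frame that will not come again.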

gmr commented 9 years ago

Thanks for digging in on this. I'll take a stab at fixing the issue later today.

gmr commented 9 years ago

I've implemented a few changes that address this fully and will push it today. I'm going to work through a few more edge cases such as remote cancellation prior to pushing the commit. Expect a 0.26 to drop soon.

gmr commented 9 years ago

I believe this is addressed, please reopen if you continue to run into this issue.

macfer commented 9 years ago

Surprisingly, the problem reappears when the queue iterator and other code that affects the queue run in separate threads. In the following example, one queue receives messages from multiple exchanges, and in the meantime it can be bound to new exchanges (and unbound as well).

Everything works perfectly until you try to stop consuming: the call has no effect. Tested with Python 2.7.8 and rabbitpy 0.26.2.

When you move the register_source and unregister_source calls into the run method, stop_consuming works correctly, but this is of course not possible in real code.

Please take a look at this. I looked for alternative solutions (a separate thread and queue for each exchange, binding and unbinding the queue in the iterator thread), but they are overcomplicated, resource-intensive or unreliable.

from threading import Lock, Thread
from time import sleep
from uuid import uuid1

import rabbitpy

class BusEventsSource(Thread):

    def __init__(self, address):
        super(BusEventsSource, self).__init__()

        self.events = []
        self.sources = []
        self.events_lock = Lock()
        self.sources_lock = Lock()

        self.connection = rabbitpy.Connection(address)
        self.channel = self.connection.channel()
        self.queue = rabbitpy.Queue(self.channel, "framework." + str(uuid1()), exclusive=True,
                                    auto_delete=True)
        self.queue.declare()

    def pop_events(self):
        with self.events_lock:
            event_list = self.events
            self.events = []

        return event_list

    def register_source(self, source):
        with self.sources_lock:
            if source in self.sources:
                raise ValueError("Source already registered.")

            self.sources.append(source)
            exchange = rabbitpy.Exchange(self.channel, source, "fanout", auto_delete=True)
            exchange.declare()
            self.queue.bind(source)

    def unregister_source(self, source):
        with self.sources_lock:
            if source not in self.sources:
                raise ValueError("Source not registered.")

            self.sources.remove(source)
            self.queue.unbind(source)

    def run(self):
        for message in self.queue.consume():
            message.ack()

            with self.events_lock:
                self.events.append((message.exchange, message.body))

        self.connection.close()
        print "exiting run method"

if __name__ == "__main__":
    BES_thread = BusEventsSource("amqp://192.168.0.12:5672/%2F")
    BES_thread.start()
    BES_thread.register_source("framework.test0")
    BES_thread.register_source("framework.test1")
    for _ in range(100):
        sleep(0.1)
        events = BES_thread.pop_events()
        print "number of events: {}".format(len(events))
        for e in events:
            print
            print "source: {}".format(e[0])
            print "content: {}".format(e[1])
            print "----------"
    BES_thread.unregister_source("framework.test0")
    BES_thread.unregister_source("framework.test1")
    sleep(1)
    print "trying to stop consuming"
    BES_thread.queue.stop_consuming()
    sleep(1)
    print "exiting testing code"

cenkalti commented 7 years ago

@gmr Are there any updates on this issue? What is the recommended way to stop the queue iterator from another thread?

michael-stefanski commented 7 years ago

@gmr This is still an issue. What is the recommended, thread-safe way to stop consuming from a Queue that is being iterated in another thread? Thank you.

dvarelae858 commented 7 months ago

The key is to keep execution control in the main event loop.
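One possible reading of that advice, as a sketch under assumptions: instead of blocking inside an iterator, the loop fetches at most one message per pass and checks a stop flag in between, so it never waits indefinitely on the broker. `poll_until_stopped` and `fetch_one` are hypothetical names; `fetch_one` stands in for any non-blocking fetch (a Basic.Get-style call, for instance), and an in-memory queue replaces the broker so the sketch runs on its own.

```python
import queue
import threading
import time


def poll_until_stopped(fetch_one, handle, stop_event, idle_wait=0.05):
    """Fetch at most one message per pass and re-check the stop flag.

    `fetch_one` is a hypothetical callable returning a message or None
    without blocking.
    """
    handled = 0
    while not stop_event.is_set():
        message = fetch_one()
        if message is None:
            stop_event.wait(idle_wait)  # idle briefly, wake early on stop
            continue
        handle(message)
        handled += 1
    return handled


# Stand-in message source so the sketch runs without a broker.
pending = queue.Queue()
for body in ("a", "b", "c"):
    pending.put(body)


def fetch_from_pending():
    try:
        return pending.get_nowait()
    except queue.Empty:
        return None


stop = threading.Event()
seen = []
results = []
worker = threading.Thread(
    target=lambda: results.append(
        poll_until_stopped(fetch_from_pending, seen.append, stop)))
worker.start()

while len(seen) < 3:     # wait until the three stand-in messages are handled
    time.sleep(0.01)
stop.set()               # the worker is idle now, yet it still stops promptly
worker.join(timeout=2)
```

The trade-off is polling overhead and some added latency compared with a push-based consumer; in exchange, the stop request takes effect within one `idle_wait` interval even when no messages are arriving.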