fedora-infra / fedmsg

Federated Messaging with ZeroMQ
https://fedmsg.readthedocs.io/
GNU Lesser General Public License v2.1

Gracefully stopping consumers? #517

Open · lcarva opened this issue 6 years ago

lcarva commented 6 years ago

I have a consumer that takes about 20 minutes to process a received message, and it should not be stopped during that time. This makes routine operations, such as deploying a new version of the consumer, challenging.

I'd like a way to signal fedmsg to stop consuming new messages, wait for consumers to finish processing the messages they are currently handling, and then stop. Ideally, a configurable maximum wait timeout would bound how long it waits.
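The stop, drain, stop sequence described above can be modeled independently of fedmsg. Below is a minimal, hypothetical single-worker sketch using only the Python standard library; `DrainableWorker`, `submit` and `drain` are illustrative names, not fedmsg or moksha APIs, and it assumes messages arrive through `submit()` and are handled one at a time on a background thread.

```python
import logging
import queue
import threading

_STOP = object()  # sentinel queued behind the last accepted message


class DrainableWorker:
    """Hypothetical worker (not a fedmsg or moksha class) that models the
    proposed sequence: stop accepting, drain what is queued, then stop."""

    def __init__(self, handler):
        self._handler = handler  # called once per message
        self._queue = queue.Queue()
        self._lock = threading.Lock()
        self._accepting = True
        # daemon=True so that, if drain() times out, the process can still
        # exit without waiting for the unfinished message.
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def submit(self, message):
        """Queue a message; returns False once draining has begun."""
        with self._lock:
            if not self._accepting:
                return False
            self._queue.put(message)
            return True

    def _run(self):
        while True:
            message = self._queue.get()
            if message is _STOP:
                return
            try:
                self._handler(message)
            except Exception:
                # Keep draining even if one message fails.
                logging.exception("Failed to handle message %r", message)

    def drain(self, timeout):
        """Stop accepting new messages, wait up to `timeout` seconds for the
        queued ones to be processed, and report whether the worker finished."""
        with self._lock:
            self._accepting = False
            # Processed only after every message queued before it.
            self._queue.put(_STOP)
        self._thread.join(timeout)
        return not self._thread.is_alive()
```

Queuing a sentinel behind the last accepted message lets the worker finish everything already queued before exiting, while `join(timeout)` supplies the configurable maximum wait. A real implementation would still have to decide what to do with messages left unprocessed when the timeout expires.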

Before I delve into the code base, does this approach make sense?

ralphbean commented 6 years ago

@lcarva, it makes sense to me. @mprahl has wanted something similar for MBS (a maintenance mode to drain the system), so you may want to talk to him as well.

I figure your patch will need to touch moksha.hub primarily but may have to reach into fedmsg too.

jeremycline commented 6 years ago

The way we've handled this in fedora-messaging is to register signal handlers. I'd recommend that approach. Handle the signal by setting a "stop" flag, and if that flag is already set (e.g. the signal got sent twice) stop immediately.
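A minimal sketch of the two-stage signal handling described above, assuming a plain Python process. `GracefulStop`, `stop_requested` and the `force_exit` parameter are hypothetical names, not fedora-messaging code: the first SIGTERM or SIGINT only sets a flag, and a repeated signal exits immediately.

```python
import os
import signal
import threading


class GracefulStop:
    """Hypothetical two-stage signal handler: the first signal sets a stop
    flag, a second one exits immediately. Not a fedmsg/fedora-messaging API."""

    def __init__(self, signals=(signal.SIGTERM, signal.SIGINT), force_exit=os._exit):
        self.stop_requested = threading.Event()
        self._force_exit = force_exit  # injectable so the behavior can be tested
        for signum in signals:
            # signal.signal() must be called from the main thread.
            signal.signal(signum, self._handle)

    def _handle(self, signum, frame):
        if self.stop_requested.is_set():
            # Flag already set, so this is a repeated signal: stop now.
            # os._exit() skips cleanup and does not wait for other threads.
            self._force_exit(128 + signum)
        else:
            # First signal: ask the application to finish its work and stop.
            self.stop_requested.set()
```

The consuming loop would then check `stop_requested.is_set()` before taking another message and exit once its current work is done. `os._exit()` is used for the second signal because `sys.exit()` only raises `SystemExit`, and the interpreter still waits for non-daemon threads before the process ends.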

lcarva commented 6 years ago

@jeremycline, ah, that's clever, because I can (I think) do that directly in my consumer without having to modify fedmsg/moksha. Thanks, I'll look into that.

lcarva commented 6 years ago


I think I'm getting closer. I've added a signal handler like this to my consumer:

import logging
import signal
import time

import fedmsg.consumers

log = logging.getLogger(__name__)


class MyConsumer(fedmsg.consumers.FedmsgConsumer):
    def __init__(self, hub):
        ...
        signal.signal(signal.SIGTERM, self._on_sigterm)

    def _on_sigterm(self, signum, frame):
        log.info('Received signal %d', signum)
        if signum != signal.SIGTERM:
            log.warning('Signal is unexpected, doing nothing')
            return

        # Do not consume more messages from UMB
        self.hub.unsubscribe(self._consume)

        # Finish up what's already in the queue
        while not self.incoming.empty():
            # TODO: Break if configured timeout expires
            time.sleep(1)

        # TODO: Ensure that there are no active queue items being processed - not sure what to check

        # Disconnect from hub and do some cleanup (Yikes! this is ugly)
        self.stop()
        self.hub.stop()
        from moksha.hub.reactor import reactor
        reactor.stop()

After reactor.stop() is called, the process appears to wait for any currently running threads to finish before exiting, which is pretty close to what I want.

The only missing piece is checking for running threads before stopping the hub and reactor. I need this because I'd like to publish a message at the end of my consume method, and once the hub and reactor are stopped, no messages will be published. I'll keep digging to see if there's a way.
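One way to fill in the remaining TODO is to count in-flight messages explicitly and wait, with a timeout, until the count drops to zero before stopping anything. This sketch assumes the body of `consume()` can be wrapped in a context manager and that processing happens on threads other than the one running the signal handler; `InFlightTracker`, `track`, `wait_idle` and `graceful_shutdown` are hypothetical helpers, not part of fedmsg or moksha.

```python
import contextlib
import threading


class InFlightTracker:
    """Hypothetical helper (not part of fedmsg or moksha) that counts messages
    currently being processed, so shutdown can wait for them to finish."""

    def __init__(self):
        self._count = 0
        self._cond = threading.Condition()

    @contextlib.contextmanager
    def track(self):
        # Wrap the body of consume() in `with tracker.track():`.
        with self._cond:
            self._count += 1
        try:
            yield
        finally:
            with self._cond:
                self._count -= 1
                if self._count == 0:
                    self._cond.notify_all()

    def wait_idle(self, timeout):
        """Wait until nothing is in flight; returns False if `timeout`
        seconds pass first."""
        with self._cond:
            return self._cond.wait_for(lambda: self._count == 0, timeout)


def graceful_shutdown(tracker, stop_callbacks, timeout):
    """Wait (bounded) for in-flight messages, then run the stop callbacks in
    order. Returns True if all in-flight work finished before the timeout."""
    idle = tracker.wait_idle(timeout)
    for stop in stop_callbacks:
        stop()
    return idle
```

In the consumer above, `graceful_shutdown` might be called from the signal handler with `[self.stop, self.hub.stop, reactor.stop]` as the stop callbacks, so the hub is still running while the final message is published. If processing ran on the same thread as the signal handler, the wait would simply block until the timeout, so the threading assumption matters.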