ryanhiebert / hirefire

A Python lib to integrate with the HireFire service -- The Heroku Process Manager.
https://hirefire.readthedocs.io/

Celery doesn't account for running and reserved tasks #8

Closed ryanhiebert closed 8 years ago

ryanhiebert commented 9 years ago

This is a similar problem to #3 .

I'm using the Redis backend for Celery. I tried my hand at a solution in https://github.com/aspiredu/hirefire/commit/3ae625cc527330357a66df8d8c5e2701213d5af7, but it only works for small numbers of dynos, workers, and tasks. Even when it does work, it's painfully slow.

The issue is that active, reserved, and scheduled tasks don't show up in the queue. Missing the scheduled ones is fine, but missing the active and reserved ones means that my queues, which are running a lot of long-running tasks, get shut down well before they are done processing. This causes the dynos to flap: the tasks are put back in the queue, then taken out again, and then the process repeats.

ryanhiebert commented 9 years ago

Unfortunately, I can confirm that #9 has not solved this issue, even when I'm using py-amqp or librabbitmq. It is indeed an orthogonal issue.

ryanhiebert commented 9 years ago

An IRC conversation with @ask gave me two ideas to try. Both are somewhat involved for something as light as HireFire, but we've gotta do what we've gotta do.

  1. Use Flower to get data on the running worker queues.
  2. Use heartbeats from worker to get up-to-date data.
  3. (from me) Use inspect() (that's how Flower gets its info) for all queues/procs/workers at once, and somehow reuse that data for the life of the request to the HireFire info URL.

Option 2 requires some upstream code changes to Celery (the heartbeats don't currently include scheduled and reserved counts), and also requires a persistent monitor to receive those events.

I'm honestly not sure which path to take first. I'll just have to take one and run with it.
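
A minimal sketch of the counting step behind option 3, assuming the inspect() reply is a mapping of worker name to a list of task dicts, and using the same `acknowledged` and `delivery_info['exchange']` filter as the snippets later in this thread. The helper name `count_tasks_for_queues` and the sample data are hypothetical, not part of HireFire or Celery.

```python
import itertools


def count_tasks_for_queues(inspect_reply, queues):
    """Count unacknowledged tasks in an inspect()-style reply for `queues`."""
    if not inspect_reply:
        # Treat a missing reply (for example, no worker answered) as zero tasks.
        return 0
    tasks = itertools.chain.from_iterable(inspect_reply.values())
    return sum(
        1
        for task in tasks
        if not task.get('acknowledged', False)
        and task.get('delivery_info', {}).get('exchange') in queues
    )


# Hypothetical sample shaped like a reply for the "active" method.
SAMPLE_ACTIVE = {
    'worker1@host': [
        {'id': 'a1', 'acknowledged': False,
         'delivery_info': {'exchange': 'lowprio'}},
        {'id': 'a2', 'acknowledged': True,
         'delivery_info': {'exchange': 'lowprio'}},
    ],
    'worker2@host': [
        {'id': 'a3', 'acknowledged': False,
         'delivery_info': {'exchange': 'highprio'}},
    ],
}

lowprio_count = count_tasks_for_queues(SAMPLE_ACTIVE, ['lowprio'])
all_count = count_tasks_for_queues(SAMPLE_ACTIVE, ['lowprio', 'highprio'])
```

Because one reply covers every worker, the same reply could be filtered once per proc instead of querying the workers again for each queue.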

touilleMan commented 9 years ago

@ryanhiebert did you find a solution to this problem?

ryanhiebert commented 9 years ago

No solution yet. It's been somewhat back-burnered in my current workload, but trying to use Flower to get a better picture of the running queues seems like the route I'm likely to try first. Any and all help would be welcome!

joshowen commented 9 years ago

Any updates on this issue?

ryanhiebert commented 9 years ago

No, sorry. Haven't been able to work on it any more yet.

joshowen commented 9 years ago

I wrote a basic implementation that seems to work.

import itertools

from celery import Celery
from django.conf import settings
from gevent.pool import Group

from hirefire.procs.celery import CeleryProc

def _inspect(method):
    app = Celery('app', broker=settings.BROKER_URL)
    inspect_result = getattr(app.control.inspect(), method)()
    app.close()
    return inspect_result

class UpdatedCeleryProc(CeleryProc):
    def quantity(self):
        """
        Returns the aggregated number of tasks of the proc queues.
        """
        if hasattr(self.channel, '_size'):
            # Redis
            return sum(self.channel._size(queue) for queue in self.queues)
        # AMQP (Include both libs, since we have both installed)
        from librabbitmq import ChannelError as LCE
        from amqp.exceptions import ChannelError as ACE
        count = 0
        for queue in self.queues:
            try:
                queue = self.channel.queue_declare(queue, passive=True)
            except (LCE, ACE):
                # The requested queue has not been created yet
                pass
            else:
                count += queue.message_count
        # Query the workers for active and reserved tasks in parallel.
        active, reserved = Group().map(_inspect, ('active', 'reserved'))
        return count + self._count_unacked(active) + self._count_unacked(reserved)

    def _count_unacked(self, inspect_result):
        """Count unacknowledged tasks from this proc's queues in an inspect() reply."""
        if not inspect_result:
            # No reply from any worker
            return 0
        tasks = itertools.chain.from_iterable(inspect_result.values())
        return sum(
            1 for task in tasks
            if not task.get('acknowledged', False)
            and task.get('delivery_info', {}).get('exchange') in self.queues
        )

ryanhiebert commented 8 years ago

@joshowen : That can work OK, for a small number of queues. Unfortunately, I have around 20 queues, and the combined time of running this made for timeouts in Heroku. I've got to find something faster.


https://github.com/celery/celery/issues/2839

That Celery issue also makes this one worse. Tasks that are running are shut down immediately, instead of being given the time that Heroku usually allows for dynos to shut down gracefully. Having it fixed wouldn't be sufficient by itself, but it would make the problem less severe for short tasks.

joshowen commented 8 years ago

@ryanhiebert Even using gevent?

ryanhiebert commented 8 years ago

@joshowen: Actually, that's not what I had done before. I'm going to have to dig more into it, and see if it works for me.

I'm not currently using gevent.

joshowen commented 8 years ago

With gevent I was able to run the _inspect calls in parallel, and that got me around the Heroku timeouts.
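
For anyone not running gevent, here is a rough sketch of the same idea using the standard library's thread pool instead of gevent's `Group().map`. This is a swapped-in technique, not what was used above. `fake_inspect` is a hypothetical stand-in for the `_inspect` helper so the example runs without a broker.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_inspect(method):
    """Hypothetical stand-in for _inspect(): blocks briefly, then replies."""
    time.sleep(0.1)  # simulate waiting for the workers to answer
    return {'worker1@host': [{'method': method}]}


def inspect_in_parallel(inspect_fn, methods):
    """Run one blocking inspect call per method, each in its own thread."""
    with ThreadPoolExecutor(max_workers=len(methods)) as pool:
        # pool.map returns results in the same order as `methods`.
        return list(pool.map(inspect_fn, methods))


active, reserved = inspect_in_parallel(fake_inspect, ('active', 'reserved'))
```

The calls spend most of their time waiting on the network, so threads should overlap the waits in much the same way that greenlets do.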

ryanhiebert commented 8 years ago

Let me try it. I still do have quite a few queues (not just workers), with different procs, so even with gevent it means that I'll have to run ~20 of these to get all the stats for HireFire. Worth a try, though.

syphar commented 8 years ago

@joshowen thanks for your example code, we just ran into the same issue.

I built a similar solution. It's still slow, but it makes fewer calls: the app.control.inspect() calls always return the data for every queue, not only one. We have just extended HireFireMiddleware to make these calls once (not only for active and reserved, but also for scheduled), and the subsequent CeleryProcs then get the data for their queues out of the result. So the number of queues does not have any effect on the calls, perhaps only on the time Celery needs to call all the workers.

(cc @ryanhiebert )

syphar commented 8 years ago

I'll try to bake this into a PR. I think this is a common problem if you want to scale to 0 :)

ryanhiebert commented 8 years ago

@syphar: that sounds awesome. I'm looking forward to it.

ryanhiebert commented 8 years ago

@syphar: I'm working on this now for work, and if possible I'd like to see your approach. Any chance you could let me see your work (via very raw, perhaps broken PR, or perhaps a gist of the interesting code), so that I can take inspiration at least?

syphar commented 8 years ago

@ryanhiebert yes, sure. Sorry for the delay, many things happening :)

middleware:

class HireFireMiddleware(OriginalHireFireMiddleware):
    """
    An extended HireFireMiddleware that includes worker stats in the counts.

    We query for running, reserved, and scheduled tasks and add them to the
    queued tasks per queue.

    The related procs must have a method called ``quantity_with_running``,
    which receives the inspection data, so it's queried only once per
    call from HireFire.
    """

    def dump_procs(self, procs):
        celery_inspect = {
            method: getattr(app.control.inspect(), method)()
            for method in ('active', 'reserved', 'scheduled')
        }

        data = [
            {
                'name': name,
                'quantity': proc.quantity_with_running(celery_inspect) or 'null',
            }
            for name, proc in procs.items()
        ]

        return json.dumps(data, cls=TimeAwareJSONEncoder, ensure_ascii=False)

    def info(self, request):
        payload = self.dump_procs(self.loaded_procs)
        return HttpResponse(payload, content_type='application/json')

procs.py:

import itertools

from hirefire.procs.celery import CeleryProc

class UpdatedCeleryProc(CeleryProc):
    def quantity_with_running(self, celery_inspect):
        count = super(UpdatedCeleryProc, self).quantity()
        for func, tasks in celery_inspect.items():
            if not tasks:
                continue

            queue_tasks = [
                v
                for v in itertools.chain.from_iterable(tasks.values())
                if (
                    not v.get('acknowledged', False) and
                    v.get('delivery_info', {}).get('exchange') in self.queues
                )
            ]
            count += len(queue_tasks)

        return count

class LowPrioWorkerProc(UpdatedCeleryProc):
    name = 'celeryworkerlowprio'
    queues = ['lowprio']

this code

What was blocking me (when I had some time) was the question how to design this the best way to

ryanhiebert commented 8 years ago

Awesome. I'll work on figuring those out when I'm implementing it for myself. This is very, very helpful. Thank you.

syphar commented 8 years ago

@ryanhiebert we can get this into a working PR together, if you have some nice ideas on the design :)

ryanhiebert commented 8 years ago

Would you be up for a pairing session, perhaps? You can hit me up on irc.freenode.net, I'm ryanhiebert, or you can email me ryan@ryanhiebert.com

ryanhiebert commented 8 years ago

If you're interested in pairing, I've set up a floobits workspace at https://floobits.com/ryanhiebert/hirefire.

ryanhiebert commented 8 years ago

I do have a good idea, and that's to send a cache dictionary into the procs, so that they can share state between themselves. I'm working up an implementation.
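
A rough sketch of the cache-dictionary idea, not the actual branch code. All of the names here (`SketchProc`, `fetch_inspect`, `dump_quantities`, and the task shape with a `queue` key) are hypothetical and only illustrate how procs could share one expensive lookup per request.

```python
class SketchProc:
    """Hypothetical proc that reads inspect data from a shared cache."""

    def __init__(self, name, queues):
        self.name = name
        self.queues = queues

    def quantity(self, cache, fetch):
        # Only the first proc to ask pays for the expensive fetch.
        if 'inspect' not in cache:
            cache['inspect'] = fetch()
        return sum(1 for task in cache['inspect'] if task['queue'] in self.queues)


fetch_calls = []


def fetch_inspect():
    """Hypothetical stand-in for a slow inspect() round trip to the workers."""
    fetch_calls.append(1)
    return [{'queue': 'lowprio'}, {'queue': 'lowprio'}, {'queue': 'highprio'}]


def dump_quantities(procs, fetch):
    """Build a fresh cache per request and pass it to every proc."""
    cache = {}
    return {proc.name: proc.quantity(cache, fetch) for proc in procs}


procs = [SketchProc('low', ['lowprio']), SketchProc('high', ['highprio'])]
quantities = dump_quantities(procs, fetch_inspect)
```

Creating the dictionary inside the request handler keeps the data from going stale between HireFire polls, while still avoiding one round trip per proc.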

ryanhiebert commented 8 years ago

Here's my work-in-progress branch on the approach I came up with. Feel free to give me feedback. I'll open up a PR after I've done some more testing in my environment.

ryanhiebert commented 8 years ago

https://github.com/aspiredu/hirefire/tree/global-cache