mher / flower

Real-time monitor and web admin for Celery distributed task queue
https://flower.readthedocs.io

Ability to use flower's internals outside of Flower? #129

Closed Karmak23 closed 11 years ago

Karmak23 commented 11 years ago

Sorry to file a bug for this; it's more a request for advice or a feature request.

I would like to use Flower's great-looking iter_tasks() method to do something like:

for task in TaskModel.iter_tasks(state='RECEIVED'):
    if task['args'].startswith('MY_OLD_ARGUMENT_OF_OLD_FOREVER_RETRYING_TASK'):
        revoke(task['id'], terminate=True)

This can't be done by hand in production through Flower's web interface, because I have 5K+ tasks in this particular situation; revoking them one by one with the mouse is not an option ;-)

Purging the queues is not an option either on this live system.

I tried to use celery's inspect, but it has no .received() method or equivalent, and .reserved() doesn't return the faulty tasks even with a big PREFETCH setting.

I'm working in a Django-enabled IPython notebook directly plugged into the live celery cluster (it runs on a worker, and has the exact same environment, started from ./manage.py shell_ipynb instead of ./manage.py celery flower).

I've tried to set up the Flower app manually, and eventually to use only the flower.state.State class to get events.state, but all I achieved was to wonder whether the simplest way of doing what I want would be to fork Flower and add a new management command that runs the job.

But being able to run this .iter_tasks() interactively would open a new set of inspecting possibilities on my failing tasks.

Do you have any advice or tips that could help me get the .iter_tasks() functionality running independently? Having the background threads running should not be a problem at all.

best regards,

mher commented 11 years ago

Have you seen Flower's API? https://github.com/mher/flower/blob/master/flower/urls.py#L61. You should be able to get the list of tasks with /api/tasks, filter them, and then revoke each one with /api/task/revoke/task-id
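A minimal sketch of that list, filter, and revoke flow, using only the Python standard library. Several things here are assumptions rather than facts from this thread: that Flower listens on http://localhost:5555, that /api/tasks returns a JSON object keyed by task id whose entries carry "state" and "args" fields, and that the revoke endpoint accepts a POST with terminate=true. The helper names (select_stale, fetch_tasks, revoke, revoke_stale) are hypothetical, and the args match is a substring test because args usually arrives as a repr string.

```python
import json
import urllib.parse
import urllib.request

# Assumption: Flower runs on the same host on its usual port.
FLOWER_URL = "http://localhost:5555"
# Marker taken from the example in the original question.
OLD_MARKER = "MY_OLD_ARGUMENT_OF_OLD_FOREVER_RETRYING_TASK"


def select_stale(tasks, marker=OLD_MARKER, state="RECEIVED"):
    """Return ids of tasks in `state` whose args text contains `marker`.

    `tasks` is assumed to be a dict mapping task id -> info dict.
    """
    return [
        task_id
        for task_id, info in tasks.items()
        if info.get("state") == state and marker in str(info.get("args", ""))
    ]


def fetch_tasks(base_url=FLOWER_URL):
    """GET /api/tasks and decode the JSON body."""
    with urllib.request.urlopen(f"{base_url}/api/tasks") as resp:
        return json.load(resp)


def revoke(task_id, base_url=FLOWER_URL):
    """POST /api/task/revoke/<task-id>, asking the worker to terminate it."""
    url = f"{base_url}/api/task/revoke/{urllib.parse.quote(task_id, safe='')}"
    body = urllib.parse.urlencode({"terminate": "true"}).encode()
    request = urllib.request.Request(url, data=body, method="POST")
    with urllib.request.urlopen(request) as resp:
        return resp.status


def revoke_stale(base_url=FLOWER_URL):
    """Fetch all tasks, pick the stale ones, and revoke each of them."""
    stale = select_stale(fetch_tasks(base_url))
    for task_id in stale:
        revoke(task_id, base_url)
    return stale
```

Calling revoke_stale() from the notebook would run the whole loop. It may be worth printing the result of select_stale(fetch_tasks()) first to check the selection before revoking anything; depending on the Flower version, the API may also require authentication.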

Karmak23 commented 11 years ago

Oh, nice.

I will try to begin with that to clean up my old tasks, as it's quite urgent.

This still adds the overhead of HTTP+JSON on both sides, though. To simply get the result as a Python list in a Python program, I will obviously have to fetch it via requests and run json.loads on it. Given that Flower is on the same machine as the IPython shell, this seems like a bit too much work for the CPU. But it's better than nothing.

In my free time, I will try to see if I can reach something like this (as a very schematic summary):


from django.conf import settings
from flower.app import StandaloneFlower
from flower.python_api import Api

# Starts the threads, etc. but not the tornado HTTP part.
my_flower = Api(StandaloneFlower(…<settings.BROKER_URL somewhere>…))

for task in my_flower.list_tasks(…):
    …

This seems like something that could have been done in the past with celery's cam infrastructure, but from what I read on the internet it seemed a bit outdated. I like the Flower implementation, from what I have read of its source code.

Anyway, thanks!
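For the in-process variant wished for above, one possible sketch skips Flower entirely and feeds Celery's own event stream into an in-memory State, which is roughly the kind of data Flower's task list is built on. This is not Flower's API. It assumes a Celery version exposing app.events.State, app.events.Receiver and app.control.revoke, workers started with events enabled, and an already configured Celery app passed in as `app`. The helper names (stale_ids, capture_state, revoke_stale) are hypothetical. One important limitation: the State only knows about tasks whose events arrive while capturing, so tasks received before the capture started will not show up.

```python
import socket


def stale_ids(tasks, state="RECEIVED",
              marker="MY_OLD_ARGUMENT_OF_OLD_FOREVER_RETRYING_TASK"):
    """Pick ids of tasks in `state` whose args text contains `marker`.

    `tasks` is an iterable of (uuid, task) pairs, such as the items of
    the `tasks` mapping on a Celery event State.
    """
    return [uuid for uuid, task in tasks
            if task.state == state and marker in str(task.args)]


def capture_state(app, max_events=1000, idle_timeout=5):
    """Consume up to `max_events` worker events into a fresh State.

    Stops early if no event arrives for `idle_timeout` seconds.
    """
    state = app.events.State()
    with app.connection() as connection:
        receiver = app.events.Receiver(connection, handlers={"*": state.event})
        try:
            receiver.capture(limit=max_events, timeout=idle_timeout, wakeup=True)
        except socket.timeout:
            pass  # the stream went quiet; keep what was collected so far
    return state


def revoke_stale(app, **capture_kwargs):
    """Capture events, then revoke every matching task through the app."""
    state = capture_state(app, **capture_kwargs)
    ids = stale_ids(state.tasks.items())
    for task_id in ids:
        app.control.revoke(task_id, terminate=True)
    return ids
```

Keeping stale_ids separate from the capture step makes it possible to inspect the selection interactively in the notebook before revoking anything.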