cameronmaske / celery-once

Celery Once allows you to prevent multiple execution and queuing of celery tasks.
https://pypi.python.org/pypi/celery_once/
BSD 2-Clause "Simplified" License

Task conflict detection at runtime #88

Closed: molten-firescar96 closed this issue 5 years ago.

molten-firescar96 commented 5 years ago

One thing that confused me is that celery_once tries to detect conflicts and acquire locks before a task is put into the queue, not when the task is pulled from the queue to run. Do you have any thoughts on also including a task existence check in the QueueOnce.__call__ method?
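
To make the distinction concrete, here is a minimal, Celery-free sketch of enqueue-time locking. Everything in it is hypothetical: `LockStore`, `apply_async_once`, `raw_send`, `worker_run` and the key scheme are illustrative stand-ins, not celery_once's API, and an in-memory set substitutes for the shared lock backend. The lock is taken when a message is queued and released when a worker finishes it, so a message that reaches the queue by some other path is never checked.

```python
from collections import deque


class LockStore:
    """In-memory stand-in for a shared lock backend such as Redis (hypothetical)."""

    def __init__(self):
        self.held = set()

    def acquire(self, key):
        # Succeeds only if nobody else currently holds the key.
        if key in self.held:
            return False
        self.held.add(key)
        return True

    def release(self, key):
        self.held.discard(key)


def lock_key(name, args):
    # Hypothetical key scheme: task name plus positional arguments.
    return (name,) + tuple(args)


locks = LockStore()
queue = deque()


def apply_async_once(name, args):
    """Enqueue-time check: a duplicate is rejected before it reaches the queue."""
    key = lock_key(name, args)
    if not locks.acquire(key):
        return False
    queue.append((name, args, key))
    return True


def raw_send(name, args):
    """A producer that bypasses the check, e.g. send_task without celery_once."""
    queue.append((name, args, None))


def worker_run(handler):
    """Pop one message, run it, then release its lock if it had one."""
    name, args, key = queue.popleft()
    try:
        return handler(*args)
    finally:
        if key is not None:
            locks.release(key)


# The second checked submission is rejected; the unchecked one is queued anyway.
first = apply_async_once("process_video", ["wall-e.mp4", "video"])
second = apply_async_once("process_video", ["wall-e.mp4", "video"])
raw_send("process_video", ["wall-e.mp4", "video"])
print(first, second, len(queue))  # True False 2
```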

cameronmaske commented 5 years ago

@MTonomy-Nchinda-Nchinda A few reasons behind this!

molten-firescar96 commented 5 years ago

That makes sense. In my case, I have a small cluster of nodes that execute tasks such as converting video formats, and a separate, disjoint set of nodes that submit requests. A request looks something like: celery.send_task('processing_cluster.process_video', ['wall-e.mp4', 'video'])

The nodes submitting requests don't have access to the task definitions and don't import celery_once. What would you recommend?
1) Import celery_once in the message producers? For example, if I create a stub (no-op) task definition that uses QueueOnce as its base class, can I call apply_async on it and have the task routed to my message consumers?
2) Could celery_once check that a task is unique both when it is queued and after it is pulled from the queue to run, using two different locks 🔒?
3) Something else?
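
Option 2 could look roughly like the sketch below: one lock guards the queue, and a second is taken when a worker starts the task, roughly where a check in QueueOnce.__call__ would sit. This is an in-memory model under stated assumptions, not how celery_once currently behaves: `enqueue`, `start_next`, `finish` and the two sets are hypothetical, and starting and finishing a task are split into separate calls so that two concurrently running workers can be simulated in one thread.

```python
from collections import deque

queue = deque()      # messages waiting for a worker
queue_locks = set()  # lock 1: an identical message is already waiting in the queue
run_locks = set()    # lock 2: an identical task is already executing


def key_for(name, args):
    # Hypothetical key scheme shared by both locks.
    return (name,) + tuple(args)


def enqueue(name, args, checked=True):
    """Producer side; checked=False models send_task from a host without the lock store."""
    key = key_for(name, args)
    if checked:
        if key in queue_locks:
            return False
        queue_locks.add(key)
    queue.append((name, args, checked))
    return True


def start_next():
    """A worker pulls a message and takes the run-time lock (cf. QueueOnce.__call__)."""
    name, args, checked = queue.popleft()
    key = key_for(name, args)
    if checked:
        queue_locks.discard(key)  # this message held lock 1 and has now left the queue
    if key in run_locks:
        return None               # identical task already running: drop this one
    run_locks.add(key)
    return key


def finish(key):
    """Release the run-time lock after the task body completes."""
    run_locks.discard(key)


# Two unchecked submissions reach the queue; only one of them runs.
enqueue("process_video", ["wall-e.mp4", "video"], checked=False)
enqueue("process_video", ["wall-e.mp4", "video"], checked=False)
a = start_next()  # worker 1 acquires the run-time lock
b = start_next()  # worker 2 finds it held
finish(a)
print(a, b)  # ('process_video', 'wall-e.mp4', 'video') None
```

Releasing the queue lock as soon as the message is pulled lets a new identical request be queued while the first is still running; the run-time lock then catches it. Dropping the duplicate is one design choice; re-queuing it with a delay would be another.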

cameronmaske commented 5 years ago

@MTonomy-Nchinda-Nchinda I'm not too familiar with using send_task.

A simple, if verbose, solution might be to have the task that is called remotely call a QueueOnce task in turn, i.e.:

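```python
from celery import Celery
from celery_once import QueueOnce

celery = Celery('processing_cluster', broker='redis://localhost:6379/0')  # assumed broker URL
# celery_once needs a lock backend that the workers can reach (assumed Redis URL).
celery.conf.ONCE = {
    'backend': 'celery_once.backends.Redis',
    'settings': {'url': 'redis://localhost:6379/0', 'default_timeout': 60 * 60},
}

@celery.task(name='processing_cluster.process_video')
def process_video(filename, label):
    # Entry point for remote task calls; runs on a worker that can reach the lock backend
    process_video_once.delay(filename, label)

# graceful: a duplicate delay() returns None instead of raising AlreadyQueued
@celery.task(base=QueueOnce, once={'graceful': True})
def process_video_once(filename, label):
    # Process video code here
    ...

# On remote host (needs only the broker, not the task code or the lock backend)
celery.send_task('processing_cluster.process_video', ['wall-e.mp4', 'video'])
```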
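
The effect of this wrapper can be simulated without a broker. In this hypothetical model, a deque and a set stand in for the broker queue and the Redis lock store, `remote_send_task` plays the remote host, and `forward_once` imitates `process_video_once.delay()` with graceful handling. Releasing the lock when the guarded task finishes is assumed to mirror celery_once's default behavior.

```python
from collections import deque

queue = deque()  # stands in for the broker queue both tasks share
locks = set()    # stands in for the Redis lock store; only workers touch it
processed = []   # records which guarded tasks actually ran


def remote_send_task(filename, label):
    """Remote host: enqueue a message naming the unguarded entry task."""
    queue.append(("process_video", (filename, label), None))


def forward_once(filename, label):
    """Imitates process_video_once.delay() with once={'graceful': True}."""
    key = ("process_video_once", filename, label)
    if key in locks:
        return None  # graceful: skip the duplicate instead of raising
    locks.add(key)
    queue.append(("process_video_once", (filename, label), key))
    return key


def work_off_queue():
    """Drain the queue one message at a time, as a single worker would."""
    while queue:
        name, args, key = queue.popleft()
        if name == "process_video":
            forward_once(*args)     # entry task: forward to the guarded task
        else:
            processed.append(args)  # the real video processing would go here
            locks.discard(key)      # release the lock once the guarded task finishes


# Two identical requests from the remote host collapse into one run.
remote_send_task("wall-e.mp4", "video")
remote_send_task("wall-e.mp4", "video")
work_off_queue()
print(processed)  # [('wall-e.mp4', 'video')]
```

Only requests that arrive while an identical task is still queued or running are collapsed; a request sent after the first task has finished runs again.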

Alternatively, you could extend the locking logic in QueueOnce.apply_async to the send_task call (here is the relevant code), but that will only work if the remote hosts have access to the Redis/locking backend.
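
For hosts that can reach the lock backend, the producer-side approach might look like the sketch below. It is hypothetical throughout: `ExpiringLocks` imitates a Redis key with a timeout (celery_once's Redis settings include a `default_timeout`), `send_task_once` is an illustrative wrapper rather than a celery_once function, and `send` would be something like `celery.send_task` in practice. An injectable clock keeps the example deterministic.

```python
import time


class ExpiringLocks:
    """Hypothetical lock store whose keys lapse after a timeout, like a Redis key with a TTL."""

    def __init__(self, clock=time.monotonic):
        self.clock = clock
        self.expiry = {}  # key -> time at which the lock lapses

    def acquire(self, key, timeout):
        now = self.clock()
        if key in self.expiry and self.expiry[key] > now:
            return False  # still held by an earlier submission
        self.expiry[key] = now + timeout
        return True


def send_task_once(send, locks, name, args, timeout=60 * 60):
    """Take the lock, then hand the message to `send` (e.g. celery.send_task)."""
    key = (name,) + tuple(args)
    if not locks.acquire(key, timeout):
        return None  # graceful: an identical request is already outstanding
    return send(name, args)


# Demo with a fake clock and a fake send function.
now = [0.0]
locks = ExpiringLocks(clock=lambda: now[0])
sent = []


def fake_send(name, args):
    # Stand-in for celery.send_task: record the message and return a counter.
    sent.append((name, args))
    return len(sent)


task_args = ["wall-e.mp4", "video"]
r1 = send_task_once(fake_send, locks, "processing_cluster.process_video", task_args, timeout=60)
r2 = send_task_once(fake_send, locks, "processing_cluster.process_video", task_args, timeout=60)
now[0] = 61.0  # the first lock has expired
r3 = send_task_once(fake_send, locks, "processing_cluster.process_video", task_args, timeout=60)
print(r1, r2, r3)  # 1 None 2
```

A full version would also need the worker to delete the key when the task completes; the timeout only bounds how long an orphaned lock can block new requests.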

molten-firescar96 commented 5 years ago

The remote hosts do not have access to Redis. Your code sample looks pretty elegant, so I'll use that. Thanks.