cameronmaske / celery-once

Celery Once allows you to prevent multiple execution and queuing of celery tasks.
https://pypi.python.org/pypi/celery_once/
BSD 2-Clause "Simplified" License

Restrict which task can clear a lock #101

Closed · molten-firescar96 closed this 4 years ago

molten-firescar96 commented 5 years ago

Scenario: I have a set of tasks which I want to run continuously: a new instance gets sent to the queue when the last one finishes. I'm using celery_once to ensure that only one new instance gets sent to the queue at a time. This is done after each task runs, via a task_postrun handler:

from celery.signals import task_postrun
from celery_once import QueueOnce

@task_postrun.connect
def requeue_after_run(**kwargs):
    task = kwargs['task']
    # Re-queue any QueueOnce task as soon as a run of it finishes.
    if isinstance(task, QueueOnce):
        task.apply_async()

Expected Behavior: Occasionally more than one copy of a task is in the queue, e.g. when I temporarily want to run 10 copies of a process to get through a backlog. The number of copies should decay back to one as the extra copies are worked through.

Actual Behavior: Each copy clears the task lock before starting and sets it again after finishing, which maintains the number of copies; e.g. if I put in 10 copies, it stays at 10.

PR: This PR restricts clearing a lock to the task for which the lock was taken out. The lock id is attached to a task using the celery_once_lock_id kwarg, as sketched below. Do you see this as a feature or a bug?
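
The mechanism is essentially the classic "owned lock" pattern. As a rough standalone sketch, using a plain Redis client (the key name, TTL, and function names here are placeholders, not the PR's actual code):

import uuid
import redis

r = redis.Redis()

def acquire_lock(key, ttl=3600):
    # Store a fresh id under the lock key; whoever holds this id "owns" the lock.
    lock_id = uuid.uuid4().hex
    if r.set(key, lock_id, nx=True, ex=ttl):
        return lock_id
    return None

def release_lock(key, lock_id):
    # Only the owner may clear the lock; every other copy leaves it in place,
    # so extra copies can no longer re-queue themselves indefinitely.
    # (In production the get/compare/delete should be atomic, e.g. via a Lua script.)
    if r.get(key) == lock_id.encode():
        r.delete(key)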

cameronmaske commented 5 years ago

Hi @MTonomy-Nchinda-Nchinda

Sorry for being silent and only now getting around to this PR.

Not to dismiss the work you've done, but for my own understanding of the situation and workflow: could the same thing be achieved by using the arguments of a task to generate unique locks?

i.e.

from celery.signals import task_postrun
from celery_once import QueueOnce

@celery.task(base=QueueOnce)
def example_task(count):
    pass

example_task.delay(1)
example_task.delay(2)  # <-- Two being processed, each with its own lock

@task_postrun.connect
def requeue_after_run(task=None, args=None, kwargs=None, **extras):
    # The task_postrun signal passes along the task's args and kwargs
    # (i.e. 1, 2, etc.), so they can be re-passed straight through.
    if isinstance(task, QueueOnce):
        task.apply_async(args=args, kwargs=kwargs)

molten-firescar96 commented 5 years ago

My question is: if for some reason 10 tasks with the same arguments (and the same lock id) end up in the queue, perhaps because on a one-off basis I need the task to run extra times, what should celery_once do with them? I would expect the number to eventually drop to one; instead, celery_once maintains the 10 tasks in the queue. I could solve this without any changes to the library, but I'm wondering if other people might be having the same problem.

molten-firescar96 commented 5 years ago

I tried out the code I wrote and it's broken, but if you agree with the issue I'm describing I can fix it.

molten-firescar96 commented 4 years ago

Found a workaround, so we don't need this anymore.

schinckel commented 3 years ago

@molten-firescar96 I've hit a similar issue - can you detail your workaround? My workaround was to use a second task that just re-queues the first task (basically a co-recursion situation).
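
For readers hitting the same issue, here is a minimal sketch of that co-recursive pattern (the broker URL, task names, and once options are illustrative assumptions, not schinckel's actual code):

from celery import Celery
from celery_once import QueueOnce

app = Celery('example', broker='redis://localhost:6379/0')
app.conf.ONCE = {
    'backend': 'celery_once.backends.Redis',
    'settings': {'url': 'redis://localhost:6379/0'},
}

# unlock_before_run releases the QueueOnce lock before the body runs, so the
# re-queue issued below is not blocked by this run's own lock.
@app.task(base=QueueOnce, once={'unlock_before_run': True, 'graceful': True})
def main_task():
    ...  # the actual work
    requeue_main_task.delay()

# Deliberately a plain task (not QueueOnce), so the lock never blocks it.
@app.task
def requeue_main_task():
    # graceful=True makes this a silent no-op if main_task is already
    # queued, rather than raising AlreadyQueued.
    main_task.delay()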

molten-firescar96 commented 3 years ago

My workaround was to stop using celery-once. I experienced problems with the combination of celery-once and RabbitMQ, with tasks getting dropped completely or running on multiple machines at once, which suggested RabbitMQ isn't designed for this use case.

The solution is a combination of a while loop to run through tasks in a queue, a try/except statement so failed tasks get skipped and rerun later, and a redis lock so only one machine at a time takes a task.
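
As a minimal sketch of that approach, assuming a Redis list serves as the work queue (the 'task_queue' key, run_task, and the lock naming are placeholder assumptions, not celery or celery_once APIs):

import time
import redis

r = redis.Redis()

def run_task(task_id):
    ...  # the real work for one task

def worker_loop():
    # While loop to run through the tasks in the queue.
    while True:
        task_id = r.lpop('task_queue')
        if task_id is None:
            time.sleep(1)
            continue
        # Redis lock so only one machine at a time takes a given task,
        # even if the same task id was enqueued more than once.
        lock = r.lock('lock:' + task_id.decode(), timeout=3600)
        if not lock.acquire(blocking=False):
            continue  # another machine already holds this task
        try:
            run_task(task_id)
        except Exception:
            # A failed task is skipped now and re-queued to rerun later.
            r.rpush('task_queue', task_id)
        finally:
            lock.release()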