cameronmaske / celery-once

Celery Once allows you to prevent multiple execution and queuing of celery tasks.
https://pypi.python.org/pypi/celery_once/
BSD 2-Clause "Simplified" License
659 stars 91 forks source link

Controlling lock behaviour on task failure #110

Open SHxKM opened 4 years ago

SHxKM commented 4 years ago

Hi, thanks for the superb library.

I have a long running task, it takes a single argument user_id.

@task(name="some_name", base=QueueOnce)
def task_a(user_id):
    ...

I did a system restart while this task was running. This made this task fail. I use celery-once for this task because it's scheduled to run every X minutes, celery-once is working superbly in preventing task_a() from running at the same time for the same user_id.

But I can also invoke task_a() manually. When I tried to do so with the user_id when the task failed, I wouldn't get any kind of warning about task_a() being AlreadyQueued or anything - the task just wasn't sent to my workers. It's like celery stopped communicating for this task_a() when invoked with that specific user_id.

After one hour had passed, which is my global lock timeout for celery-once, I could invoke the task manually.

I find this a bit odd. I realize that the task wasn't invoked at all because there was a lock acquired on it, but shouldn't I have seen an error, warning, or any other indication that this task is being intentionally ignored?

SHxKM commented 4 years ago

Hmm, this is starting to seriously limit my ability to manually retry certain tasks. Suppose scheduled task_a() fails for any reason, even if I now manually invoke task_a() (within the 60 minute lock-period), absolutely nothing will happen, not even the AlreadyQueued warning.

Is there a solution to this that I'm not aware of? perhaps manually deleting the key in redis..? that seems a bit too low-level.

cameronmaske commented 4 years ago

@SHxKM Sorry for the slow reply.

Currently, if you invoke a task locally, i.e calltask_a(user_id='fred1') it should not check against celery once's lock and does not prevent the function executing. Only delay and apply_async check against the lock (apply, does not).

This could explain why you are not seeing AlreadyQueued exceptions and the task is not being sent to the worker.

Just to double-check, are you calling the task function directly or with delay/apply_async? If it's the latter, the failure might not be clearing the lock, when the worker restarts.

SHxKM commented 4 years ago

@cameronmaske Please don't apologize. This library has spared me a logistical hellstew, so thank you!

Just to double-check, are you calling the task function directly or with delay/apply_async?

Yes. Basically task_a(user="fred1") is sent periodically via apply_async(queue="regular"), but sometimes, especially when it fails (due to 3rd-party API unavailability, for example), I need to re-inoke it (also with apply_async) manually and ASAP on a different queue: apply_async(queue="important"). Not that I think the queue matters, but doesn't hurt to be specific.

This could explain why you are not seeing AlreadyQueued exceptions and the task is not being sent to the worker.

...the failure might not be clearing the lock, when the worker restarts.

This makes sense. Any way to mitigate the above?

cameronmaske commented 4 years ago

Ok. If an exception occurs during a task, causing it to fail (and not trigger a retry) celery_once should clear the lock. The library hooks into after_return function, which is called regardless of success or failure, just when a task finishes.

If the task is interrupted due to a hard worker restart, I'm not sure how celery handles it. We currently don't test against that scenario, so maybe that prevents the lock from clearing.

But, that doesn't seem to line up with the scenario you have described.

Are you using the package sentry-sdk (there is a known issue #105)? What version of python + celery are you using? Are you using celery with flask/django/another framework?

SHxKM commented 4 years ago

Are you using celery with flask/django/another framework?

Yes, this is happening in a Django app.

What version of python + celery are you using?

Python 3.6.8 Celery 4.2.1

Are you using the package sentry-sdk (there is a known issue #105)?

I don't have sentry-sdk installed, but I do use Sentry's raven. Hmmmm, maybe the code-snippet below could be a contributing factor?

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from raven.contrib.celery import register_logger_signal, register_signal
from raven.contrib.django.raven_compat.models import client

app = Celery("my_app")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

# this code may be problematic?
register_logger_signal(client)
register_logger_signal(client)
register_signal(client)
register_signal(client, ignore_expected=True)

If I'm not mistaken I added those register_signals so caught exceptions in Celery workers aren't bubbled up to Sentry. Maybe it's also "hiding" them from celery_once? I'm really shooting in the dark here, mind you.

cameronmaske commented 4 years ago

Sentry's raven should be fine.

@SHxKM I'm having trouble re-creating this issue, in order to figure out the cause. Running celery_once's tests against your version of Python + Celery still pass.

Any chance you could put together a minimal example that reproduces this issue?i