steinitzu / celery-singleton

Seamlessly prevent duplicate executions of celery tasks
MIT License
240 stars 36 forks source link

BUG: when a task is revoked/terminated, singleton still cannot create a new task #29

Open chestarss opened 4 years ago

chestarss commented 4 years ago

BUG: after a task has been revoked, singleton still cannot create a new task

chestarss commented 4 years ago

In [14]: cron_sync_approval.delay()
Out[14]: <AsyncResult: 742569f5-3148-4bdf-9203-75ac8a0f87c5>

In [15]: from celery.result import AsyncResult

In [16]: res = AsyncResult("742569f5-3148-4bdf-9203-75ac8a0f87c5")

In [17]: res.state
Out[17]: 'REVOKED'

In [18]: cron_sync_approval.delay()
Out[18]: <AsyncResult: 742569f5-3148-4bdf-9203-75ac8a0f87c5>

In [19]: cron_sync_approval.delay()
Out[19]: <AsyncResult: 742569f5-3148-4bdf-9203-75ac8a0f87c5>

In [20]: cron_sync_approval.delay()
Out[20]: <AsyncResult: 742569f5-3148-4bdf-9203-75ac8a0f87c5>

chestarss commented 4 years ago

Workaround method!

from celery.signals import task_revoked
from celery import current_app

@task_revoked.connect
def clean_revoked_task_singleton_lock(sender=None, headers=None, body=None, **kwargs):
    """Release the singleton lock left behind by a revoked task.

    Registered as a receiver of Celery's ``task_revoked`` signal. The lock
    key is regenerated from the revoked request's task name, positional
    arguments and keyword arguments, and the lock is cleared only when the
    value stored under that key starts with the revoked task's id.

    Args:
        sender: Unused.
        headers: Unused.
        body: Unused.
        **kwargs: Must contain ``request``, an object exposing ``id``,
            ``task``, ``args`` and ``kwargs`` attributes.

    Raises:
        KeyError: If ``kwargs`` has no ``request`` entry.
    """
    from celery_singleton.util import generate_lock
    from celery_singleton.config import Config
    from celery_singleton.backends import get_backend

    revoked = kwargs['request']
    task_id = revoked.id
    name, positional, named = revoked.task, revoked.args, revoked.kwargs

    singleton_config = Config(current_app)
    lock_store = get_backend(singleton_config)
    redis_key = generate_lock(
        name,
        task_args=positional,
        task_kwargs=named,
        key_prefix=singleton_config.key_prefix,
    )

    # Leave the lock alone unless it is present and its stored value begins
    # with the id of the task that was just revoked.
    holder = lock_store.get(redis_key)
    if not holder:
        return
    if not holder.startswith(task_id):
        return

    print(f'clean singletion lock:{redis_key} task_id:{task_id}')
    lock_store.clear(redis_key)
birdhackor commented 3 years ago

Workaround method!

from celery.signals import task_revoked
from celery import current_app

@task_revoked.connect
def clean_revoked_task_singleton_lock(sender=None, headers=None, body=None, **kwargs):
    """Drop the celery-singleton lock belonging to a task that was revoked.

    Receiver for the ``task_revoked`` signal: it recomputes the lock key
    from the request found in ``kwargs['request']`` and clears that key when
    the stored value starts with the revoked task's id.

    Args:
        sender: Unused.
        headers: Unused.
        body: Unused.
        **kwargs: Signal keyword arguments; ``request`` is required and is
            read for its ``id``, ``task``, ``args`` and ``kwargs``.

    Raises:
        KeyError: If ``request`` is absent from ``kwargs``.
    """
    from celery_singleton.util import generate_lock
    from celery_singleton.config import Config
    from celery_singleton.backends import get_backend

    request = kwargs['request']
    task_id = request.id
    lock_inputs = {
        'task_args': request.args,
        'task_kwargs': request.kwargs,
    }
    task_name = request.task

    config = Config(current_app)
    backend = get_backend(config)
    redis_key = generate_lock(task_name, key_prefix=config.key_prefix, **lock_inputs)

    # Only clear the lock when it exists and is owned by this task id.
    stored_id = backend.get(redis_key)
    owned_by_revoked_task = bool(stored_id) and stored_id.startswith(task_id)
    if owned_by_revoked_task:
        print(f'clean singletion lock:{redis_key} task_id:{task_id}')
        backend.clear(redis_key)

For tasks that use unique_on, the handler should be modified to this:

import inspect
import importlib

from celery import current_app
from celery.signals import task_revoked
from celery_singleton.util import generate_lock
from celery_singleton.config import Config
from celery_singleton.backends import get_backend

@task_revoked.connect
def clean_revoked_task_singleton_lock(sender=None, headers=None, body=None, **kwargs):
    """Clear the singleton lock of a revoked task, honouring ``unique_on``.

    Receiver for Celery's ``task_revoked`` signal. The lock key is rebuilt
    from the revoked request; when the task defines ``unique_on`` only those
    named arguments take part in the key, otherwise all positional and
    keyword arguments do. The lock is cleared only if the value stored under
    the key starts with the revoked task's id.

    Args:
        sender: Unused.
        headers: Unused.
        body: Unused.
        **kwargs: Must contain ``request``, an object exposing ``id``,
            ``task``, ``args`` and ``kwargs`` attributes.

    Raises:
        KeyError: If ``kwargs`` has no ``request`` entry, if a ``unique_on``
            name is not among the bound arguments, or (presumably, via the
            registry lookup) if the task name is not registered on
            ``current_app`` -- confirm the exact exception type against
            Celery's task registry.
        TypeError: If the request's arguments cannot be bound to the task's
            signature.
    """
    request = kwargs['request']
    task_id = request.id
    task_name = request.task
    # Normalise missing arguments to empty containers so that unpacking them
    # into ``sig.bind`` below cannot fail on ``None``.
    task_args = request.args if request.args is not None else []
    task_kwargs = request.kwargs if request.kwargs is not None else {}

    # Resolve the task through the app's registry instead of importing
    # "<module>.<attribute>" derived from its name: a task name is not
    # required to be an importable dotted path (it may contain no dot at
    # all, or differ from where the function actually lives).
    func = current_app.tasks[task_name]
    unique_on = getattr(func, 'unique_on', None)

    if unique_on:
        if isinstance(unique_on, str):
            unique_on = [unique_on]
        sig = inspect.signature(func)
        bound = sig.bind(*task_args, **task_kwargs).arguments

        # With ``unique_on`` the key is built from the selected arguments
        # only, passed by name.
        unique_args = []
        unique_kwargs = {key: bound[key] for key in unique_on}
    else:
        unique_args = task_args
        unique_kwargs = task_kwargs

    app_config = Config(current_app)
    backend = get_backend(app_config)
    redis_key = generate_lock(task_name,
                              task_args=unique_args,
                              task_kwargs=unique_kwargs,
                              key_prefix=app_config.key_prefix)

    # Clear the lock only when it is present and its stored value begins
    # with the id of the task that was revoked.
    cache_task_id = backend.get(redis_key)
    if cache_task_id and cache_task_id.startswith(task_id):
        print(f'clean singletion lock:{redis_key} task_id:{task_id}')
        backend.clear(redis_key)