cameronmaske / celery-once

Celery Once allows you to prevent multiple execution and queuing of celery tasks.
https://pypi.python.org/pypi/celery_once/
BSD 2-Clause "Simplified" License

Default lock keys use process memory address #93

Closed. kuba-lilz closed this issue 5 years ago.

kuba-lilz commented 5 years ago

I tried using QueueOnce tasks and saw that the lock keys they generate look like this: qo_simple_test_task_args-(<@task: simple_test_task of __main__ at 0x10af06d38>,)_kwargs-{}

I see two problems with this key:

Problem with of __main__: if the entry point of the process that calls the task were not __main__, this part of the key would differ. I'm not sure how I would even end up with such an entry point, but I don't see why the process entry point should be part of the task key at all - the same task posted from processes with different entry points would map to different lock keys.

Problem with at 0x10af06d38: this is a memory address, so it will be different for every process. If two separate processes post the same task, it will map to two different keys and both copies will be queued - not the expected behaviour for a QueueOnce task. I was able to verify this simply by running the same task-posting code from two different terminal windows.

Are both of these conscious choices? They don't seem like very natural defaults to me.
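For contrast, here is a minimal sketch of the usual function-based setup, adapted from the celery_once README (the broker and Redis URLs are placeholders); with a task like this I would expect the lock key to depend only on the task name and call arguments:

import time

from celery import Celery
from celery_once import QueueOnce

celery = Celery('tasks', broker='amqp://guest@localhost//')
celery.conf.ONCE = {
    'backend': 'celery_once.backends.Redis',
    'settings': {
        'url': 'redis://localhost:6379/0',
        'default_timeout': 60 * 60
    }
}

@celery.task(base=QueueOnce, once={'graceful': True})
def simple_test_task():
    time.sleep(10)

Posting simple_test_task twice from two different processes should then resolve to the same lock key both times.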

For completeness, here are the library versions I used to produce the above key:

In [1]: import redis, celery, celery_once

In [2]: redis.__version__
Out[2]: '3.2.0'

In [3]: celery.__version__
Out[3]: '4.2.1'

In [4]: celery_once.__version__
Out[4]: '2.1.0'
cameronmaske commented 5 years ago

Hi @kuba-lilz, that key looks very wrong; it should look more like this.

Could you include the code of simple_test_task and how you are calling it? It would also be good to know which version of Python you are using.

kuba-lilz commented 5 years ago

Thank you very much for the reply.

I created a simple example to reproduce the problem and... found it working just fine. After comparing against yesterday's commits to see what was different, I realized I had a decorator on the Task.run(...) method that broke the key.

Here's the code that shows the problem:

import functools
import time

import celery_once

# Adding this decorator breaks QueueOnce
def catch_celery_exception(wrapped_function):

    @functools.wraps(wrapped_function)
    def run_celery_task(*args, **kwargs):
        try:
            return wrapped_function(*args, **kwargs)
        except Exception as error:
            # Logging code here
            # Skipped in this example for simplicity
            raise error

    return run_celery_task

class SimpleTask(celery_once.QueueOnce):

    name = "simple_test_task"

    def __init__(self):
        super().__init__()
        self.once["graceful"] = True

    # This decorator breaks the key and its deletion - without it everything works fine
    @catch_celery_exception
    def run(self, *args, **kwargs):
        print("Started simple test task, about to sleep!!!")
        print("My key is: {}".format(self.get_key(args=args, kwargs=kwargs)))

        time.sleep(10)

        print("Woke up, finishing simple test task")

# Calling the task:
lilz.celery.tasks.SimpleTask().apply_async()

I'll look into the decorator to see if it can be made to behave correctly with QueueOnce. It may be that the decorator is wrong for all types of tasks and the issue just manifested itself with QueueOnce.

I apologize for taking your time; the problem really was between the chair and the keyboard.

And a double thank you for pointing out the behaviour of redis.flushall(). I had erroneously assumed it was similar to file.flush() in IO operations, without checking the documentation.
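In case anyone else makes the same assumption: flushall() deletes every key in every Redis database rather than flushing any buffer. A quick illustration with redis-py (localhost connection assumed):

import redis

r = redis.Redis()
r.set("qo_simple_test_task", "lock")
r.flushall()  # wipes all keys in all databases - not a buffer flush
print(r.get("qo_simple_test_task"))  # None - the lock key is gone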

kuba-lilz commented 5 years ago

For posterity: the problem was with my decorator not accounting correctly for the fact that Task.run(...) is a method, not a function.

This is explained with a different example here: https://stackoverflow.com/a/1367567/760239
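If I understand get_key() correctly, it builds the lock key from inspect.getcallargs(self.run, ...) and strips out the self entry; a wrapper declared as (*args, **kwargs) has no self parameter, so the bound task instance lands inside args instead - which is exactly the task repr and memory address I saw in my key. A minimal sketch of the difference (the class and decorator names here are illustrative, not celery_once internals):

import functools
import inspect

def broken(fn):
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):  # no explicit self parameter
        return fn(*args, **kwargs)
    return wrapper

def fixed(fn):
    @functools.wraps(fn)
    def wrapper(self, *args, **kwargs):  # self is a named parameter again
        return fn(self, *args, **kwargs)
    return wrapper

class Task:
    @broken
    def run_broken(self):
        pass

    @fixed
    def run_fixed(self):
        pass

task = Task()
print(inspect.getcallargs(task.run_broken))
# {'args': (<__main__.Task object at 0x...>,), 'kwargs': {}} - instance leaks into args
print(inspect.getcallargs(task.run_fixed))
# {'self': <__main__.Task object at 0x...>, 'args': (), 'kwargs': {}} - self can be stripped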

For my specific case, rewriting the decorator to

import functools

def catch_celery_exception(wrapped_function):

    print("Is this even run?")

    @functools.wraps(wrapped_function)
    def run_celery_task(self, *args, **kwargs):
        try:
            # self is now an explicit parameter passed through to the
            # wrapped method, instead of being swallowed into *args
            return wrapped_function(self, *args, **kwargs)
        except Exception as error:
            # Logging code here
            # Skipped now for simplicity
            raise error

    return run_celery_task

solved the problem.
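For anyone wanting to sanity-check a similar fix, the task's own get_key() can be called directly (the same call run() makes above):

task = SimpleTask()
print(task.get_key(args=(), kwargs={}))
# With the fixed decorator the key contains only the task name and call
# arguments - no task repr and no memory address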

Once again, this was nothing related to celery_once; the issue just manifested itself while using it. I apologize for the confusion.

cameronmaske commented 5 years ago

@kuba-lilz No worries! Glad to hear you found the issue. All the best with your projects.