ionelmc / python-redis-lock

Lock context manager implemented via redis SET NX EX and BLPOP.
https://pypi.python.org/pypi/python-redis-lock
BSD 2-Clause "Simplified" License
550 stars 78 forks source link

auto renew does not seem to work when decorating a celery task #91

Closed timrichardson closed 2 years ago

timrichardson commented 2 years ago

I have made a decorator for celery tasks, called like this:

@app.task(bind=True)
@no_simultaneous_execution_no_retry
def task_djcity_pack_orders(self,job_id=None, order_number=None):

and in the decorator I am using a context manager:

 with acquire_redis_lock_with_timeout(lock_key=lock_key, timeout=30, expire=None,
                                                     auto_renewal=False) as lock:

which gets a lock:

  lock = redis_lock.Lock(
            redis_client=get_redis_connection(alias="default"),
            name=lock_key,
            expire=expire,
            auto_renewal=auto_renewal,
        )

Failure to get the lock raises an exception. It all works, except that the locks disappear after a while, even while the celery task is still running. I have added logging and the context manager definitely does not exit until the celery task is completed. But the auto-renew job seems to not notice that the task is still active.

I have had to make expire = None because auto_renewal was not working. The locks are just expiring and the decorator then can't work. I don't know why. During the timeout period, if I was using one, the lock exists in redis and it works, but then the lock goes away. The lock persists when expire=None.

ionelmc commented 2 years ago

Can you show the complete code? What sort of concurrency (the pool implementation) do you have in celery? A reproducer would be even better.

timrichardson commented 2 years ago

celery is using a gevent pool. auto renew is working well when I use it inside a task (not a decorator).

this is the decorator used to allow only one task (by name) to run.

def no_simultaneous_execution_no_retry(f):
    """
    Decorator that prevents a task form being executed with the
    same *args and **kwargs more than one at a time. If the task is already running, discard it.
    Only the name is used to mark the job as unique
     (on assumption that the job is scheduled)

     @app.task(bind=True)   # use bind=True and add 'self' as first argument to the function
    @no_simultaneous_execution_no_retry
    def refresh_and_cache_promise_dates(self,
    """

    @functools.wraps(f)
    def wrapper(self, *args, **kwargs):
        # Create lock_id used as cache key
        # lock_id = '{}-{}-{}'.format(self.name, args, kwargs)
        #

        lock_key = f"CELERY_UNIQUE_TASK_LOCK-{self.name}"
        from celery import current_task
        if not current_task or current_task.request.id is None:
            # nothing to do with celery so carry on
            f(self, *args, **kwargs)
        else:
            job_id = kwargs.get("job_id")
            job, job_id = JobMaster.get_job(job_id=job_id, job_title=f"Task decorator for: {self.name}") #get a job to log messages
            msg = f" in task decorator {self.name} attempting to acquire lock {lock_key})"
            JobLog.save_log_message(job=job, job_score=100, status=core_consts.WAIT,
                                    log_message=msg)
            try:
                with acquire_redis_lock_with_timeout(lock_key=lock_key, timeout=30, expire=None,
                                                     auto_renewal=False) as lock:
                    # print(f"acquired lock {lock_key} with {lock.id=)
                    msg = f" in task decorator {self.name} acquired lock {lock_key} with {lock.id=})"
                    JobLog.save_log_message(job=job, job_score=0, status=core_consts.WAIT,
                                            log_message=msg)

                    f(self, *args, **kwargs)
                    msg = f" in task decorator {self.name}, task has completed, releasing lock {lock_key} with {lock.id=})"
                    JobLog.save_log_message(job=job, job_score=100, status=core_consts.COMPLETED,
                                            log_message=msg)
            except redis_lock.NotAcquired:
                logger.info(
                    f"task {self.name} is already running according to {lock_key}, so we are discarding this attempt")
                JobLog.save_log_message(job=job, job_score=100, status=core_consts.ERRORS_FOUND,
                                        log_message=f"ERROR this job is already running")

                # self.apply_async(args=args, kwargs=kwargs, countdown=3) #this would put the locked job back to queue
                raise

    return wrapper

this is the context manager:

# https://stackoverflow.com/q/16740104/401226
@contextmanager
def acquire_redis_lock_with_timeout(lock_key: Optional[str], timeout: int, expire=60, auto_renewal=True):
    """
    if timeout is 0, non blocking mode. Expire is managed by a background thread: a lock which appears dead should be removed after this time
    usage:
        import redis_lock
        from core.connector.api_helpers import acquire_redis_lock_with_timeout
        try:
            with acquire_redis_lock_with_timeout(lock_key=lock_key,timeout=60,expire=30,auto_renewal=True) as lock:
                print(f"acquired lock {lock_key} with {lock.id=)
            ...
        except redis_lock.NotAcquired:
        ...

        if lock_key is None, then don't acquire a lock
    """
    if lock_key is None:
        yield True
    else:
        if timeout == 0 or not timeout:
            blocking_mode = False
            timeout = None
        else:
            blocking_mode = True
        lock = redis_lock.Lock(
            redis_client=get_redis_connection(alias="default"),
            name=lock_key,
            expire=expire,
            auto_renewal=auto_renewal,
        )

        result = lock.acquire(blocking=blocking_mode, timeout=timeout)
        if not result:
            raise redis_lock.NotAcquired()
        yield lock
        if result:
            lock.release()

I think reproducing is as simple as a task which runs longer than the expiry time, but I will find time to prove that.

ionelmc commented 2 years ago

Mkay it's probably some bug with auto renew thread. I'll have to add some integration tests with gevent (gevent most likely messes up that thread). Meanwhile you could switch to the other pools as a workaround...

timrichardson commented 2 years ago

I will try wrapping the yield in try/finally as well, to deal with the lock not being released with no expiry. And I will think about another pool method. I am worried about ram consumption, but I will see how it goes. I have been using gevent and this locking approach for a long time and not noticed problems, it's the use in the decorator which is new.

timrichardson commented 2 years ago

Update: I moved to a threads pool, and restored auto-renewal and expiry for the locking. Under production load it is not showing any problems with prematurely releasing locks. Thanks for the tip: with this locking and a threads pool, I am now better off.

Regarding the auto-renewal 'job', can it be explicitly sent to a specific celery queue to run in another process?

ionelmc commented 2 years ago

Yes you can certainly run renewal in a celery task. The daemon thread simply calls extend in a loop, you can do it too - you just need to make sure you have the exact same id in the celery task. If you use random ids you need to share the same id.

Although it's not clear to me why you'd want to do it this way. The autorenewal is supposed to solve the situation where some code using some resource might crash or get killed (imagine oom killer), and the autorenewal keeps the lock locked while also gets it unlocked (via expiry) in case there's some crashing/killing going on. If you do that in a celery task you loose all that.

Anyway, I've added integration tests for eventlet and gevent just to make sure that ain't the issue.

ionelmc commented 2 years ago

Regarding your actual problem, try to look at logs to see what's going on (redis-lock logs plenty) or provide a reproducer.

timrichardson commented 2 years ago

I swapped to using a thread pool and the problem has not re-occurred and overall performance is better anyway for my workload, so that was a good idea, thanks. So circumstantially there is a problem with gevent, although I have been using gevent for a few years. A guess is that the autorenew code is blocked for too long in the gevent pool for some reason. My code with the problem uses postgresql (psycopg2-binary) and I now think this is not good with gevent.

I will not be pursuing this further, I am putting this down to an async problem. It might be unwise to rely on auto-renewal with gevent. Thanks for following up.