joanvila / aioredlock

:lock: The asyncio implementation of Redis distributed locks
MIT License
295 stars 50 forks source link

Can not release the lock on single instance after redis.setex on locked resource. #82

Closed bolshoydi closed 3 years ago

bolshoydi commented 3 years ago

python == 3.7.6 aioredis==1.3.1 aioredlock==0.5.2

After acquiring a lock and calling setex on the locked resource, I am unable to release the lock, either with the unlock method or via the context manager.

example code:

import asyncio
import logging
import time

import aioredis
from aioredlock import Aioredlock, LockError

try:
    import ujson as json
except ImportError:
    import json

# Expiry (SETEX takes seconds) applied to every key this script writes with setex.
CACHE_KEY_TTL = 60

class AioredlockWithNormalLog(Aioredlock):
    """Aioredlock variant whose ``log`` property yields a DEBUG stream logger.

    The logger is built on the first read of ``log`` and cached in ``_log``,
    so later reads return the same, already-configured object.
    """

    _log = None  # cached logger; filled in by the first read of ``log``

    @property
    def log(self):
        """Return the cached logger, creating and configuring it if needed."""
        if self._log is not None:
            return self._log

        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(
            logging.Formatter(
                fmt="%(asctime)s:[%(levelname)s] [%(name)s] %(funcName)s:%(lineno)-4s %(message)s",
                datefmt="%Y-%m-%d %H:%M:%S",
            )
        )

        logger = logging.getLogger(__name__)
        logger.setLevel(logging.DEBUG)
        # Replace the handler list instead of appending, so the logger ends up
        # with exactly this one handler.
        logger.handlers = [stream_handler]

        self._log = logger
        return logger

class LazyCache:
    """Redis-backed cache whose writers take aioredlock locks around the write.

    Each writer acquires one lock named after the cache key and a second lock
    named after the key's ``last_change_ts:<key>`` companion, logging every
    step at DEBUG level.
    """

    LAST_KEY_TS = 'last_change_ts:{key}'
    KEY_WITHOUT_BLOCK = 'no:block'

    def __init__(self, redis, redlock: AioredlockWithNormalLog, log=None):
        self.redis = redis
        self.redlock = redlock
        # Use the lock manager's logger unless the caller supplies one.
        self.log = log or redlock.log

    async def set(self, type_key: str, data: dict = None):
        """Lock both keys, SETEX them through the plain connection, then unlock.

        One pipeline writes ``data`` (as JSON) under ``type_key``, the current
        timestamp under the ``last_change_ts`` key and a filler value under
        ``KEY_WITHOUT_BLOCK``.  A ``LockError`` raised at any point of the
        sequence is logged and swallowed.

        NOTE(review): the pipeline writes to keys whose names equal the names
        of the held locks; per the maintainers' reply in this thread that
        replaces the lock identifier, so the unlock calls are expected to fail.
        """
        ts_key = self.LAST_KEY_TS.format(key=type_key)
        written_at = time.time()
        payload = data or {}
        try:
            self.log.debug('>>> ACQUIRING LOCKS')
            key_lock = await self.redlock.lock(type_key, lock_timeout=1)
            self.log.debug(f'{type_key} LOCK is valid?: {key_lock.valid}')
            ts_lock = await self.redlock.lock(ts_key, lock_timeout=1)
            self.log.debug(f'{ts_key} LOCK is valid?: {ts_lock.valid}')

            self.log.debug('>>> EXECUTE PIPELINE FROM ORIGINAL REDIS CONN')
            pipeline = self.redis.pipeline()
            writes = (
                (type_key, json.dumps(payload)),
                (ts_key, written_at),
                (self.KEY_WITHOUT_BLOCK, 'nothing interesting'),
            )
            for name, value in writes:
                pipeline.setex(name, CACHE_KEY_TTL, value)
            await pipeline.execute()

            active = await self.redlock.get_active_locks()
            self.log.debug(f'>>> ACTIVE LOCKS: {active}')

            self.log.debug('>>> TRYING TO UNBLOCK')
            await self.redlock.unlock(key_lock)
            await self.redlock.unlock(ts_lock)
            self.log.debug(f'>>> IS VALID LOCK {key_lock.valid}')
        except LockError as err:
            self.log.debug(f'>>> LOCK ERROR <<< {err.args}')

    async def dummy_set_without_locked_resource_redis(self, type_key: str):
        """Take and release the same two locks without writing to their keys.

        Mirrors ``set`` except that the only SETEX goes to
        ``KEY_WITHOUT_BLOCK``, a key no lock is held on.  A ``LockError`` is
        logged and swallowed.
        """
        ts_key = self.LAST_KEY_TS.format(key=type_key)
        try:
            self.log.debug('>>> ACQUIRING LOCKS')
            key_lock = await self.redlock.lock(type_key, lock_timeout=1)
            self.log.debug(f'{type_key} LOCK is valid?: {key_lock.valid}')
            ts_lock = await self.redlock.lock(ts_key, lock_timeout=1)
            self.log.debug(f'{ts_key} LOCK is valid?: {ts_lock.valid}')

            self.log.debug('>>> SETEX NOT BLOCKED RESOURCE')
            await self.redis.setex(self.KEY_WITHOUT_BLOCK, CACHE_KEY_TTL, 'nothing interesting')

            active = await self.redlock.get_active_locks()
            self.log.debug(f'>>> ACTIVE LOCKS: {active}')

            self.log.debug('>>> TRYING TO UNBLOCK')
            await self.redlock.unlock(key_lock)
            await self.redlock.unlock(ts_lock)
            self.log.debug(f'>>> IS VALID LOCK {key_lock.valid}')
        except LockError as err:
            self.log.debug(f'>>> LOCK ERROR <<<: {err.args}')

    async def set_with_context_manager(self, type_key: str, data: dict = None):
        """Write ``data`` under ``type_key`` while holding its lock via ``async with``.

        The ``last_change_ts`` lock is taken and released by hand inside the
        managed block; the ``type_key`` lock is left to the context manager.
        A ``LockError`` is logged and swallowed.
        """
        ts_key = self.LAST_KEY_TS.format(key=type_key)
        payload = data or {}
        try:
            self.log.debug('>>> ACQUIRING LOCKS')
            acquired = await self.redlock.lock(type_key, lock_timeout=1)
            async with acquired as key_lock:
                self.log.debug(f'{type_key} LOCK is valid?: {key_lock.valid}')
                ts_lock = await self.redlock.lock(ts_key, lock_timeout=1)
                self.log.debug(f'{ts_key} LOCK is valid?: {ts_lock.valid}')

                self.log.debug('>>> EXECUTE SETEX LOCKED RESOURCE INSIDE CONTEXT MANAGER FROM ORIGINAL REDIS CONN')
                pipeline = self.redis.pipeline()
                pipeline.setex(type_key, CACHE_KEY_TTL, json.dumps(payload))
                await pipeline.execute()

                active = await self.redlock.get_active_locks()
                self.log.debug(f'>>> ACTIVE LOCKS: {active}')

                await self.redlock.unlock(ts_lock)
            # Logged after the managed block has exited.
            self.log.debug(f'>>> IS VALID LOCK {key_lock.valid}')
        except LockError as err:
            self.log.debug(f'>>> LOCK ERROR <<< {err.args}')

    async def get(self, type_key: str):
        """Return the JSON-decoded value stored at ``type_key``.

        A falsy reply from Redis (such as a missing key) is returned as-is.
        """
        raw = await self.redis.get(type_key)
        return json.loads(raw) if raw else raw

    async def shutdown(self):
        """Close the Redis connection, wait for it, then destroy the lock manager."""
        connection = self.redis
        connection.close()
        await connection.wait_closed()
        await self.redlock.destroy()

async def amain(key_type, data):
    """Run the three locking scenarios against a local Redis server.

    Opens a connection pool and a lock manager on ``localhost:6379`` DB 1,
    then runs ``LazyCache.set``, ``dummy_set_without_locked_resource_redis``
    and ``set_with_context_manager`` in turn, emptying the database before
    each one and once more at the end.

    Args:
        key_type: Cache key (also used as the lock name) for every scenario.
        data: JSON-serialisable payload stored by the writing scenarios.

    Returns:
        ``(value, lazy_cache)``: what ``LazyCache.get`` read back after the
        last scenario, and the still-open cache, which the caller must shut
        down.

    Raises:
        Whatever a scenario or Redis call raises; the connections are closed
        before the exception propagates.
    """
    db = 1
    host = 'localhost'
    port = 6379
    encoding = 'utf8'

    redis = await aioredis.create_redis_pool(address=(host, port),
                                             db=db,
                                             encoding=encoding,
                                             minsize=5,
                                             maxsize=10,
                                             timeout=60,
                                             )
    # Passing [redis.address] instead would connect the lock manager to DB 0,
    # while this script works in DB 1.
    redis_instances = [{'host': host, 'port': port, 'db': db}]
    redlock = AioredlockWithNormalLog(redis_instances)
    lazy_cache = LazyCache(redis=redis, redlock=redlock)

    try:
        # flushdb() empties only the selected database (DB 1, which also holds
        # the locks); flushall() would wipe every database on the server.
        await redis.flushdb()
        print('SETTING WITH LOCKS')
        await lazy_cache.set(key_type, data=data)

        await redis.flushdb()
        print('SETTING WITHOUT LOCKS')
        await lazy_cache.dummy_set_without_locked_resource_redis(key_type)

        await redis.flushdb()
        print('SETTING WITH CONTEXT MANAGER')
        await lazy_cache.set_with_context_manager(key_type, data=data)

        print('GET DATA FROM REDIS')
        res = await lazy_cache.get(key_type)
        await redis.flushdb()
    except BaseException:
        # On failure the caller never receives lazy_cache, so nobody else can
        # close the pool and the lock manager; do it here and re-raise.
        await lazy_cache.shutdown()
        raise
    return res, lazy_cache

if __name__ == "__main__":
    # Script entry point: run the scenarios, print what was read back, and
    # always release the Redis connections afterwards.
    key_type = 'some.key'
    data = {'some': 'data'}
    loop = asyncio.get_event_loop()
    lazy = None  # stays None when amain() raises before returning the cache
    try:
        s, lazy = loop.run_until_complete(amain(key_type=key_type, data=data))
        print(s)
    finally:
        # Without this guard a failure inside amain() would surface as a
        # NameError on ``lazy`` and hide the real exception.
        if lazy is not None:
            loop.run_until_complete(lazy.shutdown())
gtmanfred commented 3 years ago

Thanks for the report, I will look into this.

I have updated your script so that it does not continue to add more log handlers each time you call self.log

Daniel

gtmanfred commented 3 years ago

Ok, this is the expected behavior. Because for the lock and unlock script we set an identifier, if the identifier does not match, then we do not unlock the key, and instead error.

https://github.com/joanvila/aioredlock/blob/523b02acf9b361360ca98a7ffb63dde44bd14487/aioredlock/redis.py#L32

So the thing you are trying to do here, cannot be done. You cannot change the values inside of the lock.

bolshoydi commented 3 years ago

Thanks for the response. I thought that we were acquiring a lock for a Redis key, not for connection checking. So the name of the lock should be different from the keys you are storing in Redis. To me that is not obvious, and it is not pointed out anywhere in the docs.

self.redis.set('another_key','value') self.redis.get('another_key_123') self.redis.delete('another_key_456')

await self.redlock.unlock(curr_lock)

gtmanfred commented 3 years ago

ahh no, you don't need redlock for that. you can just do setnx.

https://redis.io/commands/setnx

This will return a 1 or 0 whether it was set or not, if it was set, you get a 1 back, and you know you have a "lock" on that key. You can also set an expire on that key after the fact.

https://redis.io/commands/expire

So that the key is not held onto forever.

The purpose for redlock is for stuff where you want to manage something like a processing lock for a multi threaded application. So if you write an application and you have a bunch of project ids. You can create a lock on that project id, and all of your other processes check if the project for a message is already locked, and if it is, then your application moves on to the next message, until the project is unlocked by either being unlocked, or the lock in redis expiring.

Here is another explanation of distributed locks https://redis.io/topics/distlock