vutran1710 / PyrateLimiter

⚔️ Python Rate-Limiter using Leaky-Bucket Algorithm Family
https://pyratelimiter.readthedocs.io
MIT License

'CROSSSLOT Keys in request don't hash to the same slot' when using Redis Cluster #126

Closed Tomerva closed 6 months ago

Tomerva commented 1 year ago

I have fairly simple code that keeps raising this exception:

from redis import Redis
from pyrate_limiter import Duration, Limiter, Rate, RedisBucket

redis_db = Redis(host='MYHOST', port=6379, ssl=True, ssl_cert_reqs="none")
bucket = RedisBucket.init([Rate(1, Duration.SECOND.value)], redis_db, 'ratelimit')
limiter = Limiter(bucket, max_delay=Duration.HOUR)
deco = limiter.as_decorator()

@deco(lambda _: ('identity',1))
def sync_func(i):
    print(i)

for i in range(5):
    sync_func(i)

The exception:

File "/Users/tomer/dev/projects/common/venv/lib/python3.8/site-packages/pyrate_limiter/limiter.py", line 315, in wrapper
    accquire_ok = self.try_acquire(name, weight)
  File "/Users/tomer/dev/projects/common/venv/lib/python3.8/site-packages/pyrate_limiter/limiter.py", line 286, in try_acquire
    result = self.handle_bucket_put(bucket, item)
  File "/Users/tomer/dev/projects/common/venv/lib/python3.8/site-packages/pyrate_limiter/limiter.py", line 235, in handle_bucket_put
    acquire = bucket.put(item)
  File "/Users/tomer/dev/projects/common/venv/lib/python3.8/site-packages/pyrate_limiter/buckets/redis_bucket.py", line 135, in put
    failing_rate = self._check_and_insert(item)
  File "/Users/tomer/dev/projects/common/venv/lib/python3.8/site-packages/pyrate_limiter/buckets/redis_bucket.py", line 117, in _check_and_insert
    idx = self.redis.evalsha(self.script_hash, len(keys), *keys, *args)
  File "/Users/tomer/dev/projects/common/venv/lib/python3.8/site-packages/redis/commands/core.py", line 5307, in evalsha
    return self._evalsha("EVALSHA", sha, numkeys, *keys_and_args)
  File "/Users/tomer/dev/projects/common/venv/lib/python3.8/site-packages/redis/commands/core.py", line 5291, in _evalsha
    return self.execute_command(command, sha, numkeys, *keys_and_args)
  File "/Users/tomer/dev/projects/common/venv/lib/python3.8/site-packages/redis/client.py", line 508, in execute_command
    return conn.retry.call_with_retry(
  File "/Users/tomer/dev/projects/common/venv/lib/python3.8/site-packages/redis/retry.py", line 46, in call_with_retry
    return do()
  File "/Users/tomer/dev/projects/common/venv/lib/python3.8/site-packages/redis/client.py", line 509, in <lambda>
    lambda: self._send_command_parse_response(
  File "/Users/tomer/dev/projects/common/venv/lib/python3.8/site-packages/redis/client.py", line 485, in _send_command_parse_response
    return self.parse_response(conn, command_name, **options)
  File "/Users/tomer/dev/projects/common/venv/lib/python3.8/site-packages/redis/client.py", line 525, in parse_response
    response = connection.read_response()
  File "/Users/tomer/dev/projects/common/venv/lib/python3.8/site-packages/redis/connection.py", line 516, in read_response
    raise response
redis.exceptions.ResponseError: CROSSSLOT Keys in request don't hash to the same slot
vutran1710 commented 1 year ago

It looks like you are using Redis in cluster mode.

Maybe you can try resolving the problem by following this tutorial:

https://hackernoon.com/resolving-the-crossslot-keys-error-with-redis-cluster-mode-enabled
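(Background, not part of the linked tutorial: Redis Cluster decides a key's slot from the substring between the first `{` and the next `}` when that substring is non-empty — the "hash tag" rule — so keys sharing the same tag always land in the same slot. A minimal sketch of that rule; `effective_hash_key` is a hypothetical helper name, not a redis-py API.)

```python
def effective_hash_key(key: str) -> str:
    """Return the part of a key that Redis Cluster actually hashes.

    Per the Redis Cluster spec: if the key contains a non-empty
    substring between the first '{' and the first '}' after it,
    only that substring is hashed; otherwise the whole key is.
    """
    start = key.find("{")
    if start != -1:
        end = key.find("}", start + 1)
        if end != -1 and end > start + 1:  # tag must be non-empty
            return key[start + 1:end]
    return key

# Keys sharing the tag "{ratelimit}" hash to the same slot:
print(effective_hash_key("{ratelimit}:abc"))  # ratelimit
print(effective_hash_key("{ratelimit}:xyz"))  # ratelimit
# A key without a tag (or with an empty "{}" tag) is hashed in full:
print(effective_hash_key("ratelimit:abc"))    # ratelimit:abc
```

Note that every key the bucket's Lua script touches has to carry the same tag for this to help, which may be why tagging only the bucket name is not enough.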

Tomerva commented 1 year ago

I still can't get it to work:

redis_db = Redis(host='MYHOST', port=6379, ssl=True, ssl_cert_reqs="none")
bucket = RedisBucket.init([Rate(1, Duration.SECOND.value)], redis_db, '{ratelimit}:abc')
limiter = Limiter(bucket, max_delay=Duration.HOUR)
deco = limiter.as_decorator()

@deco(lambda _: ('{ratelimit}:abc',1))
def sync_func(i):
    print(i)
Tomerva commented 1 year ago

@vutran1710 I created a new Redis instance with cluster mode disabled, but now I'm having an issue with rate limiting async requests. When using "regular" (sync) Redis it just sends all the requests at once; when using async Redis it fails because the bucket is full, even though I set a long max_delay.

import asyncio

from redis.asyncio import Redis as AsyncRedis
from pyrate_limiter import Duration, Limiter, Rate, RedisBucket

async def create_limiter(*rates: Rate, identity):
    redis_db = AsyncRedis(host=endpoint, port=6379, ssl=True, ssl_cert_reqs="none")
    bucket = await RedisBucket.init(rates, redis_db, identity)
    limiter = Limiter(bucket, max_delay=Duration.HOUR)
    deco = limiter.as_decorator()
    return deco(lambda _: (identity, 1))

async def main():
    mylimiter = await create_limiter(Rate(1, 1), identity='myident')

    @mylimiter
    async def async_func(i):
        print(i)

    tasks = [asyncio.create_task(async_func(i)) for i in range(5)]
    await asyncio.gather(*tasks)

asyncio.run(main())

Getting this:

pyrate_limiter.exceptions.BucketFullException: Bucket for item=myident with Rate limit=1/1.0s is already full
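(A generic stopgap, not from this thread or the library docs: when many tasks race for the same bucket at once, you can serialize the acquisitions behind an `asyncio.Lock` so the bucket sees one request at a time. This sketch uses a plain `asyncio.sleep` as a stand-in for the limiter's acquire-and-delay step.)

```python
import asyncio

async def main():
    lock = asyncio.Lock()
    results = []

    async def rate_limited(i: int):
        # Serialize acquisitions: with the lock held, each task waits
        # its turn instead of all five hitting the bucket at once.
        async with lock:
            await asyncio.sleep(0.01)  # stand-in for acquire + rate delay
            results.append(i)

    await asyncio.gather(*(rate_limited(i) for i in range(5)))
    return results

print(asyncio.run(main()))  # [0, 1, 2, 3, 4]
```

Tasks acquire an `asyncio.Lock` in FIFO order, so the calls complete in submission order rather than racing.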
vutran1710 commented 1 year ago

Interesting. I'll take a look at this soon.

Tomerva commented 11 months ago

@vutran1710 Any news on this issue?

vaharoni commented 6 months ago

I opened a PR that fixed the issue for me: https://github.com/vutran1710/PyrateLimiter/pull/151