vutran1710 / PyrateLimiter

⚔️Python Rate-Limiter using Leaky-Bucket Algorithm Family
https://pyratelimiter.readthedocs.io
MIT License

Issue with try_acquire and multiple names with v3+ #153

Closed · manu-paylead closed this 6 months ago

manu-paylead commented 6 months ago

Hello there! I'm working on migrating our codebase from v2.10 to 3.21. I've got most of it working, but I'm having trouble with one of our test cases that used to work and no longer seems to.

Here is the code needed to reproduce the issue (you need a Redis server):

from pyrate_limiter import AbstractClock, Duration, Limiter, Rate, RedisBucket
from redis.client import Redis

REDIS_URI = "redis://localhost:6379/0"
BUCKET_KEY = "SomeBucketKey"

class RedisClock(AbstractClock):
    def now(self) -> int:
        unix_time, microseconds = Redis.from_url(REDIS_URI).time()
        return unix_time + microseconds / 1_000_000

if __name__ == "__main__":
    Redis.from_url(REDIS_URI).delete(BUCKET_KEY)
    limiter = Limiter(
        RedisBucket.init(
            rates=(Rate(5, 5 * Duration.SECOND),),
            redis=Redis.from_url(REDIS_URI),
            bucket_key=BUCKET_KEY,
        ),
        clock=RedisClock(),
    )
    for _ in range(3):
        limiter.try_acquire("test-1")
    for _ in range(3):
        limiter.try_acquire("another_key")
    Redis.from_url(REDIS_URI).delete(BUCKET_KEY)

With 2.10 this code runs without issue (setting aside the couple of things that changed in the Limiter constructor and the RequestRate -> Rate rename). With 3.21.1 I get BucketFullException: Bucket for item=another_key with Rate limit=5/5.0s is already full at the line limiter.try_acquire("another_key").

As far as I know, "test-1" and "another_key" should not share the rate limit of 5; we should be allowed 5 of each in the bucket.
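
For reference, this is roughly how we call it on 2.10 (just a sketch from our side; the exact 2.x signatures may differ slightly), and there each name gets its own bucket implicitly:

from pyrate_limiter import Duration, Limiter, RequestRate  # 2.x API

limiter = Limiter(RequestRate(5, 5 * Duration.SECOND))
for _ in range(3):
    limiter.try_acquire("test-1")
for _ in range(3):
    limiter.try_acquire("another_key")  # independent of "test-1" on 2.x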

Am I misunderstanding something here?

Regards

Manu

vutran1710 commented 6 months ago

In v3, the lib requires explicit bucket-routing code instead of using the item name as the routing key. If you want a separate bucket for each item name, you have to explicitly implement the BucketFactory.get(item) method, e.g.:

from pyrate_limiter import AbstractBucket, BucketFactory, RateItem, RedisBucket

class RedisBucketFactory(BucketFactory):
    def __init__(self, rates, redis, clock):
        # set up your clock and buckets here
        self.clock = clock
        self.default_bucket = RedisBucket.init(rates=rates, redis=redis, bucket_key="default-bucket")
        self.alt_bucket = RedisBucket.init(rates=rates, redis=redis, bucket_key="alt-bucket")

    def wrap_item(self, name: str, weight: int = 1) -> RateItem:
        # attach a timestamp from the clock to the incoming item
        return RateItem(name, self.clock.now(), weight=weight)

    def get(self, item: RateItem) -> AbstractBucket:
        # route items to separate buckets by name
        if item.name == 'thing1':
            return self.alt_bucket
        return self.default_bucket
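
Then wire the factory into the Limiter, e.g. (a rough sketch reusing your Redis setup and your RedisClock; the bucket keys above are just placeholders):

from redis.client import Redis
from pyrate_limiter import Duration, Limiter, Rate

redis = Redis.from_url("redis://localhost:6379/0")
rates = (Rate(5, 5 * Duration.SECOND),)

limiter = Limiter(RedisBucketFactory(rates, redis, RedisClock()))
limiter.try_acquire("thing1")         # goes to alt_bucket
limiter.try_acquire("anything-else")  # goes to default_bucket
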
vutran1710 commented 6 months ago

I'd advise you to look at the updated documentation to understand more about the changes in the v3 design. Hope this helps. https://github.com/vutran1710/PyrateLimiter?tab=readme-ov-file#defining-clock--routing-logic-with-bucketfactory

manu-paylead commented 6 months ago

So I came up with the example modified like this:

from multiprocessing.pool import ThreadPool
from time import sleep

import structlog
from pyrate_limiter import AbstractBucket, BucketFactory, Duration, Limiter, Rate, RateItem, RedisBucket
from redis.client import Redis

REDIS_URI = "redis://localhost:6379/0"

logger = structlog.get_logger()

def redis_now() -> int:
    unix_time, microseconds = Redis.from_url(REDIS_URI).time()
    return unix_time + microseconds / 1_000_000

class SimpleRedisBucketFactory(BucketFactory):
    def __init__(self, rates, redis, *args, **kwargs):
        self.rates = rates
        self.redis = redis
        self.thread_pool = ThreadPool(processes=1)
        self.buckets = {}
        logger.warning("__init__")

    def wrap_item(self, name: str, weight: int = 1) -> RateItem:
        now = redis_now()
        logger.warning("wrap_item", name=name, ts=now)
        return RateItem(name, now, weight=weight)

    def get(self, _item: RateItem) -> AbstractBucket:
        """Create (or reuse) one bucket per item name"""
        name = _item.name
        logger.warning("get", name=name, buckets=self.buckets)
        if not self.buckets.get(name, None):
            self.buckets[name] = RedisBucket.init(
                rates=self.rates,
                redis=self.redis,
                bucket_key=name,
            )
        return self.buckets[name]

if __name__ == "__main__":
    Redis.from_url(REDIS_URI).delete("test-1")
    Redis.from_url(REDIS_URI).delete("another_key")

    limiter = Limiter(
        SimpleRedisBucketFactory(
            rates=(Rate(5, 5 * Duration.SECOND),),
            redis=Redis.from_url(REDIS_URI),
        ),
    )
    for _ in range(3):
        limiter.try_acquire("test-1")
    for _ in range(3):
        limiter.try_acquire("another_key")

    sleep(15)
    for _ in range(3):
        limiter.try_acquire("test-1")
    for _ in range(3):
        limiter.try_acquire("another_key")

But it still errors out after the 15-second wait with pyrate_limiter.exceptions.BucketFullException: Bucket for item=test-1 with Rate limit=5/5.0s is already full

Am I missing something else here? Isn't it supposed to work? (3 items, then a 15-second sleep, then 3 more items should pass for a 5 items / 5 seconds rate.)

vutran1710 commented 6 months ago

You are very close. The problem is that your BucketFactory is not leaking, so the items in the buckets are not removed periodically. You can either call self.schedule_leak(new_bucket, clock) right after initializing the new RedisBucket in your get method, or call self.create(RedisBucket, **your-bucket-class-arguments) instead.

Example:

class SimpleRedisBucketFactory(BucketFactory):
    def __init__(self, rates, redis, *args, **kwargs):
        self.rates = rates
        self.redis = redis
        self.thread_pool = ThreadPool(processes=1)
        self.buckets = {}
        logger.warning("__init__")

    def wrap_item(self, name: str, weight: int = 1) -> RateItem:
        now = redis_now()
        logger.warning("wrap_item", name=name, ts=now)
        return RateItem(name, now, weight=weight)

    def get(self, _item: RateItem) -> AbstractBucket:
        """Create (or reuse) one bucket per item name"""
        name = _item.name
        logger.warning("get", name=name, buckets=self.buckets)
        if not self.buckets.get(name, None):
            self.buckets[name] = self.create(RedisBucket, rates=self.rates, redis=self.redis, bucket_key=name)
            # the line above is identical to...
            # self.buckets[name] = RedisBucket(...)
            # self.schedule_leak(self.buckets[name], your_clock)
        return self.buckets[name]

Note that you still need to pass an appropriate clock instance instead of using now() as a bare function. That clock is required to make the leaking (removing items) work properly.
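
Putting it together, something like this should work (a sketch based on the points above; it reuses the RedisClock from your first snippet, so adjust names as needed):

class SimpleRedisBucketFactory(BucketFactory):
    def __init__(self, rates, redis, clock):
        self.rates = rates
        self.redis = redis
        self.clock = clock  # e.g. your RedisClock instance
        self.buckets = {}

    def wrap_item(self, name: str, weight: int = 1) -> RateItem:
        return RateItem(name, self.clock.now(), weight=weight)

    def get(self, item: RateItem) -> AbstractBucket:
        if item.name not in self.buckets:
            bucket = RedisBucket.init(rates=self.rates, redis=self.redis, bucket_key=item.name)
            self.schedule_leak(bucket, self.clock)  # start periodic leaking for this bucket
            self.buckets[item.name] = bucket
        return self.buckets[item.name]

limiter = Limiter(SimpleRedisBucketFactory(rates, redis, RedisClock()))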

vutran1710 commented 6 months ago

The documentation on this part is quite bad, I must admit. I've just updated it here: https://github.com/vutran1710/PyrateLimiter?tab=readme-ov-file#creating-buckets-dynamically

vutran1710 commented 6 months ago

I think we can close this