vutran1710 / PyrateLimiter

⚔️Python Rate-Limiter using Leaky-Bucket Algorithm Family
https://pyratelimiter.readthedocs.io
MIT License
334 stars 36 forks source link

Guidance: async with delay with new v3.x #128

Closed LucasHantz closed 6 months ago

LucasHantz commented 1 year ago

Hello,

I've integrated a limiter in my application based on the following example available on this page: https://pypi.org/project/pyrate-limiter/2.10.0/ But I don't really see how that would translate with the new version. Seems the equivalent examples are no more and the docs and I'm lost on what would need to change. Can you please help?

import asyncio
from time import perf_counter as time
from pyrate_limiter import Duration, Limiter, RequestRate

limiter = Limiter(RequestRate(5, Duration.SECOND))
n_requests = 27

@limiter.ratelimit("test", delay=True)
async def limited_function(start_time):
    print(f"t + {(time() - start_time):.5f}")

async def test_ratelimit():
    start_time = time()
    tasks = [limited_function(start_time) for _ in range(n_requests)]
    await asyncio.gather(*tasks)
    print(f"Ran {n_requests} requests in {time() - start_time:.5f} seconds")

asyncio.run(test_ratelimit())

Thank you!

vutran1710 commented 1 year ago

at minimum,you can write

from pyrate_limiter import Duration, Rate, InMemoryBucket, Limiter, BucketFullException

rates = [Rate(5, Duration.SECOND * 2)] limiter = Limiter(rates, max_delay=5000) # delay 5 secs

LucasHantz commented 1 year ago
import asyncio
from time import perf_counter as time
from pyrate_limiter import Duration, Limiter, Rate

limiter = Limiter(Rate(1, Duration.SECOND), max_delay=5000)
n_requests = 10
decorator = limiter.as_decorator()

def mapping(*args, **kwargs):
    return "demo", 1

@decorator(mapping)
async def limited_function(start_time):
    print(f"t + {(time() - start_time):.5f}")

async def test_ratelimit():
    start_time = time()
    tasks = [limited_function(start_time) for _ in range(n_requests)]
    await asyncio.gather(*tasks)
    print(f"Ran {n_requests} requests in {time() - start_time:.5f} seconds")

asyncio.run(test_ratelimit())

OUTPUT

t + 9.45569
t + 9.45576
t + 9.45576
t + 9.45577
t + 9.45578
t + 9.45579
t + 9.45580
t + 9.45581
t + 9.45581
t + 9.45582
Ran 10 requests in 9.45593 seconds

The full block is awaited for 10sec and fired all at once.

LucasHantz commented 1 year ago

If I remove the option "max_delay" from the limiter:

limiter = Limiter(Rate(1, Duration.SECOND))

OUTPUT

RuntimeWarning: coroutine 'limited_function' was never awaited
  tasks = [limited_function(start_time) for _ in range(n_requests)]
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
Traceback (most recent call last):
  File "/Users/lucashantz/sandbox/test2.py", line 26, in <module>
    asyncio.run(test_ratelimit())
  File "/opt/homebrew/Cellar/python@3.11/3.11.5/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.11/3.11.5/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.11/3.11.5/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/Users/lucashantz/sandbox/test2.py", line 21, in test_ratelimit
    tasks = [limited_function(start_time) for _ in range(n_requests)]
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/lucashantz/sandbox/test2.py", line 21, in <listcomp>
    tasks = [limited_function(start_time) for _ in range(n_requests)]
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/lucashantz/sandbox/venv/lib/python3.11/site-packages/pyrate_limiter/limiter.py", line 315, in wrapper
    accquire_ok = self.try_acquire(name, weight)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/lucashantz/sandbox/venv/lib/python3.11/site-packages/pyrate_limiter/limiter.py", line 286, in try_acquire
    result = self.handle_bucket_put(bucket, item)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/lucashantz/sandbox/venv/lib/python3.11/site-packages/pyrate_limiter/limiter.py", line 251, in handle_bucket_put
    return _handle_result(acquire)  # type: ignore
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/lucashantz/sandbox/venv/lib/python3.11/site-packages/pyrate_limiter/limiter.py", line 231, in _handle_result
    return self.delay_or_raise(bucket, item)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/lucashantz/sandbox/venv/lib/python3.11/site-packages/pyrate_limiter/limiter.py", line 147, in delay_or_raise
    self._raise_bucket_full_if_necessary(bucket, item)
  File "/Users/lucashantz/sandbox/venv/lib/python3.11/site-packages/pyrate_limiter/limiter.py", line 120, in _raise_bucket_full_if_necessary
    raise BucketFullException(item, bucket.failing_rate)
pyrate_limiter.exceptions.BucketFullException: Bucket for item=demo with Rate limit=1/1.0s is already full
vutran1710 commented 1 year ago

Apparently this kind of use is quite strange compared to what I did with unit-test (ref #126 )

I will investigate soon.

stints commented 1 year ago

I'm also trying to understand the new format.

class AsyncRateLimitedClient:
    @staticmethod
    async def create(redis_pool, rate_limit, key) -> "AsyncRateLimitedClient":
        bucket = await RedisBucket.init(
            rates=[Rate(limit=rate_limit, interval=Duration.SECOND)],
            redis=async_redis.Redis(connection_pool=redis_pool),
            bucket_key=key,
        )
        limiter = Limiter(
            argument=bucket,
            raise_when_fail=False,
            max_delay=1000,
        )
        return AsyncRateLimitedClient(limiter=limiter, key=key)

    async def perform_lookup(self, url: str) -> Result:
        if await self.limiter.try_acquire(self.key):
            # call aiohttp and return result

I'm testing this entire thing with a rate limit of 1 per second and it's consistently failing.

I'm seeing a lot of:

ERROR - 2023-09-08 22:36:35,312 - pyrate_limiter - limiter -
Re-acquiring with delay expected to be successful,
if it failed then either clock or bucket is probably unstable

I'm making the call to perform_lookup similarly to @LucasHantz via a asyncio.gather which is calling around 15 at once for my test. Even updating the rate limit count from 1 to 5 still results in some misses instead of proper waits.

LucasHantz commented 11 months ago

Hello, any update on this?

vutran1710 commented 11 months ago

Hello, any update on this?

Sorrry not yet, since I have been kind of busy lately. Afraid this has to wait

vutran1710 commented 6 months ago

Since its been a while. Im closing this. If anyone has problem please open a new issue. Dont forget to use/upgrade to the latest version of the lib first