Open thehesiod opened 7 years ago
If the lock was held from __enter__ through to __exit__, then the task being rate limited would have to execute sequentially, which is not the goal.
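To make the sequential-execution point concrete, here is a minimal sketch, not the library's code. The class name HoldsLockForBody and the helper run_workers are hypothetical, made up for illustration. It keeps an asyncio.Lock from __aenter__ to __aexit__ and measures how long three concurrent workers take.

```python
import asyncio
import time


class HoldsLockForBody:
    """Hypothetical limiter that keeps its lock from __aenter__ to __aexit__."""

    def __init__(self):
        self._lock = asyncio.Lock()

    async def __aenter__(self):
        await self._lock.acquire()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self._lock.release()
        return False


async def run_workers(n=3, work=0.05):
    """Run n workers under one shared limiter and return the elapsed seconds."""
    limiter = HoldsLockForBody()

    async def worker():
        async with limiter:
            await asyncio.sleep(work)  # stands in for the rate-limited body

    start = time.monotonic()
    await asyncio.gather(*[worker() for _ in range(n)])
    return time.monotonic() - start


if __name__ == "__main__":
    # The bodies cannot overlap, so the total is roughly n * work.
    print(f"{asyncio.run(run_workers()):.2f}s")
```

Because only one body can be inside the context at a time, the total run time grows with the number of workers even though nothing here limits the rate.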
The rate limited task should be able to execute up to max_calls within period, but any more than that will block (while holding the lock). Once the lock is released, the other simultaneous tasks can attempt to enter until it blocks again.
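The intended behaviour described above could be sketched roughly as follows. This is an illustration under stated assumptions, not the ratelimiter package's implementation: SlidingWindowLimiter and entry_times are hypothetical names, and the sketch assumes a call claims its slot when it enters the context rather than when it leaves.

```python
import asyncio
import collections
import time


class SlidingWindowLimiter:
    """Hypothetical sketch: at most max_calls entries in any `period` seconds."""

    def __init__(self, max_calls, period=1.0):
        self.max_calls = max_calls
        self.period = period
        self._starts = collections.deque()  # monotonic entry times of recent calls
        self._lock = asyncio.Lock()

    async def __aenter__(self):
        # The lock is held only while waiting for a free slot,
        # not while the rate-limited body runs.
        async with self._lock:
            while True:
                now = time.monotonic()
                # Forget entries that have left the window.
                while self._starts and now - self._starts[0] >= self.period:
                    self._starts.popleft()
                if len(self._starts) < self.max_calls:
                    self._starts.append(now)  # assumption: slot is claimed on entry
                    return self
                # Block (still holding the lock) until the oldest entry expires.
                await asyncio.sleep(self.period - (now - self._starts[0]))

    async def __aexit__(self, exc_type, exc, tb):
        return False


async def entry_times(n=4, max_calls=2, period=0.2, work=0.05):
    """Return the times (seconds since start) at which each worker entered."""
    limiter = SlidingWindowLimiter(max_calls, period)
    start = time.monotonic()
    times = []

    async def worker():
        async with limiter:
            times.append(time.monotonic() - start)
            await asyncio.sleep(work)

    await asyncio.gather(*[worker() for _ in range(n)])
    return times


if __name__ == "__main__":
    print([round(t, 2) for t in asyncio.run(entry_times())])
```

With these defaults the first two workers enter right away and the remaining two wait until the window has moved on, while the bodies themselves are free to overlap.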
Do you have an example that doesn't do what you expected?
think of this scenario:
from ratelimiter import RateLimiter
import asyncio

async def worker(limiter):
    async with limiter:
        print("Done")
        await asyncio.sleep(1000)

async def main():
    limiter = RateLimiter(1, 100)
    await asyncio.gather(*[worker(limiter) for i in range(100)])

if __name__ == '__main__':
    _loop = asyncio.get_event_loop()
    _loop.run_until_complete(main())
they all run immediately.
this is actually an interesting problem to solve efficiently and correctly. I think you'll need a semaphore.
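One way the semaphore idea might look, as a rough sketch only and not taken from any implementation linked in this thread: each call takes a permit from an asyncio.Semaphore, and the event loop hands the permit back `period` seconds after entry. The names PermitLimiter and entry_times are hypothetical.

```python
import asyncio
import time


class PermitLimiter:
    """Hypothetical sketch: a permit is taken on entry and returned after `period`."""

    def __init__(self, max_calls, period=1.0):
        self.period = period
        self._permits = asyncio.Semaphore(max_calls)

    async def __aenter__(self):
        await self._permits.acquire()
        # Return the permit once this call has aged out of the window,
        # regardless of how long the body takes.
        loop = asyncio.get_running_loop()
        loop.call_later(self.period, self._permits.release)
        return self

    async def __aexit__(self, exc_type, exc, tb):
        return False


async def entry_times(n=3, max_calls=1, period=0.1, work=0.5):
    """Return the times (seconds since start) at which each worker entered."""
    limiter = PermitLimiter(max_calls, period)
    start = time.monotonic()
    times = []

    async def worker():
        async with limiter:
            times.append(time.monotonic() - start)
            await asyncio.sleep(work)  # long body; must not delay other entries

    await asyncio.gather(*[worker() for _ in range(n)])
    return times


if __name__ == "__main__":
    print([round(t, 2) for t in asyncio.run(entry_times())])
```

Since every entry holds a permit for exactly `period` seconds, no more than max_calls entries can fall inside any window of that length, and no lock is needed around the waiting logic.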
ok here's WIP impl that passes my unittests: https://gist.github.com/thehesiod/407670e6d7b6883d3d416d1f649616de
Thanks for the example and now an attempt to fix it. I'll have more time to look at it in July. I had a first attempt at fixing it using a semaphore. I also want to write tests to really nail down the expected behaviour, since they are lacking just now.
actually last version had a crucial bug, after a code review I updated it to the current impl on gist which is actually faster.
Can confirm this. I'm calling an API that accepts 100 calls per second, with the rate limit set to 50.
import aiohttp
import asyncio
from ratelimiter import RateLimiter

class callUrls:
    async_loop = asyncio.get_event_loop()
    client = aiohttp.ClientSession(loop=async_loop)
    rate = RateLimiter(max_calls=50)

    @classmethod
    async def getJSON(cls, url):
        async with cls.rate:
            async with cls.client.get(url) as response:
                info = await response.json()
                try:
                    assert response.status == 200
                except AssertionError:
                    raise AssertionError(info)
                return info

    def open_client(cls):
        cls.client = aiohttp.ClientSession(loop=cls.async_loop)
getting Error:
line 20: raise AssertionError(info)
AssertionError: {'code': 403, 'type': 'Forbidden', 'detail': 'Account Over Queries Per Second Limit'}
I have a proof-of-concept of a ratelimiter v2 that fixes these problems, with pluggable rate limiting algorithms (e.g. leaky bucket).
It'll be a v2 because the API has breaking changes. Also, it'll optionally allow limiting concurrent calls. Sometimes we only care that __enter__ is rate limited, but sometimes we want to wait until __exit__ before the next concurrent call can start.
I'll try to push this to a v2 branch soon.
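The optional concurrency limit could be sketched like this. It is not the proposed v2 API, which is not shown in this thread. ConcurrencyAwareLimiter, its max_concurrent parameter and the peak_concurrency helper are all hypothetical. The sketch rate limits entry with timed permits and, when max_concurrent is given, also holds a second semaphore from __aenter__ until __aexit__.

```python
import asyncio


class ConcurrencyAwareLimiter:
    """Hypothetical sketch: rate limit on entry, optional cap on running bodies."""

    def __init__(self, max_calls, period=1.0, max_concurrent=None):
        self.period = period
        self._rate = asyncio.Semaphore(max_calls)
        self._running = (
            asyncio.Semaphore(max_concurrent) if max_concurrent is not None else None
        )

    async def __aenter__(self):
        if self._running is not None:
            await self._running.acquire()  # held until __aexit__
        try:
            await self._rate.acquire()
        except BaseException:
            # Do not leak the concurrency slot if we are cancelled while waiting.
            if self._running is not None:
                self._running.release()
            raise
        # The rate permit returns after `period`, independent of the body.
        asyncio.get_running_loop().call_later(self.period, self._rate.release)
        return self

    async def __aexit__(self, exc_type, exc, tb):
        if self._running is not None:
            self._running.release()
        return False


async def peak_concurrency(max_concurrent, n=3, work=0.05):
    """Return the largest number of bodies that were running at the same time."""
    limiter = ConcurrencyAwareLimiter(
        max_calls=10, period=0.05, max_concurrent=max_concurrent
    )
    active = 0
    peak = 0

    async def worker():
        nonlocal active, peak
        async with limiter:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(work)
            active -= 1

    await asyncio.gather(*[worker() for _ in range(n)])
    return peak


if __name__ == "__main__":
    print(asyncio.run(peak_concurrency(None)), asyncio.run(peak_concurrency(1)))
```

Leaving max_concurrent as None gives the "only __enter__ is rate limited" behaviour, while max_concurrent=1 makes each call wait for the previous one to reach __exit__.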
Hi, I assume this is abandoned? Cheers
I believe this implementation is broken if you enter/aenter the class twice before aexit'ing (e.g. when sharing one instance between multiple classes). The lock should be held from enter/aenter through exit/aexit. This would also mean that you can't use the sync lock in exit with aexit.