Open Midnighter opened 4 years ago
There's no built-in API for that, no.
You might want to take a https://github.com/python-trio/trimeter which has some really nice generic tools for capacity limiting.
Say the API I'm hitting allows 10 requests per second, I guess a naive approach would be to set the hard limit of the connection pool to 10 and asyncio.sleep 1/10 s after every request. Does that sound about right or am I missing something obvious?
Yeah, something like that could be perfectly reasonable (minus however much time has elapsed making the request)
They warn that the library is not production ready but this example here looks perfect for me.
async with trimeter.amap(fetch_and_process, urls, max_at_once=10, max_per_second=10) as results:
# Then iterate over the return values, as they become available
# (i.e., not necessarily in the original order)
async for result in results:
...
Failed to install trimeter in a conda environment but at least I'll know to watch the package. Thanks!
I just wanted to comment that I'm running into this exact scenario:
r.status_code=429, r.text='You have exceeded the limit of 15 requests per second; please reduce your concurrent connections'
It'd be really to have a max_per_second
or similar parameter in the client. 😄
Have a look at https://github.com/florimondmanca/aiometer it has served me well in most cases.
Oh sweet, that looks more mature than trimeter
. Thank you!
It would be really helpful to have this API within httpx, even if it used something like aiometer internally. Using aiometer externally completely changes the way you interact with httpx and makes it much harder to write a client library. Being able to set something on the client and, then forget about it and use your normal access pattern, would be a huge leap in ergonomics. @Midnighter could you reopen this issue?
I suppose this could be solved without changes to httpx required, by writing a custom transport that wraps the default one with calls to aiometer?
I suppose this could be solved without changes to httpx required, by writing a custom transport that wraps the default one with calls to aiometer?
Certainly possible!
Not very experienced with asyncio but here's my stab (having gone through just about every synchronisation primitive it has to offer...).
Rate limits are generally defined as N
requests per I
interval, so this subclass of AsyncClient wraps a semaphore with N
slots available. When send
is scheduled, it has to wait until the semaphore becomes available, and then eagerly schedules two tasks: one to release the semaphore after I
has expired, and one to send the request. This ensures that the response can be returned as soon as it's available (so we don't have to sleep after every request if we're not going to hit the limit), and that we don't hit the limit. I think that creating a task out of the super().send
coroutine means that there shouldn't be any waiting before it gets sent off (which would cause problems); or at last that creating the waiter task second should mean that it shows up later in the queue.
import asyncio
import datetime as dt
from functools import wraps
from typing import Union
from httpx import AsyncClient
# unless you keep a strong reference to a running task, it can be dropped during execution
# https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task
_background_tasks = set()
class RateLimitedClient(AsyncClient):
"""httpx.AsyncClient with a rate limit."""
def __init__(self, interval: Union[dt.timedelta, float], count=1, **kwargs):
"""
Parameters
----------
interval : Union[dt.timedelta, float]
Length of interval.
If a float is given, seconds are assumed.
numerator : int, optional
Number of requests which can be sent in any given interval (default 1).
"""
if isinstance(interval, dt.timedelta):
interval = interval.total_seconds()
self.interval = interval
self.semaphore = asyncio.Semaphore(count)
super().__init__(**kwargs)
def _schedule_semaphore_release(self):
wait = asyncio.create_task(asyncio.sleep(self.interval))
_background_tasks.add(wait)
def wait_cb(task):
self.semaphore.release()
_background_tasks.discard(task)
wait.add_done_callback(wait_cb)
@wraps(AsyncClient.send)
async def send(self, *args, **kwargs):
await self.semaphore.acquire()
send = asyncio.create_task(super().send(*args, **kwargs))
self._schedule_semaphore_release()
return await send
Since this topic keeps being popular, I'll mention another possible solution that might integrate better with already existing httpx usage flows: using a rate limiter context manager, say from limiter or aiolimiter, within an httpx.Auth subclass.
Something like (Untested code):
import aiolimiter
import httpx
class HttpxRateLimiter(httpx.Auth):
def __init__(self):
self.limiter = aiolimter.AsyncLimiter(600) # <- 10 per second
async def async_auth_flow(self, request):
async with self.limiter:
yield request
In this comment @florimondmanca suggests that the connection pool limits can be used to rate-limit the number of requests made. I can see, of course, how setting those limits will constrain the number of 'concurrent' requests made by httpx. However, what if I want to limit the number of requests made per unit of time?
Say the API I'm hitting allows 10 requests per second, I guess a naive approach would be to set the hard limit of the connection pool to 10 and
asyncio.sleep
1/10 s after every request. Does that sound about right or am I missing something obvious?