mjpieters / aiolimiter

An efficient implementation of a rate limiter for asyncio.
https://aiolimiter.readthedocs.io/en/latest/
MIT License

Potential perf issue when used together with httpx and a large number of waiting tasks #73

Open mjpieters opened 2 years ago

mjpieters commented 2 years ago

Someone found that when using httpx rate limited by aiolimiter with a large number of active tasks, httpx requests have a hard time getting going. The more tasks are started, the longer the initial startup.

See the full discussion in the httpx repository.

This may be something that aiolimiter is doing wrong, something I'd like to avoid if possible. To reproduce, in a Python 3.9 venv install httpx, aiohttp and sanic (versions pinned to ensure reproducibility):

bin/pip install 'sanic>=21.9.1,<21.10.0' 'httpx>=0.20.0,<0.21.0' 'aiohttp>=3.8.0,<3.9.0'

and add two scripts, a server and a client script; server.py is:

import asyncio

from random import uniform

from sanic import Sanic
from sanic.response import json

app = Sanic("example")

@app.get("/")
async def hello_world(request):
    time = uniform(0.25, 0.80)
    await asyncio.sleep(time)

    return json({"time": time})

app.run(host='0.0.0.0', port=8000, access_log=True)

and can be run in a separate terminal.

The client.py script has several options to change behaviour:

import asyncio
import logging
import math
from typing import AsyncContextManager, Union

from aiolimiter import AsyncLimiter
from aiohttp import ClientSession
from httpx import AsyncClient

logger = logging.getLogger("main")

class TaskNameFilter(logging.Filter):
    def filter(self, record):
        try:
            task = asyncio.current_task()
        except RuntimeError:
            task = None
        record.taskname = task.get_name() if task is not None else "~"
        return True

class anullcontext:
    async def __aenter__(self):
        return None

    async def __aexit__(self, *excinfo):
        return None

async def task(limiter: AsyncLimiter, sema: AsyncContextManager, client: AsyncClient):
    async with sema:
        logger.debug(">> sema")
        async with limiter:
            logger.debug(">> limiter")
            await client.get("http://localhost:8000")
            logger.info("request made")
        logger.debug("<< limiter")
    logger.debug("<< sema")

async def main(
    num_requests: int,
    use_sema: bool,
    rate: float = 20,
    period: float = 1,
    client_factory: type[Union[AsyncClient, ClientSession]] = AsyncClient,
):
    limiter = AsyncLimiter(rate, period)
    sema = asyncio.BoundedSemaphore(100) if use_sema else anullcontext()
    client = client_factory()
    width = 1 + int(math.log10(num_requests))
    tasks = [
        asyncio.create_task(task(limiter, sema, client), name=f"#r{x:0{width}d}")
        for x in range(num_requests)
    ]

    await asyncio.gather(*tasks, return_exceptions=True)

if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-s", dest="use_sema", action="store_true", help="Use a bounded semaphore"
    )
    parser.add_argument("-u", dest="use_uvloop", action="store_true", help="Use uvloop")
    parser.add_argument(
        "-a",
        dest="use_aiohttp",
        action="store_true",
        help="Use aiohttp instead of httpx",
    )
    parser.add_argument(
        "-d",
        dest="debug",
        action="store_true",
        help="Enable asyncio logging and debugging",
    )
    parser.add_argument(
        "-r",
        dest="rate",
        type=float,
        default=20,
        help="Maximum number of requests in a time period",
    )
    parser.add_argument(
        "-p", dest="period", type=float, default=1, help="Rate limit time period"
    )
    parser.add_argument("num_requests", nargs="?", default=50, type=int)
    args = parser.parse_args()

    factory = ClientSession if args.use_aiohttp else AsyncClient

    if args.use_uvloop:
        import uvloop

        asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

    logging.basicConfig(
        level=logging.DEBUG if args.debug else logging.INFO,
        format="(%(relativeCreated)06d %(taskname)s) %(levelname)s:%(name)s:%(message)s",
    )
    filter = TaskNameFilter()
    for handler in logging.getLogger().handlers:
        handler.addFilter(filter)
    logging.info("Running test with %r", args)

    asyncio.run(
        main(
            args.num_requests,
            args.use_sema,
            args.rate,
            args.period,
            client_factory=factory,
        ),
        debug=args.debug,
    )

However, given that aiohttp does not show the problem, this may well be an httpx-specific issue.

The issue becomes clearer the higher the request count given on the command line, and is more apparent when debugging is switched on, e.g.

bin/python client.py -d 500

and goes away when using a semaphore (-s) or using aiohttp (-a).

bharel commented 2 years ago

@mjpieters I was actually creating a library myself for aio rate limiting when I encountered yours, and thought adding a Leaky Bucket implementation is a great idea.

Unfortunately, I believe your implementation has two major faults:

  1. You check the time and _leak on every call. That causes an unnecessary overhead as on 100% of the cases there won't be a leak if you schedule 1k requests at once.
  2. You're bashing the event loop checking for a free spot. Basically, all of your waiters awake every rate tick, making 1k new futures every tick for a single vacant spot. It's an enormous overhead, and with enough waiters, occurs on every event loop step, including all awaits in external/irrelevant code. Needless to say, it grinds everything to a halt.

I chose to implement the algorithm with these two points in mind. I only wake up a single waiter when there's an available spot. I do not deal with different acquire sizes, as I feel they are seldom used and they make the algorithm more complex. Even so, supporting other sizes should be a breeze, as I chose to use a deque and all you need is to accommodate a (size, waiter) tuple. It even has the nice benefit of guaranteeing order and not starving large acquires.
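A minimal sketch of the "wake a single waiter" idea described above, assuming fixed acquire sizes of 1. The class name `FifoLimiter`, its attributes, and the `demo()` helper are hypothetical and written for illustration only; this is not the code of aioratelimiter or aiolimiter. One timer per limiter hands each freed slot to the oldest waiter in a deque, so blocked tasks never poll.

```python
import asyncio
from collections import deque


class FifoLimiter:
    """Illustrative only: `capacity` immediate slots, then one slot per 1/rate seconds."""

    def __init__(self, rate: float, capacity: int = 1) -> None:
        self._interval = 1.0 / rate   # seconds between freed slots
        self._capacity = capacity
        self._level = 0               # slots currently in use
        self._waiters = deque()       # FIFO of futures for blocked callers
        self._timer = None            # the single pending timer handle, if any

    async def wait(self) -> None:
        loop = asyncio.get_running_loop()
        if self._level < self._capacity and not self._waiters:
            # Fast path: take a free slot and make sure the drain timer runs.
            self._level += 1
            if self._timer is None:
                self._timer = loop.call_later(self._interval, self._tick)
            return
        # Slow path: queue up and sleep until _tick hands over a slot.
        fut = loop.create_future()
        self._waiters.append(fut)
        await fut

    def _tick(self) -> None:
        self._timer = None
        # One slot drained: give it to the oldest waiter that is still pending.
        while self._waiters:
            fut = self._waiters.popleft()
            if not fut.done():        # skip waiters that were cancelled
                fut.set_result(None)
                break
        else:
            self._level -= 1          # nobody waiting, the slot becomes free
        if self._level > 0:
            loop = asyncio.get_running_loop()
            self._timer = loop.call_later(self._interval, self._tick)


async def demo() -> list:
    limiter = FifoLimiter(rate=100, capacity=2)
    order = []

    async def worker(i: int) -> None:
        await limiter.wait()
        order.append(i)

    await asyncio.gather(*(worker(i) for i in range(5)))
    return order
```

Only one task is woken per freed slot, and the deque preserves arrival order. The sketch does not handle a task being cancelled after its future was already resolved; a real implementation would need to pass that slot on.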

I think adding a semaphore limits the bashing to (100 - bucket size) requests, and that's why it runs a little more smoothly, although quite inaccurately according to the server timestamps.

Using my implementation, there is no issue even with 10,000 requests (without debug; it still works quite well with debug, but I believe the I/O to the screen is the major slowdown).

New client.py:

import asyncio
import logging
import math
from typing import AsyncContextManager, Awaitable, Callable, Union

from aiolimiter import AsyncLimiter
from aioratelimiter import LeakyBucketLimiter
from aiohttp import ClientSession
from httpx import AsyncClient

logger = logging.getLogger("main")

class TaskNameFilter(logging.Filter):
    def filter(self, record):
        try:
            task = asyncio.current_task()
        except RuntimeError:
            task = None
        record.taskname = task.get_name() if task is not None else "~"
        return True

class anullcontext:
    async def __aenter__(self):
        return None

    async def __aexit__(self, *excinfo):
        return None

async def task(limit_func: Callable[[], Awaitable[None]],
        sema: AsyncContextManager, client: AsyncClient):
    async with sema:
        logger.debug(">> sema")
        await limit_func()
        logger.debug(">> limiter")
        await client.get("http://localhost:8000")
        logger.info("request made")
    logger.debug("<< sema")

async def main(
    num_requests: int,
    use_sema: bool,
    rate: float = 20,
    period: float = 1,
    aiolimiter: bool = False,
    client_factory: type[Union[AsyncClient, ClientSession]] = AsyncClient,
):
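    # NOTE: despite its name, the `aiolimiter` flag (set by -rl) selects
    # aioratelimiter's LeakyBucketLimiter; the default path uses aiolimiter.
    if aiolimiter:
        limit_func = LeakyBucketLimiter(rate/period, capacity=rate).wait
    else:
        limit_func = AsyncLimiter(rate, period).acquire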

    sema = asyncio.BoundedSemaphore(100) if use_sema else anullcontext()
    client = client_factory()
    width = 1 + int(math.log10(num_requests))
    tasks = [
        asyncio.create_task(task(limit_func, sema, client), name=f"#r{x:0{width}d}")
        for x in range(num_requests)
    ]

    await asyncio.gather(*tasks, return_exceptions=True)

if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-s", dest="use_sema", action="store_true", help="Use a bounded semaphore"
    )
    parser.add_argument("-u", dest="use_uvloop", action="store_true", help="Use uvloop")
    parser.add_argument(
        "-a",
        dest="use_aiohttp",
        action="store_true",
        help="Use aiohttp instead of httpx",
    )
    parser.add_argument(
        "-d",
        dest="debug",
        action="store_true",
        help="Enable asyncio logging and debugging",
    )
    parser.add_argument(
        "-r",
        dest="rate",
        type=float,
        default=20,
        help="Maximum number of requests in a time period",
    )
    parser.add_argument(
        "-p", dest="period", type=float, default=1, help="Rate limit time period"
    )
    parser.add_argument(
        "-rl", dest="aioratelimiter", action="store_true",
        help="Use aioratelimiter.")
    parser.add_argument("num_requests", nargs="?", default=50, type=int)
    args = parser.parse_args()

    factory = ClientSession if args.use_aiohttp else AsyncClient

    if args.use_uvloop:
        import uvloop

        asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

    logging.basicConfig(
        level=logging.DEBUG if args.debug else logging.INFO,
        format="(%(relativeCreated)06d %(taskname)s) %(levelname)s:%(name)s:%(message)s",
    )
    filter = TaskNameFilter()
    for handler in logging.getLogger().handlers:
        handler.addFilter(filter)
    logging.info("Running test with %r", args)

    asyncio.run(
        main(
            args.num_requests,
            args.use_sema,
            args.rate,
            args.period,
            client_factory=factory,
            aiolimiter=args.aioratelimiter,
        ),
        debug=args.debug,
    )

Use -rl to activate my rate limiter.

I haven't packaged it yet, or dealt with documentation, as I finished it only today, but here's a link to download the single module.

bharel commented 2 years ago

While I package mine, do you want me to submit a PR for yours fixing those 2 issues? It will require a little overhaul of your design, but not too bad. I promise it'll still be backwards compatible :-)

While we're at it, we can fix "max-rate" and "time period" so they play nicely when changed, using a property that updates the calls/second. I chose to use "rate" for calls/second and "capacity" for bucket/burst capacity, and I just recalculate the reciprocal so others can modify them at will.
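A small sketch of the property-based approach mentioned above, assuming a settings object that caches the reciprocal of the rate. The class `BucketSettings`, the `interval` attribute, and `from_max_rate` are hypothetical names chosen for illustration; they are not part of aiolimiter or aioratelimiter. The conversion in `from_max_rate` mirrors the `LeakyBucketLimiter(rate/period, capacity=rate)` call in the client script.

```python
class BucketSettings:
    """Illustrative only: keeps the cached seconds-per-call in sync with `rate`."""

    def __init__(self, rate: float, capacity: float) -> None:
        self.capacity = capacity  # burst capacity of the bucket
        self.rate = rate          # calls per second; goes through the setter

    @property
    def rate(self) -> float:
        return self._rate

    @rate.setter
    def rate(self, value: float) -> None:
        if value <= 0:
            raise ValueError("rate must be positive")
        self._rate = value
        self._interval = 1.0 / value  # recalculated reciprocal

    @property
    def interval(self) -> float:
        """Seconds between two calls at the current rate."""
        return self._interval

    @classmethod
    def from_max_rate(cls, max_rate: float, time_period: float) -> "BucketSettings":
        # "max_rate per time_period" expressed as calls/second plus capacity.
        return cls(max_rate / time_period, max_rate)
```

Changing `rate` at runtime then updates the derived interval automatically, so callers never see a stale value.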

Thank you for teaching me about the leaky bucket algorithm btw! My original invention is quite close and bursts only when necessary, but I added the "by-the-book" solution for completeness' sake, and a stricter version as well.

mjpieters commented 2 years ago

I'm certainly interested in perf improvements; I've been pondering the waiters structure on and off here. Unfortunately, as you can tell from the long delay in response here, I'm quite busy at the moment; even reviews will take some time to trickle through.

So, if you want to make a PR, go right ahead, I just can't make any promises as to when I can do the review 😅

mjpieters commented 1 year ago

@bharel: revisiting this, I don't think your assessments quite hold:

  1. You check the time and _leak on every call. That causes an unnecessary overhead as on 100% of the cases there won't be a leak if you schedule 1k requests at once.

I don't see this as an issue; this is hardly a high overhead. loop.time() retrieves the current time value from a C call. At most, we could cache the result to avoid having to call it twice in _leak(), but I doubt it'll make much difference.
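To make the "cache the result" remark concrete, here is a hedged sketch of a leak step that receives one clock reading from its caller and reuses it. `LeakState` and its attributes are hypothetical names; this is a simplified model and not aiolimiter's actual `_leak()` code.

```python
class LeakState:
    """Illustrative only: bucket level that drains at a constant rate."""

    def __init__(self, max_rate: float, time_period: float) -> None:
        self.max_rate = max_rate
        self._rate_per_sec = max_rate / time_period
        self._level = 0.0       # current fill level of the bucket
        self._last_check = 0.0  # timestamp of the previous leak step

    def leak(self, now: float) -> None:
        """Drain the bucket for the time elapsed up to `now`.

        `now` is read once by the caller (for example from loop.time())
        and used both for the elapsed time and for the new timestamp.
        """
        if self._level:
            elapsed = now - self._last_check
            self._level = max(self._level - elapsed * self._rate_per_sec, 0.0)
        self._last_check = now
```

A caller would typically do `state.leak(loop.time())` once per capacity check.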

  2. You're bashing the event loop checking for a free spot. Basically, all of your waiters awake every rate tick, making 1k new futures every tick for a single vacant spot. It's an enormous overhead, and with enough waiters, occurs on every event loop step, including all awaits in external/irrelevant code. Needless to say, it grinds everything to a halt.

This is the clincher, I think. I do indeed use wait_for() with a timeout for every blocked task, and that is almost certainly the core issue here. So if we can replace that with a single timed wait, we'd be in a much better position. I think I can see how that'd work: we can cancel and reschedule that future if the amount value would put the current estimated time of future capacity way out.
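The cost of giving every blocked task its own timeout can be illustrated with a simplified model. The function `count_wakeups` below is hypothetical and not taken from aiolimiter; it assumes that no capacity ever becomes available, so each blocked task creates a new future and wakes once per tick.

```python
import asyncio


async def count_wakeups(num_waiters: int, ticks: int, tick: float = 0.01) -> int:
    """Count task wakeups when each waiter polls with its own wait_for() timeout."""
    wakeups = 0

    async def waiter() -> None:
        nonlocal wakeups
        loop = asyncio.get_running_loop()
        for _ in range(ticks):
            fut = loop.create_future()  # a fresh future on every retry
            try:
                await asyncio.wait_for(fut, timeout=tick)
            except asyncio.TimeoutError:
                wakeups += 1            # woke up only to find no capacity

    await asyncio.gather(*(waiter() for _ in range(num_waiters)))
    return wakeups
```

In this model the number of wakeups is `num_waiters * ticks`, whereas a single shared timer would produce one wakeup per tick regardless of how many tasks are blocked.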

I might even borrow your deque idea there. Currently, I use a dict and rely on that retaining insertion order. Iteration to look for a still-waiting future is fast. I fear however that the many deletions cause us some re-sizing pain, pain that a deque would avoid.
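The two waiter structures can be compared with a short sketch. Both helper functions are hypothetical and only model the lookup of the next still-waiting future; they assume waiters are stored as key/future pairs and do not reproduce aiolimiter's internals.

```python
import asyncio
from collections import deque
from typing import Deque, Dict, Optional, Tuple

Entry = Tuple[int, asyncio.Future]


def first_pending_dict(waiters: Dict[int, asyncio.Future]) -> Optional[Entry]:
    """Dict variant: scan in insertion order; finished entries stay until deleted by key."""
    for key, fut in waiters.items():
        if not fut.done():
            return key, fut
    return None


def first_pending_deque(waiters: Deque[Entry]) -> Optional[Entry]:
    """Deque variant: drop finished entries from the left while scanning."""
    while waiters:
        key, fut = waiters[0]
        if not fut.done():
            return key, fut
        waiters.popleft()  # O(1) removal, no table resizing involved
    return None
```

Both return the oldest waiter that has not been resolved or cancelled; the deque variant also cleans up stale entries as a side effect of the scan.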

I'll see about experimenting with this.