litl / backoff

Python library providing function decorators for configurable backoff and retry
MIT License

Allow to use Retry-After header in asynchronous code #152

Open paulschmeida opened 2 years ago

paulschmeida commented 2 years ago

Follow-up to #124. In its current form, backoff doesn't allow using Retry-After headers in asynchronous code. It might be my lack of understanding or a problem with aiohttp, but I'm curious what your opinions are.


import json
import asyncio
import aiohttp
from aiolimiter import AsyncLimiter
import backoff

def is_rate_limited(response):
    return response.status == 429

def wait_time_from_response(response):
    return float(response.headers['Retry-After'])

@backoff.on_predicate(
    backoff.runtime,
    predicate=is_rate_limited,
    value=wait_time_from_response,
    jitter=backoff.random_jitter,
    max_tries=3,
)
async def fetch(session):
    async with limiter, session.get(
        r"APP_ENDPOINT_URL"
    ) as response:
        # This is the problem line: the response object is not awaitable
        return await response

async def main():
    tasks = []
    async with aiohttp.ClientSession(
        auth=aiohttp.BasicAuth("USERNAME", "PASSWORD"),
        connector=aiohttp.TCPConnector(ssl=False, limit=200),
        json_serialize=json.dumps,
        raise_for_status=False
    ) as session:
        for _ in range(2000):
            task = asyncio.create_task(fetch(session))
            tasks.append(task)
        responses = await asyncio.gather(*tasks)
        return responses

results = asyncio.run(main())

I need the full response object so that I can extract the Retry-After header from it in the value function, but I can't await it because it's not awaitable. I can await response.read() in my fetch() function, but then the return value is only the body, so I have nothing containing the headers to pass to my predicate and value functions.

Is there any way to get around that and use backoff.runtime with Retry-After in async aiohttp code?
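One possible way around this, sketched here under assumptions rather than confirmed against backoff's async code path: have fetch() return a small plain-data snapshot of the response (status, headers, body) instead of the response object or the raw body. The predicate and value functions then receive something that still carries the headers. `ResponseSnapshot`, `snapshot_of`, and `FakeResponse` are hypothetical names introduced for illustration; the sketch also assumes Retry-After holds a number of seconds.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class ResponseSnapshot:
    """Hypothetical plain-data copy of the parts of a response we need."""
    status: int
    headers: Dict[str, str]  # header names lower-cased for lookup
    body: bytes


def is_rate_limited(snapshot: ResponseSnapshot) -> bool:
    return snapshot.status == 429


def wait_time_from_response(snapshot: ResponseSnapshot) -> float:
    # Assumption: Retry-After is given as delay-seconds, not an HTTP date.
    return float(snapshot.headers["retry-after"])


async def snapshot_of(response) -> ResponseSnapshot:
    """Read the body while the connection is open, then copy status and headers."""
    body = await response.read()
    headers = {name.lower(): value for name, value in response.headers.items()}
    return ResponseSnapshot(response.status, headers, body)


class FakeResponse:
    """Stand-in for an aiohttp response so the sketch runs without a network."""
    status = 429
    headers = {"Retry-After": "7"}

    async def read(self) -> bytes:
        return b"slow down"


snapshot = asyncio.run(snapshot_of(FakeResponse()))
print(is_rate_limited(snapshot), wait_time_from_response(snapshot))
```

With this in place, fetch() would end with `return await snapshot_of(response)` inside the `async with` block, and the decorator arguments `predicate=is_rate_limited, value=wait_time_from_response` would stay as they are. Whether the async on_predicate hands the return value to both functions the same way the synchronous one does is an assumption here.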

paulschmeida commented 2 years ago

I've managed to get it to work by using on_exception. I still believe there's no way to make it work with on_predicate. Feel free to close the issue, but I'm leaving it open because I'm still interested in the maintainers' opinion.

My working proof of concept:

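# Additional imports on top of those in the first snippet
import ssl
import certifi

statuses = []  # collects the status code of every response

def wait_time_from_exc(exc):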
    print(exc.headers)
    if exc.status in [429, 503]:
        return float(exc.headers["Retry-After"])
    elif exc.status == 504:
        return 60
    else:
        return 120

async def fetch(session):
    async with limiter, session.get(
        r"API_ENDPOINT_URL"
    ) as response:
        statuses.append(response.status)
        print(response.status)
        return await response.read()

@backoff.on_exception(
    backoff.runtime,
    aiohttp.client_exceptions.ClientResponseError,
    value=wait_time_from_exc,
    jitter=backoff.random_jitter,
    max_tries=3,
)  # remove max tries or handle giveup
async def main():
    tasks = []
    ssl_context = ssl.create_default_context(cafile=certifi.where())
    async with aiohttp.ClientSession(
        auth=aiohttp.BasicAuth("USERNAME", "PASSWORD"),
        connector=aiohttp.TCPConnector(ssl=ssl_context, limit=100),
        json_serialize=json.dumps,
        raise_for_status=True,
    ) as session:
        for _ in range(300):
            task = asyncio.create_task(fetch(session))
            tasks.append(task)
        responses = await asyncio.gather(*tasks)
        return responses

results = asyncio.run(main())
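
A side note on `float(exc.headers["Retry-After"])`: the HTTP specification allows Retry-After to be either a number of seconds or an HTTP date, and servers may omit it altogether. The following is a minimal sketch of a more forgiving parser using only the standard library. `parse_retry_after` and its `default` parameter are hypothetical names, not part of backoff or aiohttp.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def parse_retry_after(value: Optional[str], default: float = 60.0) -> float:
    """Return a non-negative delay in seconds from a Retry-After header value."""
    if value is None:
        return default
    value = value.strip()
    try:
        # Form 1: delay-seconds, e.g. "120".
        return max(0.0, float(value))
    except ValueError:
        pass
    try:
        # Form 2: HTTP date, e.g. "Wed, 21 Oct 2015 07:28:00 GMT".
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        # Neither form could be parsed; fall back to the default delay.
        return default
    if when.tzinfo is None:
        # Treat a date without zone information as UTC.
        when = when.replace(tzinfo=timezone.utc)
    return max(0.0, (when - datetime.now(timezone.utc)).total_seconds())
```

A value function such as wait_time_from_exc could pass the header value (or None when the header is absent) through this helper instead of calling float() directly.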

bgreen-litl commented 2 years ago

Thanks for this report. I think we should figure out how to support this use case with async for all decorators if we can.