Open seidnerj opened 1 year ago
I have not found a solution so far, so I made my own decorator. Before contributing to tenacity or my fork, for now I offer this solution (it can be optimized for your tasks):
import logging
import functools
from logging import Logger, getLogger
from typing import Callable, Coroutine, Type
class RetryError(Exception):
...
def retry(
after_callback: Callable[..., Coroutine] | None = None,
logger: Logger | None = None,
max_retries: int = 5,
exception_for_retry: list[Type[Exception]] | Type[Exception] = Exception,
reraise: bool = False,
):
def func_wrapper(f):
@functools.wraps(f)
async def wrapper(*args, **kwargs):
last_exc = None
for attempt_num in range(max_retries):
try:
return await f(*args, **kwargs)
except (
tuple(exception_for_retry)
if isinstance(exception_for_retry, list)
else exception_for_retry
) as exc:
last_exc = exc
if after_callback:
await after_callback()
if logger:
logger.info(msg=f"Trigger retrying. Attempt #{attempt_num+1}")
raise (last_exc if reraise else RetryError)
return wrapper
return func_wrapper
That’s awesome, thanks a lot!
I have the following code:
@retry(retry=retry_if_exception_type(SomeException), stop=stop_after_attempt(5), before_sleep=before_sleep_log(logging.getLogger(), logging.ERROR), wait=wait_exponential(multiplier=1, min=1, max=10), before=before_retry_handler) async def async_wrapper(param): ....
async def before_retry_handler(retry_state): ....
Trouble is before_retry_handler has to be async because it calls async functions.
When I try to use the above, I get "RuntimeWarning: coroutine 'before_retry_handler' was never awaited self.before(retry_state)".
Am I missing something?
Thanks in advance