litl / backoff

Python library providing function decorators for configurable backoff and retry
MIT License

Support for async iterators #131

Open gordonwatts opened 3 years ago

gordonwatts commented 3 years ago

I needed to restart an async iterator if it hadn't returned any items yet. The algorithm I needed was:

The logic is basically that once the async iterator has returned an item, there is no way to "get it back", so an exception raised after that point should not be retried either; it should just propagate.
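The rule described above can be sketched without any backoff internals. This is a minimal illustration, not the library's code: `retry_before_first_item`, `flaky_numbers`, and the fixed `delay` are hypothetical names chosen for the example, and a real version would use backoff's wait generators and handlers.

```python
import asyncio


def retry_before_first_item(exceptions, max_tries=3, delay=0.0):
    """Retry an async generator function until it yields its first item."""
    def decorate(target):
        async def wrapper(*args, **kwargs):
            tries = 0
            while True:
                tries += 1
                got_one_item = False
                try:
                    async for item in target(*args, **kwargs):
                        got_one_item = True
                        yield item
                    return  # the iterator finished normally
                except exceptions:
                    # Items already handed to the caller cannot be taken
                    # back, so a late failure is not retried either.
                    if got_one_item or tries >= max_tries:
                        raise
                    await asyncio.sleep(delay)
        return wrapper
    return decorate


attempts = 0


@retry_before_first_item(ConnectionError, max_tries=5)
async def flaky_numbers():
    """Hypothetical source that fails twice before producing anything."""
    global attempts
    attempts += 1
    if attempts < 3:
        raise ConnectionError("not ready yet")
    for n in range(3):
        yield n


async def collect():
    return [n async for n in flaky_numbers()]


result = asyncio.run(collect())
```

Here the first two attempts fail before any item is produced, so they are retried; a failure after the first `yield` would be re-raised to the caller instead.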

I used backoff in my project already, so I thought I'd just re-use the core async code you have already written to implement this. I came up with the code below. As you can tell, it is about 99% copied from this library. The only important difference comes in the retry_exception_itr function, in the retry function, in the while loop (so about 20 lines down from the top).

Which leads me to: is this a feature you'd be interested in having in this library? If so, does the below approach make sense?

def retry_exception_itr(target, wait_gen, exception,
                        max_tries, max_time, jitter, giveup,
                        on_success, on_backoff, on_giveup,
                        wait_gen_kwargs):
    on_success = _ensure_coroutines(on_success)
    on_backoff = _ensure_coroutines(on_backoff)
    on_giveup = _ensure_coroutines(on_giveup)
    giveup = _ensure_coroutine(giveup)

    # Easy to implement, please report if you need this.
    assert not asyncio.iscoroutinefunction(max_tries)
    assert not asyncio.iscoroutinefunction(jitter)

    @functools.wraps(target)
    async def retry(*args, **kwargs):
        # change names because python 2.x doesn't have nonlocal
        max_tries_ = _maybe_call(max_tries)
        max_time_ = _maybe_call(max_time)

        tries = 0
        start = datetime.datetime.now()
        wait = _init_wait_gen(wait_gen, wait_gen_kwargs)
        while True:
            tries += 1
            elapsed = timedelta.total_seconds(datetime.datetime.now() - start)
            details = (target, args, kwargs, tries, elapsed)

            got_one_item = False
            try:
                async for item in target(*args, **kwargs):
                    got_one_item = True
                    yield item
            except exception as e:
                # If we've already fed a result out of this method,
                # we can't pull it back. So don't try to pull back/retry
                # the exception either.
                if got_one_item:
                    raise

                giveup_result = await giveup(e)
                max_tries_exceeded = (tries == max_tries_)
                max_time_exceeded = (max_time_ is not None
                                     and elapsed >= max_time_)

                if giveup_result or max_tries_exceeded or max_time_exceeded:
                    await _call_handlers(on_giveup, *details)
                    raise

                try:
                    seconds = _next_wait(wait, jitter, elapsed, max_time_)
                except StopIteration:
                    await _call_handlers(on_giveup, *details)
                    raise e

                await _call_handlers(on_backoff, *details, wait=seconds)

                # Note: there is no convenient way to pass explicit event
                # loop to decorator, so here we assume that either default
                # thread event loop is set and correct (it mostly is
                # by default), or Python >= 3.5.3 or Python >= 3.6 is used
                # where loop.get_event_loop() in coroutine guaranteed to
                # return correct value.
                # See for details:
                #   <https://groups.google.com/forum/#!topic/python-tulip/yF9C-rFpiKk>
                #   <https://bugs.python.org/issue28613>
                await asyncio.sleep(seconds)
            else:
                await _call_handlers(on_success, *details)
                return
    return retry

def on_exception_itr(wait_gen,
                     exception,
                     max_tries=None,
                     max_time=None,
                     jitter=full_jitter,
                     giveup=lambda e: False,
                     on_success=None,
                     on_backoff=None,
                     on_giveup=None,
                     logger='backoff',
                     backoff_log_level=logging.INFO,
                     giveup_log_level=logging.ERROR,
                     **wait_gen_kwargs):
    def decorate(target):
        # change names because python 2.x doesn't have nonlocal
        logger_ = _prepare_logger(logger)

        on_success_ = _config_handlers(on_success)
        on_backoff_ = _config_handlers(
            on_backoff, _log_backoff, logger_, backoff_log_level
        )
        on_giveup_ = _config_handlers(
            on_giveup, _log_giveup, logger_, giveup_log_level
        )

        # No sync/async dispatch here (unlike on_exception): the target must
        # be an async generator function, so the iterator retry loop below
        # is always used.

        return retry_exception_itr(target, wait_gen, exception,
                                   max_tries, max_time, jitter, giveup,
                                   on_success_, on_backoff_, on_giveup_,
                                   wait_gen_kwargs)

    # Return a function which decorates a target with a retry loop.
    return decorate

As always, thanks for a really excellent library!

awm33 commented 3 years ago

I think I'm having the same issue. I was trying to use async for ... with a function wrapped using backoff. The use case is handling websocket disconnects.
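One plausible reason a coroutine-oriented retry decorator does not help here: a check such as the `asyncio.iscoroutinefunction(target)` test in the code above does not recognise async generator functions, and calling one runs none of its body. The sketch below uses only the standard `inspect` module; `fetch_one` and `stream` are illustrative names, and whether a given backoff version dispatches this way is an assumption.

```python
import inspect


async def fetch_one():
    return 1


async def stream():
    yield 1


# An async generator function is not a coroutine function...
is_coro = inspect.iscoroutinefunction(stream)
is_agen = inspect.isasyncgenfunction(stream)

# ...and calling it only builds an async generator object. The body, and
# any exception it raises, runs later inside the caller's `async for`.
agen = stream()
```

Because the exception surfaces during iteration rather than during the call, a wrapper that only guards the call itself never sees it.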

gordonwatts commented 3 years ago

@awm33 - Would the algorithm above have handled it? The problem with re-starting something when the sequence is halfway done is... well... difficult.
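To illustrate why restarting halfway through is awkward, here is a small sketch of a naive wrapper that restarts from scratch on any error. The names `source` and `naive_restart` are hypothetical; the point is only that the consumer sees the early items twice.

```python
import asyncio

calls = 0


async def source():
    """Yields 0, 1, 2, but fails before the third item on the first run."""
    global calls
    calls += 1
    for n in range(3):
        if calls == 1 and n == 2:
            raise ConnectionError("dropped mid-stream")
        yield n


async def naive_restart():
    """Restart from scratch on any error, even after items were yielded."""
    while True:
        try:
            async for item in source():
                yield item
            return
        except ConnectionError:
            continue


async def collect():
    return [n async for n in naive_restart()]


seen = asyncio.run(collect())
# seen == [0, 1, 0, 1, 2]: the first two items are delivered twice
```

Avoiding the duplicates would need some notion of position or deduplication that a generic retry decorator does not have.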

awm33 commented 3 years ago

@gordonwatts I used a wrapper within the wrapper. Here's a condensed version of what I was trying to do:

async def events():
    url = getws_url()
    req_kwargs = {}
    add_auth_header(req_kwargs)

    @backoff.on_exception(backoff.expo,
                          (aiohttp.ClientResponseError,
                           aiohttp.ClientConnectorError,
                           aiohttp.WebSocketError,
                           aiohttp.WSCloseCode,
                           DisconnectError))
    async def connect_wrapper():
        async with session.ws_connect(url, **req_kwargs) as ws:
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    yield msg.json()
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    break

            if not ws_disconnecting:  # tracks if the disconnect was intended
                raise DisconnectError()

    async for event in connect_wrapper():  # there is no "async yield for ..."
        yield event

Then, to consume it:

async for event in self.events():
    print(event)

gordonwatts commented 3 years ago

Ok, so this looks like the same pattern I had: you need to catch and recover from errors that occur in the ws_connect call, but once messages start coming back from the session, you no longer need to catch errors there.
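One way to express that split is to retry only the connection step as an ordinary coroutine and leave the streaming part unguarded. The sketch below is an assumption-laden illustration: `FakeConnection`, `connect`, and `connect_with_retry` are hypothetical stand-ins for the websocket session, and the hand-written loop stands in for a retry decorator.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a websocket: an async iterator of messages."""

    def __init__(self, messages):
        self._messages = list(messages)

    def __aiter__(self):
        return self

    async def __anext__(self):
        if not self._messages:
            raise StopAsyncIteration
        return self._messages.pop(0)


connect_attempts = 0


async def connect():
    """Hypothetical connect call that fails twice before succeeding."""
    global connect_attempts
    connect_attempts += 1
    if connect_attempts < 3:
        raise ConnectionError("refused")
    return FakeConnection(["a", "b"])


async def connect_with_retry(max_tries=5, delay=0.0):
    """Retry only the connection step."""
    for attempt in range(1, max_tries + 1):
        try:
            return await connect()
        except ConnectionError:
            if attempt == max_tries:
                raise
            await asyncio.sleep(delay)


async def events():
    conn = await connect_with_retry()
    # Errors raised from here on propagate to the caller without a retry.
    async for msg in conn:
        yield msg


async def collect():
    return [m async for m in events()]


received = asyncio.run(collect())
```

Since `connect` is a plain coroutine, the hand-written loop could presumably be replaced by `@backoff.on_exception(backoff.expo, ConnectionError)` on that function, which is the case the existing decorator already handles. This only fits when the connection can be established separately from iterating over it.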