cenkalti / kuyruk

⚙️ Simple task queue for Python
https://kuyruk.readthedocs.org/
MIT License

Auto-retry failing tasks with rescheduling #58

Closed frol closed 6 years ago

frol commented 6 years ago

As a follow-up to #50, here is the code snippet I ended up using to auto-retry crashing tasks indefinitely by rescheduling them through the RabbitMQ queue, which helps when the task may get picked up by a non-failing worker. I could not find a way to implement bounded (non-infinite) auto-retry with rescheduling using RabbitMQ alone, since you cannot update an existing task with meta information (e.g. the number of failed attempts). With an external service (e.g. Redis or Memcached) you could adapt this code to implement any retry logic you want; I leave that as an exercise to the reader.

import functools
import logging

from kuyruk import Kuyruk as BaseKuyruk, Task

log = logging.getLogger(__name__)

def safe_kuyruk_task(func, retry_delay):
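    """
    Wrap a task function so that any unexpected exception rejects the
    task (sending it back to the queue) instead of failing it.

    Args:
        func (callable): the wrapped function task.
        retry_delay (float): time in seconds between retries on crash.
    """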
    @functools.wraps(func)
    def decorator(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except (Kuyruk.Discard, Kuyruk.Reject):
            raise
        except Exception:
            log.exception("Task has crashed... Retrying in %g seconds.", retry_delay)
            raise Kuyruk.Reject()

    return decorator

class Kuyruk(BaseKuyruk):

    def task(self, queue='kuyruk', retry_delay=60, **kwargs):
        """
        Wrap functions with this decorator to convert them to *tasks*.
        After wrapping, calling the function will send a message to
        a queue instead of running the function.

        Args:
            queue: Queue name for the tasks.
            retry_delay (float): time in seconds between retries on crash.
            kwargs: Keyword arguments will be passed to :class:`~kuyruk.Task`
                constructor.

        Returns:
            function: Callable :class:`~kuyruk.Task` object wrapping the
            original function.
        """
        def inner(func):
            return Task(
                    safe_kuyruk_task(func, retry_delay),
                    self,
                    queue,
                    reject_delay=retry_delay,
                    **kwargs)

        return inner

kuyruk = Kuyruk()

I had to override the task method of the Kuyruk class to pass in my safe_kuyruk_task adapter, which enables the auto-retry logic for all tasks automatically (so no changes to the existing task code are needed apart from this change to how Kuyruk is initialized).
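For the bounded variant mentioned at the top, a minimal sketch could look like the following. It is library-agnostic and makes several assumptions: Reject is a local stand-in for Kuyruk's reject exception, AttemptStore is a hypothetical in-memory placeholder for Redis or Memcached, and the counter key built from the function name and arguments is only one possible way to identify a task.

```python
import functools
import logging

log = logging.getLogger(__name__)


class Reject(Exception):
    """Local stand-in for Kuyruk's reject exception (assumption)."""


class AttemptStore:
    """Hypothetical in-memory counter; replace with Redis or Memcached."""

    def __init__(self):
        self._counts = {}

    def increment(self, key):
        self._counts[key] = self._counts.get(key, 0) + 1
        return self._counts[key]

    def clear(self, key):
        self._counts.pop(key, None)


def bounded_retry(store, max_attempts, reject_exc=Reject):
    """Reject (requeue) a crashing task until max_attempts is reached."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Assumed key scheme: function name plus its arguments.
            key = "%s:%r:%r" % (func.__name__, args, sorted(kwargs.items()))
            try:
                result = func(*args, **kwargs)
            except reject_exc:
                # The task asked to be requeued itself; do not count it.
                raise
            except Exception:
                attempts = store.increment(key)
                if attempts >= max_attempts:
                    # Out of attempts: forget the counter and fail for real.
                    store.clear(key)
                    log.exception("Task failed %d times; giving up.", attempts)
                    raise
                log.exception("Task crashed (attempt %d of %d); requeueing.",
                              attempts, max_attempts)
                raise reject_exc()
            # Success: reset the counter for this task.
            store.clear(key)
            return result
        return wrapper
    return decorator
```

In real use, the reject_exc argument would be whichever exception the queue library treats as "requeue this message", and the store would have to be shared by all workers for the count to survive rescheduling.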

@cenkalti I couldn't find a good way to generalize this, so I won't send a PR, but I decided to share my code since others may wonder how to do this. It would be great to see an upstream solution to this problem, either as a built-in implementation or just as an example in the documentation.

cenkalti commented 6 years ago

Hello @frol

This won't work because the reject_delay argument was removed in version 9.

https://github.com/cenkalti/kuyruk/blob/0f3212a6e653da621f664979d9d3846d93867000/docs/changelog.rst#L13

If you need that custom behavior, I recommend implementing it outside of Kuyruk.
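One way to read "outside of Kuyruk" is a plain decorator that retries inside the worker process and needs no support from the queue library. The sketch below is only an illustration under that assumption: retry_in_process and its parameters are hypothetical names, and unlike rescheduling through the queue, it keeps the task on the same worker for the whole delay.

```python
import functools
import time


def retry_in_process(max_attempts=3, delay=60.0, sleep=time.sleep):
    """Retry the wrapped function in-process (hypothetical helper).

    Args:
        max_attempts (int): total number of calls before giving up.
        delay (float): seconds to wait between attempts.
        sleep (callable): injectable sleep function, useful for testing.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        # Last attempt: let the original exception propagate.
                        raise
                    sleep(delay)
        return wrapper
    return decorator
```

Because functools.wraps preserves the function's name and module, a function decorated this way should still be usable wherever the plain function was, but that has not been checked against any particular Kuyruk version.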

frol commented 6 years ago

@cenkalti What was the reasoning behind removing the existing functionality? Didn't it work?

cenkalti commented 6 years ago

There were exceptions coming from that part of the code, and it was too complex to understand and debug.