Bogdanp / dramatiq

A fast and reliable background task processing library for Python 3.
https://dramatiq.io
GNU Lesser General Public License v3.0

How to mute exception errors for a task that will be retried? #469

Open mparent61 opened 2 years ago

mparent61 commented 2 years ago

What OS are you using?

Docker/Debian running on both Mac/Arm64 and Linux/x86

What version of Dramatiq are you using?

1.12.3

What did you do?

Tasks that raise retryable exceptions still generate a logger.error message, which fires alerts to our third-party monitoring tools like Sentry.

We're trying to find a way to avoid triggering alerts for exceptions that will cause a task retry. That way we can lean on the Retries middleware to handle common errors like third-party API connection errors, while avoiding alerts from our error-handling frameworks (e.g. Sentry).

What did you expect would happen?

We'd like to be able to mute exception error messages for exception types that will trigger a task retry.

For example, inside our Sentry "should we send alert" log handler, we could determine if the raised exception was for a task that would be retried, and we could mute the alert.

What happened?

The worker.py process_message() logic calls logger.error right before passing off to the Retries after_process_message handler, where all of the "will retry" logic is located.

https://github.com/Bogdanp/dramatiq/blob/f86b55167ff750756f632d022da32f89ba3e44da/dramatiq/worker.py#L512-L514

This means that any downstream logger/error handlers have no easy way to determine whether the task will be scheduled for a retry, and therefore whether to ignore/mute the error.

Possible solutions

I've looked over the Dramatiq source code and can't see an easy way to accomplish this, so I'm hoping for suggestions.

I have come up with a few ideas, though:

  1. Change Worker.process_message() to not call logger.error when a message will be retried, or perhaps downgrade to an info or warning level.

  2. Set a message.will_retry = True property before calling logger.error, so downstream error handlers can check CurrentMessage.get_current_message().will_retry to decide how to handle.

  3. Provide a method in the Retries middleware to compute whether a particular message will be retried, hopefully sharing logic with Retries.after_process_message (which would simplify my workaround below).

  4. My current workaround: don't change Dramatiq at all, but instead introspect the message using logic lifted directly from Retries.after_process_message(). This is called inside a logger handler triggered by the problematic logger.error call (see the Sentry hook sketch after the function below).

import dramatiq

def will_retry_message(message):
    # Determine whether `message` will be retried by the `Retries` middleware.
    # Logic adapted from Dramatiq v1.12.3's `Retries.after_process_message`.
    # `message._exception` is stuffed onto the message by the worker right
    # before the problematic `logger.error` call fires.
    if exception := getattr(message, "_exception", None):
        actor = dramatiq.get_broker().get_actor(message.actor_name)
        retries = message.options.get("retries", 0)
        max_retries = message.options.get("max_retries") or actor.options.get("max_retries")
        retry_when = actor.options.get("retry_when")
        if (retry_when is not None and retry_when(retries, exception)) or (
            retry_when is None and max_retries is not None and retries < max_retries
        ):
            # Message will be retried, so don't fire a Sentry alert.
            return True

    return False
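
For example, this could plug into a sentry_sdk before_send hook together with the CurrentMessage middleware (untested sketch; it assumes the current message is still set when the worker's logger.error fires, which should hold since the message is only cleared later, in after_process_message):

import sentry_sdk
from dramatiq.middleware import CurrentMessage

# Requires the middleware to be installed on the broker:
# broker.add_middleware(CurrentMessage())

def before_send(event, hint):
    # Drop the event if it comes from a task that will be retried.
    message = CurrentMessage.get_current_message()
    if message is not None and will_retry_message(message):
        return None
    return event

sentry_sdk.init(
    dsn="...",
    before_send=before_send,
)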

This is a sample retryable task I'm using to test solutions. Ideally the first failure would not trigger any errors/alerts, while the second one would (since max_retries has been exceeded).

import dramatiq

@dramatiq.actor(max_retries=1)
def error():
    raise Exception("Task error!")

santigandolfo commented 1 year ago

Hi @mparent61! I'm running into the same problem on a similar use case (NewRelic instead of Sentry). Does solution 4 work? Where should the function be called? (You mentioned the logger handler, but my small brain can't seem to figure out how that would be done.) Cheers!

mparent61 commented 1 year ago

Hi @santigandolfo, I ultimately ended up adding automatic retries inside our tasks (via the urllib3.util.retry.Retry adapter for third-party APIs) and did not use the workaround above, since this simpler approach avoided the workaround's complexity.
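
For reference, that pattern is roughly the standard requests/urllib3 setup (a sketch, not our exact code):

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

session = requests.Session()
retries = Retry(total=3, backoff_factor=0.5, status_forcelist=(502, 503, 504))
session.mount("https://", HTTPAdapter(max_retries=retries))
session.mount("http://", HTTPAdapter(max_retries=retries))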

I've forgotten the specifics, but I believe I implemented solution 4 inside a custom Dramatiq middleware that wrapped the actual task function call and checked for a retry in an exception handler.
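
Reconstructed from memory (untested sketch, so the details may be off), the shape was roughly a middleware like this, with Sentry's automatic logging capture disabled so events are only sent explicitly:

import sentry_sdk
from dramatiq.middleware import Middleware

class MuteRetriedErrors(Middleware):
    # Hypothetical reconstruction: report a failure to Sentry only when the
    # message will NOT be retried (reuses will_retry_message from above).
    def after_process_message(self, broker, message, *, result=None, exception=None):
        if exception is not None and not will_retry_message(message):
            sentry_sdk.capture_exception(exception)

# Installed on the broker alongside the stock middleware:
# broker.add_middleware(MuteRetriedErrors())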

We still occasionally see these unnecessary about-to-be-retried error alerts, so I still think the problem is worth solving.

ccurvey commented 1 year ago

Can this be solved by making your retryable exceptions inherit from dramatiq.errors.Retry?

mparent61 commented 1 year ago

Yeah, that's a good suggestion @ccurvey. It would require catching any exceptions that don't derive from Retry (e.g. third-party library errors) and re-raising some sort of Retry-derived exception, but that would at least make it clear which errors are "retryable". A decorator or middleware could streamline this; a sketch follows below.
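
For example (hypothetical helper; Retry itself is dramatiq's real exception, raised here so the Retries middleware schedules a retry):

import functools

import dramatiq
from dramatiq.errors import Retry

def retry_on(*exception_types):
    # Hypothetical helper: re-raise the listed exception types as dramatiq's
    # Retry so the Retries middleware treats them as retryable.
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            try:
                return fn(*args, **kwargs)
            except exception_types as e:
                raise Retry(str(e)) from e
        return wrapper
    return decorator

@dramatiq.actor(max_retries=3)
@retry_on(ConnectionError, TimeoutError)
def call_api():
    ...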

jurrian commented 4 months ago

In the case of Sentry with requests.exceptions.RetryError, I ended up ignoring it like this:


from requests.exceptions import RetryError

sentry_sdk.init(
    ...
    ignore_errors=[RetryError],
)
spumer commented 4 months ago

We have a RetryActor for this:


from dramatiq import Actor
from dramatiq.errors import Retry

# TemporaryProblemError, TemporaryProblemMatcher, temporary_problem_matcher,
# and problem_detector come from our internal `tochka_dramatiq` package.

class RetryActor(Actor):
    """Works together with :class:`Retries`, converting exceptions into retriable ones at the task execution stage.

    This allows you to avoid unnecessary error records caused by the specifics of error handling in dramatiq.

    >>> import functools
    >>> import tochka_dramatiq
    >>> from tochka_dramatiq.contrib.middleware.retry_fail_policy import RetryActor
    >>> retry_actor = functools.partial(tochka_dramatiq.actor, actor_class=RetryActor)
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # If you do not specify `throws`, Dramatiq logs the exception at the
        # ERROR level, and this happens before any middleware is called.
        self.options['throws'] = (self.options.get('throws') or ()) + (TemporaryProblemError,)

    def __call__(self, *args, **kwargs):
        retry_problem_matcher = self.options.get('retry_problem_matcher')
        if retry_problem_matcher is None:
            retry_problem_matcher = temporary_problem_matcher

        # Choose which exception type the detector should raise when the
        # matcher flags a temporary problem.
        if isinstance(retry_problem_matcher, TemporaryProblemMatcher):
            problem_exc = TemporaryProblemError
        else:
            problem_exc = Retry

        with problem_detector(problem_exc, retry_problem_matcher):
            return super().__call__(*args, **kwargs)
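
As an aside, the throws option set in __init__ above is a standard Dramatiq actor option: exceptions listed there are not logged at the ERROR level by the worker. In the simplest case it can be used directly, although (if I read the stock Retries middleware correctly) a throws exception is treated as an expected failure and is not retried, which is presumably why RetryActor converts matched errors into a retriable type instead:

import dramatiq

# Exceptions listed in `throws` are treated as expected: the worker does
# not log them at the ERROR level.
@dramatiq.actor(throws=(TimeoutError,))
def flaky_task():
    ...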