celery / kombu

Messaging library for Python.
http://kombu.readthedocs.org/
BSD 3-Clause "New" or "Revised" License
2.81k stars 918 forks source link

SQS doesn't acknowledge task when it is rejected (like Celery docs say should happen). #1761

Open jfalkenstein opened 1 year ago

jfalkenstein commented 1 year ago

The Celery documentation says this:

The task may raise Reject to reject the task message using AMQPs basic_reject method. This won’t have any effect unless Task.acks_late is enabled.

Rejecting a message has the same effect as acking it, but some brokers may implement additional functionality that can be used. For example RabbitMQ supports the concept of Dead Letter Exchanges where a queue can be configured to use a dead letter exchange that rejected messages are redelivered to.

However, if task_acks_late is set to True and task.reject() is invoked, the SQS Transport doesn't actually remove the task from the queue, like the Celery docs indicate should happen. Instead, the SQS Channel class doesn't even implement a reject method, so the base class calls the QoS's reject() method, which does nothing if there's no backoff policy configured on the queue. If you look at the base virtual QoS class, it seems the intent of the reject method is that there should be a call to ack the message when it's invoked. This seems like a mistake or otherwise a problematic design choice not to follow that lead in the SQS QoS.

This issue took me FOREVER to track down, spelunking all through Celery and Kombu code to finally figure this out. To get around this, I had to implement a custom Request class and annotate all my tasks with it:

class TaskRequest(Request):
    def reject(self, requeue=False):
        task: Task = self.task
        if task.acks_late and not task.acks_on_failure_or_timeout and not self.acknowledged:
            self.message.channel.basic_ack(self.message.delivery_tag)
        super().reject(requeue)

I think that, to be consistent with Celery docs, the SQS Channel class should implement a reject method that acks the message. Or, if we want to keep the retry-policy logic, we should ack the message by default if there's no backoff policy set.

auvipy commented 1 year ago

@rafidka I will highly appreciate if you can review some related issues and share some feedback with US. no push though.

amegianeg commented 8 months ago

@jfalkenstein were you able to send tasks to the dead-letter queue using SQS? I've tried to send them every time a task fails or I reject explicitly the task (raising the Reject exception) but no luck.

jfalkenstein commented 8 months ago

Hey @amegianeg Great question! To get the behavior I wanted (and what I believe is what the Celery docs describe as the meaning of "rejection"), I had to create a new "Request" class and set that in the Celery task_annotations configuration for the "Request" key.

Here is the "hack" and it has worked really well for us.

class TaskRequest(Request):
    is_error = False

    def on_failure(self, exc_info: ExceptionInfo, send_failed_event=True, return_ok=False):
        """Augments normal failure handling of task failures to toggle an instance flag so that
        we can know in the `reject` method whether we're currently handling an error.
        """
        current_exception = exc_info.exception
        # TaskPredicates are special Celery exceptions that dictate different sorts of handling,
        # such as "Reject", "Retry", and "Ignore". These aren't actually "errors" and as such we
        # want to avoid handling them like this, specifically if they end up invoking "reject".
        if not isinstance(current_exception, TaskPredicate):
            self.is_error = True

        return super().on_failure(exc_info, send_failed_event, return_ok)

    def reject(self, requeue=False):
        """Augments the usual rejection behavior for tasks to additionally acknowledge the message
        when rejecting.

        This is necessary because, despite the Celery docs indicating that rejecting a task will
        acknowledge it, the SQS Celery Transport doesn't actually do this. Instead, it really
        doesn't do anything except for potentially hide the message longer and apply visibility
        timeout but only if you've explicitly configured visibility timeouts on a per-queue basis
        in the transport options (which we don't do).

        This is a problem for us because of the combination of...
        * This Celery quirk where the Celery transport won't delete the message upon rejection
        * task_acks_late=True (where Celery won't delete the message off the queue until AFTER the
            task has been run--the default on this is False but we set it to True so we can leverage
            SQS retries and DLQs when there are failures).

        When these two things combine, it means that rejected tasks stay hidden on the SQS queue,
        which is exactly what we're rejecting the task to PREVENT.

        However, we only acknowledge the task message in this situation IF we are not handling an
        error. Right now, the base class's on_failure method will reject unhandled exceptions. In
        these cases, we want to allow SQS to retry these rejections.
        """
        task: Task = self.task

        if task.acks_late and not self.acknowledged and not self.is_error:
            self.message.channel.basic_ack(self.message.delivery_tag)

        super().reject(requeue)

Ideally, this issue would be handled deeper in the Request class or even on the transport itself. But this has worked great for us. Now, if we run self.reject(), the task is acked and thus deleted from the queue; But if an error blows up the task, it WON'T be acked and SQS will redrive it.

amegianeg commented 8 months ago

@jfalkenstein Thank you so much for showing the solution you came up with. I've tested it in my local environment (using localstack) before and after applying your patch:

Thanks again! I owe you a beer! 👏

jfalkenstein commented 8 months ago

Hey @amegianeg, just to be clear, this solution has these effects when combined with the "acks_late" setting:

a rejected task will be removed from the queue and moved to the dead-letter queue

My example won't do this via an SQS Redrive policy. Now, you might have some other configured behavior within your Celery app via routing, etc... but I just wanted to make clear that my example explicitly DOES ack the task and DOESN'T move the task to the DLQ.

amegianeg commented 8 months ago

@jfalkenstein thanks for noticing that, I must have done something weird while I was testing it. I have just tested it again to make sure and like you said, it DOES NOT move the task to the DLQ.

Thanks again! 🙌

auvipy commented 7 months ago

can you guys check/try/review this PR https://github.com/celery/kombu/pull/1807?

jlucas91 commented 1 month ago

Hi @auvipy we believe we're still seeing this issue on the latest Kombu version. Specifically, when using SQS and acks_late unknown tasks loop indefinitely through SQS. Our belief is the unknown task is being rejected, but the rejection does not propogate an ack to the queue.

Is there any chance this is still on y'alls roadmap?