celery / kombu

Messaging library for Python.
http://kombu.readthedocs.org/
BSD 3-Clause "New" or "Revised" License

SQS backoff_policy no longer works #1824

Open kevindixon opened 10 months ago

kevindixon commented 10 months ago

Versions:

Using acks_late
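
For reference, a setup of the kind described here might look roughly like the following. This is only a sketch modelled on the pattern in the Celery SQS documentation; the queue name, URL, task name, and delay values are placeholders and not the reporter's actual settings.

```python
# Sketch of a Celery configuration module using SQS with a backoff policy.
# Queue name, URL, task name and delays are placeholders (assumptions).

broker_url = "sqs://"

# Acknowledge messages after the task finishes rather than on receipt.
task_acks_late = True

broker_transport_options = {
    "predefined_queues": {
        "my-queue": {
            "url": "https://sqs.eu-west-1.amazonaws.com/123456789012/my-queue",
            # Maps the number of times a message has been received
            # to a visibility timeout in seconds.
            "backoff_policy": {1: 10, 2: 20, 3: 40, 4: 80, 5: 320, 6: 640},
            # Only tasks listed here have the policy applied.
            "backoff_tasks": ["myapp.tasks.flaky_task"],
        }
    }
}
```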

Setting backoff_policy fails with a KeyError when a message fails:

Traceback (most recent call last):
  File "/venv/lib/python3.8/site-packages/celery/worker/worker.py", line 202, in start
    self.blueprint.start(self)
  File "/venv/lib/python3.8/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/venv/lib/python3.8/site-packages/celery/bootsteps.py", line 365, in start
    return self.obj.start()
  File "/venv/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 340, in start
    blueprint.start(self)
  File "/venv/lib/python3.8/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/venv/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 742, in start
    c.loop(*c.loop_args())
  File "/venv/lib/python3.8/site-packages/celery/worker/loops.py", line 97, in asynloop
    next(loop)
  File "/venv/lib/python3.8/site-packages/kombu/asynchronous/hub.py", line 306, in create_loop
    item()
  File "/venv/lib/python3.8/site-packages/vine/promises.py", line 160, in __call__
    return self.throw()
  File "/venv/lib/python3.8/site-packages/vine/promises.py", line 157, in __call__
    retval = fun(*final_args, **final_kwargs)
  File "/venv/lib/python3.8/site-packages/kombu/message.py", line 142, in reject_log_error
    self.reject(requeue=requeue)
  File "/venv/lib/python3.8/site-packages/kombu/message.py", line 164, in reject
    self.channel.basic_reject(self.delivery_tag, requeue=requeue)
  File "/venv/lib/python3.8/site-packages/kombu/transport/virtual/base.py", line 680, in basic_reject
    self.qos.reject(delivery_tag, requeue=requeue)
  File "/venv/lib/python3.8/site-packages/kombu/transport/SQS.py", line 188, in reject
    self.apply_backoff_policy(
  File "/venv/lib/python3.8/site-packages/kombu/transport/SQS.py", line 208, in apply_backoff_policy
    self.extract_task_name_and_number_of_retries(delivery_tag)
  File "/venv/lib/python3.8/site-packages/kombu/transport/SQS.py", line 225, in extract_task_name_and_number_of_retries
    message.properties['delivery_info']['sqs_message']
KeyError: 'Attributes'

From what I can see of the Kombu SQS implementation, its calls to boto3 receive_message and receive_messages don't request any attributes (there is no AttributeNames parameter). At least in my experiments, receive_message and receive_messages don't return Attributes unless specifically asked to.

I'm not sure whether this is a change in Kombu or in boto.
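
The boto3 behaviour mentioned above (SQS only includes an Attributes key when attributes are requested) can be illustrated with a small sketch. The helper names `receive_kwargs` and `receive_count` and the sample messages are hypothetical; only the `AttributeNames` parameter and the `ApproximateReceiveCount` attribute come from the SQS API. No AWS call is made here; the kwargs are what one would pass to an SQS client's `receive_message()`.

```python
from typing import Any, Dict, Optional


def receive_kwargs(queue_url: str) -> Dict[str, Any]:
    """Keyword arguments for an SQS client's receive_message() call.

    Without AttributeNames, SQS omits the "Attributes" key from each message.
    """
    return {
        "QueueUrl": queue_url,
        "MaxNumberOfMessages": 1,
        "AttributeNames": ["ApproximateReceiveCount"],
    }


def receive_count(sqs_message: Dict[str, Any]) -> Optional[int]:
    """Return ApproximateReceiveCount, or None if attributes are absent.

    Using .get() avoids the KeyError: 'Attributes' seen in the traceback.
    """
    attributes = sqs_message.get("Attributes")
    if attributes is None:
        return None
    value = attributes.get("ApproximateReceiveCount")
    return int(value) if value is not None else None


# Hypothetical message shapes, mimicking entries of a receive_message() response.
with_attrs = {"Body": "{}", "Attributes": {"ApproximateReceiveCount": "3"}}
without_attrs = {"Body": "{}"}

print(receive_count(with_attrs))
print(receive_count(without_attrs))
```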

kevindixon commented 10 months ago

> From what I can see of the Kombu SQS implementation calls to boto3 receive_message and receive_messages don't request any attributes (no AttributeNames parameter). At least in my experiments, receive_message and receive_messages don't return Attributes unless specifically requested to do so.

I looked again, and this assertion isn't true; I misread the code. asynchronous.aws.sqs.connection does indeed set attribute names.

I'm not entirely sure why I'm getting this error (it only happens when backoff_policy is set).

auvipy commented 10 months ago

Can you help reproduce this with Celery 5.3.6?

kevindixon commented 10 months ago

Same behaviour with Celery 5.3.6.

However, I think this is my misunderstanding of the documentation: backoff_policy does not work when raising Reject with requeue=True. This stands to reason, I guess, because requeue=True presumably creates a new SQS message.

So my solution, at least, is to NOT use requeue=True; backoff_policy is then honoured.

The Celery SQS documentation is pretty unclear on all this (as is the documentation of the requeue parameter), and the KeyError isn't particularly helpful. So maybe a small documentation improvement is in order.

A more appropriate behaviour, I would think, would be to ignore backoff_policy when Reject is raised with requeue=True and to log the fact that the policy has been ignored.
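
The suggested behaviour could be sketched roughly as below. This is a hypothetical standalone helper, not Kombu's actual code: `backoff_timeout`, its parameters, and the log message are assumptions made for illustration. It skips the policy (and logs a warning) when requeue=True or when the message carries no Attributes, instead of raising KeyError.

```python
import logging
from typing import Any, Dict, Optional

logger = logging.getLogger(__name__)


def backoff_timeout(
    sqs_message: Dict[str, Any],
    backoff_policy: Dict[int, int],
    requeue: bool,
) -> Optional[int]:
    """Return the visibility timeout to apply, or None to skip the policy."""
    attributes = sqs_message.get("Attributes")
    if requeue or attributes is None:
        # Hypothetical fallback: ignore the policy and say so in the log.
        logger.warning(
            "backoff_policy ignored (requeue=%s, attributes present=%s)",
            requeue,
            attributes is not None,
        )
        return None
    receives = int(attributes.get("ApproximateReceiveCount", 0))
    # No entry for this receive count means no timeout change.
    return backoff_policy.get(receives)


policy = {1: 10, 2: 20, 3: 40}
message = {"Attributes": {"ApproximateReceiveCount": "2"}}

print(backoff_timeout(message, policy, requeue=False))
print(backoff_timeout(message, policy, requeue=True))
```

Returning None rather than raising keeps the reject path working even when the policy cannot be applied, which is the point of the suggestion above.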