celery / kombu

Messaging library for Python.
http://kombu.readthedocs.org/
BSD 3-Clause "New" or "Revised" License
2.9k stars 930 forks source link

KeyError 'Attributes' in SQS _create_queue #2142

Open SebCorbin opened 1 month ago

SebCorbin commented 1 month ago

We've got and intermittent error from where a task is sent create_export_task.delay() :

  File "celery/app/trace.py", line 760, in __protected_call__
    return self.run(*args, **kwargs)
  File "exports/tasks/export.py", line 32, in create_export_task
    run_export_task.delay(task.uuid)
  File "celery/app/task.py", line 444, in delay
    return self.apply_async(args, kwargs)
  File "celery/app/task.py", line 594, in apply_async
    return app.send_task(
  File "celery/app/base.py", line 798, in send_task
    amqp.send_task_message(P, name, message, **options)
  File "celery/app/amqp.py", line 517, in send_task_message
    ret = producer.publish(
  File "kombu/messaging.py", line 186, in publish
    return _publish(
  File "kombu/connection.py", line 556, in _ensured
    return fun(*args, **kwargs)
  File "kombu/messaging.py", line 202, in _publish
    [maybe_declare(entity) for entity in declare]
  File "kombu/messaging.py", line 202, in <listcomp>
    [maybe_declare(entity) for entity in declare]
  File "kombu/messaging.py", line 107, in maybe_declare
    return maybe_declare(entity, self.channel, retry, **retry_policy)
  File "kombu/common.py", line 113, in maybe_declare
    return _maybe_declare(entity, channel)
  File "kombu/common.py", line 153, in _maybe_declare
    entity.declare(channel=channel)
  File "kombu/entity.py", line 617, in declare
    self._create_queue(nowait=nowait, channel=channel)
  File "kombu/entity.py", line 626, in _create_queue
    self.queue_declare(nowait=nowait, passive=False, channel=channel)
  File "kombu/entity.py", line 655, in queue_declare
    ret = channel.queue_declare(
  File "kombu/transport/virtual/base.py", line 538, in queue_declare
    return queue_declare_ok_t(queue, self._size(queue), 0)
  File "kombu/transport/SQS.py", line 660, in _size
    return int(resp['Attributes']['ApproximateNumberOfMessages'])
KeyError: 'Attributes'

Versions

kombu==5.3.7
celery==5.3.4
amqp==5.2.0
boto3==1.34.128
botocore==1.34.128

Here's the payload returned by c.get_queue_attributes()

{
  ResponseMetadata: {
    HTTPHeaders: {
      connection: "keep-alive",
      content-length: "0",
      content-type: "application/x-amz-json-1.0",
      date: "Sat, 05 Oct 2024 22:00:00 GMT",
      x-amzn-requestid: "..."
    },
    HTTPStatusCode: 200,
    RequestId: "...",
    RetryAttempts: 0
  }
}

The queue is already created before that, another thing is that create_export_task is itself a task (which calls run_export_task) but I don't know if that's relevant, as the error is intermittent

adiberk commented 1 month ago

Hitting this also