ApeWorX / taskiq-sqs

AWS SQS Broker for TaskIQ
Apache License 2.0
4 stars 1 forks source link

botocore.exceptions.ClientError: An error occurred (ExpiredToken) when calling the ReceiveMessage operation: The security token included in the request is expired #11

Closed mikeshultz closed 3 weeks ago

mikeshultz commented 3 weeks ago

Seen on a running worker earlier today. The worker crashed as this was unhandled.

Shutting down the broker.
  + Exception Group Traceback (most recent call last):
  |   File \"/usr/local/bin/silverback\", line 8, in <module>
  |     sys.exit(cli())
  |              ^^^^^
  |   File \"/usr/local/lib/python3.11/site-packages/click/core.py\", line 1157, in __call__
  |     return self.main(*args, **kwargs)
  |            ^^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File \"/usr/local/lib/python3.11/site-packages/click/core.py\", line 1078, in main
  |     rv = self.invoke(ctx)
  |          ^^^^^^^^^^^^^^^^
  |   File \"/usr/local/lib/python3.11/site-packages/click/core.py\", line 1688, in invoke
  |     return _process_result(sub_ctx.command.invoke(sub_ctx))
  |                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File \"/usr/local/lib/python3.11/site-packages/ape/cli/commands.py\", line 98, in invoke
  |     return self._invoke(ctx, provider=provider)
  |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File \"/usr/local/lib/python3.11/site-packages/ape/cli/commands.py\", line 135, in _invoke
  |     return ctx.invoke(self.callback or (lambda: None), **ctx.params)
  |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File \"/usr/local/lib/python3.11/site-packages/click/core.py\", line 783, in invoke
  |     return __callback(*args, **kwargs)
  |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File \"/usr/local/lib/python3.11/site-packages/click/decorators.py\", line 92, in new_func
  |     return ctx.invoke(f, obj, *args, **kwargs)
  |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File \"/usr/local/lib/python3.11/site-packages/click/core.py\", line 783, in invoke
  |     return __callback(*args, **kwargs)
  |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File \"/usr/local/lib/python3.11/site-packages/silverback/_cli.py\", line 129, in worker
  |     asyncio.run(run_worker(app.broker, worker_count=workers, shutdown_timeout=shutdown_timeout))
  |   File \"/usr/local/lib/python3.11/asyncio/runners.py\", line 190, in run
  |     return runner.run(main)
  |            ^^^^^^^^^^^^^^^^
  |   File \"/usr/local/lib/python3.11/asyncio/runners.py\", line 118, in run
  |     return self._loop.run_until_complete(task)
  |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File \"/usr/local/lib/python3.11/asyncio/base_events.py\", line 654, in run_until_complete
  |     return future.result()
  |            ^^^^^^^^^^^^^^^
  |   File \"/usr/local/lib/python3.11/site-packages/silverback/worker.py\", line 24, in run_worker
  |     await asyncio.gather(*tasks)
  |   File \"/usr/local/lib/python3.11/site-packages/taskiq/receiver/receiver.py\", line 333, in listen
  |     async with anyio.create_task_group() as gr:
  |   File \"/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py\", line 680, in __aexit__
  |     raise BaseExceptionGroup(
  | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File \"/usr/local/lib/python3.11/site-packages/taskiq/receiver/receiver.py\", line 361, in prefetcher
    |     message = await iterator.__anext__()
    |               ^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File \"/usr/local/lib/python3.11/site-packages/taskiq_sqs/broker.py\", line 145, in listen
    |     for message in await asyncify(queue.receive_messages)(
    |                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File \"/usr/local/lib/python3.11/site-packages/asyncer/_main.py\", line 382, in wrapper
    |     return await run_sync(
    |            ^^^^^^^^^^^^^^^
    |   File \"/usr/local/lib/python3.11/site-packages/asyncer/_compat.py\", line 24, in run_sync
    |     return await anyio.to_thread.run_sync(
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File \"/usr/local/lib/python3.11/site-packages/anyio/to_thread.py\", line 56, in run_sync
    |     return await get_async_backend().run_sync_in_worker_thread(
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File \"/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py\", line 2177, in run_sync_in_worker_thread
    |     return await future
    |            ^^^^^^^^^^^^
    |   File \"/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py\", line 859, in run
    |     result = context.run(func, *args)
    |              ^^^^^^^^^^^^^^^^^^^^^^^^
    |   File \"/usr/local/lib/python3.11/site-packages/boto3/resources/factory.py\", line 581, in do_action
    |     response = action(self, *args, **kwargs)
    |                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File \"/usr/local/lib/python3.11/site-packages/boto3/resources/action.py\", line 88, in __call__
    |     response = getattr(parent.meta.client, operation_name)(*args, **params)
    |                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File \"/usr/local/lib/python3.11/site-packages/botocore/client.py\", line 565, in _api_call
    |     return self._make_api_call(operation_name, kwargs)
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File \"/usr/local/lib/python3.11/site-packages/botocore/client.py\", line 1017, in _make_api_call
    |     raise error_class(parsed_response, operation_name)
    | botocore.exceptions.ClientError: An error occurred (ExpiredToken) when calling the ReceiveMessage operation: The security token included in the request is expired
    +------------------------------------"
mikeshultz commented 3 weeks ago

Found a comment from an AWS engineer that explains this isssue a bit in more detail. I suspect this is due to the recent code added for boto3 auth bypath, rather than a boto3 issue.

Presumably the credentials need to be refreshed occasionally from the metadata provider.