celery / kombu

Messaging library for Python.
http://kombu.readthedocs.org/
BSD 3-Clause "New" or "Revised" License

Support message sizes > 256KB by using a third-party backend... #279

Open diranged opened 10 years ago

diranged commented 10 years ago

SQS only supports messages up to 256KB. Given that limitation, it's very easy to hit the limit and fail your task submission. Here's a simple example:

```
>>> group(benchmark_tasks.WaitTask().s(args=[0, data]) for i in xrange(1024)).apply_async()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/nextdoor-ve/lib/python2.7/site-packages/celery/canvas.py", line 194, in apply_async
    return self._apply_async(args, kwargs, **options)
  File "/opt/nextdoor-ve/lib/python2.7/site-packages/celery/app/builtins.py", line 213, in apply_async
    list(tasks), result.serializable(), gid, args), **options
  File "/opt/nextdoor-ve/lib/python2.7/site-packages/celery/app/task.py", line 474, in apply_async
    **options)
  File "/opt/nextdoor-ve/lib/python2.7/site-packages/celery/app/amqp.py", line 250, in publish_task
    **kwargs
  File "/opt/nextdoor-ve/lib/python2.7/site-packages/kombu/messaging.py", line 164, in publish
    routing_key, mandatory, immediate, exchange, declare)
  File "/opt/nextdoor-ve/lib/python2.7/site-packages/kombu/connection.py", line 453, in _ensured
    return fun(*args, **kwargs)
  File "/opt/nextdoor-ve/lib/python2.7/site-packages/kombu/messaging.py", line 180, in _publish
    mandatory=mandatory, immediate=immediate,
  File "/opt/nextdoor-ve/lib/python2.7/site-packages/kombu/transport/virtual/__init__.py", line 469, in basic_publish
    exchange, routing_key, **kwargs)
  File "/opt/nextdoor-ve/lib/python2.7/site-packages/kombu/transport/virtual/exchange.py", line 61, in deliver
    _put(queue, message, **kwargs)
  File "/opt/nextdoor-ve/lib/python2.7/site-packages/kombu/transport/SQS.py", line 232, in _put
    q.write(m)
  File "/opt/nextdoor-ve/lib/python2.7/site-packages/boto/sqs/queue.py", line 220, in write
    delay_seconds)
  File "/opt/nextdoor-ve/lib/python2.7/site-packages/boto/sqs/connection.py", line 253, in send_message
    queue.id, verb='POST')
  File "/opt/nextdoor-ve/lib/python2.7/site-packages/boto/connection.py", line 1062, in get_object
    raise self.ResponseError(response.status, response.reason, body)
SQSError: SQSError: 400 Bad Request
<?xml version="1.0"?><ErrorResponse><Error><Type>Sender</Type><Code>InvalidParameterValue</Code><Message>Value for parameter MessageBody is invalid. Reason: Message body must be shorter than 262144 bytes.</Message></Error><RequestId>a34e4e32-406a-5652-b527-0537f7db8afe</RequestId></ErrorResponse>
```

There are two ways to fix this that I see.

  1. Fix only the `group()` command by having it automatically break groups up into smaller chunks whenever the message-size limit would be exceeded. This is probably simple to do in the Kombu code.
  2. Create an alternative data backend (memcache, redis, or better yet ... dynamodb) where messages > 256KB are stored, with a pointer left in the queue telling Kombu where to fetch the raw message data from (see the sketch after this list).
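
For the record, option 2 is essentially the "claim check" pattern. A minimal sketch, assuming Redis as the side store (the key prefix and the `_large_payload_key` field name are made up for illustration):

```python
import json
import uuid

import redis

SQS_LIMIT = 262144  # 256 KiB hard limit on SQS message bodies
store = redis.Redis()  # hypothetical side store; any blob store would do

def wrap_body(body: bytes) -> bytes:
    """Replace an oversized body with a pointer into the side store."""
    if len(body) <= SQS_LIMIT:
        return body
    key = f"kombu-large-payload:{uuid.uuid4()}"
    store.set(key, body, ex=86400)  # expire in case no consumer ever runs
    return json.dumps({"_large_payload_key": key}).encode()

def unwrap_body(body: bytes) -> bytes:
    """Resolve a pointer back to the original body on the consumer side."""
    try:
        pointer = json.loads(body)
        key = pointer["_large_payload_key"]
    except (ValueError, TypeError, KeyError):
        return body  # a normal, inline message
    return store.get(key)
```

The expiry guards against orphaned payloads when a message is never consumed; a real implementation would also delete the blob after a successful read.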
diwu1989 commented 10 years ago

Option 1 seems much better than option 2, because option 2 adds another dependency and another place where things can break.

ask commented 10 years ago

Not sure how option 1 would work, as there is no safe place to put the knife. If it has to divide an object, then all of the pieces must be received by the same process to recover the object at the other end; and then what happens if one or more of the pieces are lost, or the process dies while processing them? There may be more places where things can break in option 1. Option 2 would have to be optional, and a user can always choose to store big objects somewhere else and pass URLs manually.

shuklaabhi commented 4 years ago

Will you be accepting pull requests for option 2?

auvipy commented 4 years ago

We will review the PR, of course. Please come with proper unit and integration tests :) and mention me.

jmsmkn commented 3 years ago

I don't know if this helps anyone who is thinking of adding this, but there is a Python implementation of the option 2 pattern, using S3 for storage, that is influenced by the AWS Java Extended Client for SQS: https://github.com/archetype-digital/aws-sqs-extended. It extends boto3 with extra calls that could be used as a basis for a transport (I think). I've not reviewed the code in detail, but it is tested under Python 3.7, 3.8, and 3.9, has 99% test coverage, and is MIT licensed.

I'm a long-time user of Celery+SQS who doesn't know his way around the internals, but I'd really be interested in a solution to this issue and would be happy to help out where I can.

auvipy commented 3 years ago

I found this article very useful: https://walid.dev/blog/saving-costs-asking-for-forgiveness-in-python/
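
For context, the article advocates the EAFP style ("easier to ask forgiveness than permission"). Applied to this thread's problem, that would mean attempting the plain SQS publish and only offloading when the broker rejects the body. A rough sketch with boto3, where `offload_to_s3` is a hypothetical helper that stores the body elsewhere and returns a pointer:

```python
import botocore.exceptions

def publish(sqs, queue_url, body):
    """Try the cheap path first; offload only when SQS rejects the size."""
    try:
        sqs.send_message(QueueUrl=queue_url, MessageBody=body)
    except botocore.exceptions.ClientError as exc:
        if exc.response["Error"]["Code"] != "InvalidParameterValue":
            raise
        # Body exceeded 262144 bytes: store it elsewhere and send a pointer.
        sqs.send_message(QueueUrl=queue_url, MessageBody=offload_to_s3(body))
```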

auvipy commented 3 years ago

> I don't know if this helps anyone who is thinking of adding this, but there is a Python implementation of the option 2 pattern, using S3 for storage, that is influenced by the AWS Java Extended Client for SQS: https://github.com/archetype-digital/aws-sqs-extended. […]

This could be useful for some use cases.

jmsmkn commented 1 year ago

~~AWS have now released the extended client library for Python, allowing up to 2GB messages on SNS via S3:~~

auvipy commented 1 year ago

Based on the new library, can we close this, or do we need to integrate it and ensure it is supported in kombu?

jmsmkn commented 1 year ago

Ah, wait, sorry: the new lib addresses the same problem for SNS, not SQS, so it's not helpful here. Apologies.

terrykfwong commented 7 months ago

AWS have now released the extended client library for Python, allowing up to 2GB messages on SQS via S3:

Announcement · GitHub Repo
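
Usage is roughly as follows (adapted from my reading of the repo's README; treat the package and attribute names as unverified):

```python
import boto3
import sqs_extended_client  # noqa: F401 -- pip install amazon-sqs-extended-client; patches boto3's SQS client

sqs = boto3.client("sqs")
sqs.large_payload_support = "my-payload-bucket"  # S3 bucket for oversized bodies (assumed attribute name)
sqs.use_legacy_attribute = False

queue_url = sqs.get_queue_url(QueueName="my-queue")["QueueUrl"]

# Bodies over the 256KB limit should be written to S3 transparently and
# replaced with a pointer; small bodies go through unchanged.
sqs.send_message(QueueUrl=queue_url, MessageBody="x" * 300_000)
```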

Amwam commented 1 month ago

I've made a start on attempting to integrate the sqs_extended_client. You can view that here: https://github.com/celery/kombu/compare/main...Amwam:kombu:add-sqs-large-payload-support?expand=1

While this appears to work, I'm not sure if there are issues in the implementation. There are also features missing, such as automatically deleting the payload after the task has been completed.

The core issue I've run into is that kombu fetches messages from SQS via the HTTP API rather than through boto3, so the extended client isn't used when retrieving messages, only when publishing. Another PR expresses a desire to convert these calls to boto3, but a bigger refactoring seems to be required to make that happen in a performant way. As a result, decoding the SQS message requires some manual handling to mimic how the sqs_extended_client behaves.
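
Concretely, that manual handling comes down to recognizing the pointer format the extended clients write. As I understand it (worth verifying against the payload-offloading spec), an offloaded body is a JSON array holding a pointer-class name plus the S3 location, so the consumer side needs something like:

```python
import json

import boto3

s3 = boto3.client("s3")

def resolve_extended_body(raw_body: str) -> str:
    """If raw_body is an S3 pointer from the extended client, fetch the real payload."""
    try:
        marker, pointer = json.loads(raw_body)
    except (ValueError, TypeError):
        return raw_body  # ordinary inline message
    if "PayloadS3Pointer" not in marker:
        return raw_body
    obj = s3.get_object(Bucket=pointer["s3BucketName"], Key=pointer["s3Key"])
    return obj["Body"].read().decode("utf-8")
```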

auvipy commented 4 weeks ago

> I've made a start on attempting to integrate the sqs_extended_client. You can view that here: https://github.com/celery/kombu/compare/main...Amwam:kombu:add-sqs-large-payload-support?expand=1 […]

Good job on starting work on it.