celery / kombu

Messaging library for Python.
http://kombu.readthedocs.org/
BSD 3-Clause "New" or "Revised" License
2.89k stars 930 forks source link

Custom msgpack serializer ContentDisallowed #1065

Open ebeseda opened 5 years ago

ebeseda commented 5 years ago

Hi, I've created my custom msgpack serializer and deserializer for kombu and successfully registered it:

# serialization.py
MSGPACK_ENCODER_NAME = 'custommsgpack'

def encode(obj):
    if isinstance(obj, geometry.base.BaseGeometry):
        return {'__type__': '__shapely__', 'wkb': obj.wkb_hex}

    if isinstance(obj, geoelements.WKBElement):
        return {
            '__type__': '__wkbelement__',
            'data': obj.desc,
            'srid': obj.srid,
            'extended': obj.extented,
        }

    if isinstance(obj, geoelements.WKTElement):
        return {
            '__type__': '__wktelement__',
            'data': obj.data,
            'srid': obj.srid,
            'extended': obj.extented,
        }

    if isinstance(obj, (datetime, date)):
        if not isinstance(obj, datetime):
            obj = datetime(obj.year, obj.month, obj.day, 0, 0, 0, 0)

        date_iso = obj.isoformat()
        if date_iso.endswith("+00:00"):
            date_iso = date_iso[:-6] + "Z"
        return date_iso

    if isinstance(obj, time):
        return obj.isoformat()

    return obj

def decode_custom_type(obj):
    if obj.get('__type__') == '__shapely__':
        return wkb.loads(obj['wkb'], hex=True)

    if obj.get('__type__') == '__wkbelement__':
        return geoelements.WKBElement(
            binascii.unhexlify(obj['data']),
            obj['srid'],
            obj['extended']
        )

    if obj.get('__type__') == '__wktelement__':
        return geoelements.WKTElement(obj['data'], obj['srid'], obj['extended'])

    return obj

def decode(obj):
    if isinstance(obj, dict):
        # we need to check for our custom type if we receive dict here
        return decode_custom_type(obj)

    return obj

def dumps(obj):
    """Dump object to msgpack."""
    try:
        return msgpack.packb(obj, default=encode, use_bin_type=True)
    except Exception:  # pylint: disable=broad-except
        # https://github.com/msgpack/msgpack-python/blob/master/msgpack/exceptions.py#L4-L6
        logger.exception("msgpack serialization error")

def loads(obj):
    """Load msgpack data to python object."""
    return msgpack.unpackb(obj, object_hook=decode, raw=False)

def register_msgpack():
    """Register a encoder/decoder for msgpack serialization on kombu."""
    register(MSGPACK_ENCODER_NAME, dumps, loads,
             content_type='application/custom+msgpack',
             content_encoding='utf-8')

# settings.py

from backend.tasks.serialization import MSGPACK_ENCODER_NAME

CELERY_RESULT_SERIALIZER = MSGPACK_ENCODER_NAME
CELERY_ACCEPT_CONTENT = [MSGPACK_ENCODER_NAME]

It works as expected but sometimes I'm getting an error: kombu.exceptions.ContentDisallowed: Refusing to deserialize untrusted content of type json (application/json)

Full traceback:


Traceback (most recent call last):
  File "/venv/lib/python3.6/site-packages/celery/worker/worker.py", line 205, in start
    self.blueprint.start(self)
  File "/venv/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/venv/lib/python3.6/site-packages/celery/bootsteps.py", line 369, in start
    return self.obj.start()
  File "/venv/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 317, in start
    blueprint.start(self)
  File "/venv/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/venv/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 593, in start
    c.loop(*c.loop_args())
  File "/venv/lib/python3.6/site-packages/celery/worker/loops.py", line 91, in asynloop
    next(loop)
  File "/venv/lib/python3.6/site-packages/kombu/asynchronous/hub.py", line 362, in create_loop
    cb(*cbargs)
  File "/venv/lib/python3.6/site-packages/kombu/asynchronous/http/curl.py", line 111, in on_readable
    return self._on_event(fd, _pycurl.CSELECT_IN)
  File "/venv/lib/python3.6/site-packages/kombu/asynchronous/http/curl.py", line 124, in _on_event
    self._process_pending_requests()
  File "/venv/lib/python3.6/site-packages/kombu/asynchronous/http/curl.py", line 130, in _process_pending_requests
    self._process(curl)
  File "/venv/lib/python3.6/site-packages/kombu/asynchronous/http/curl.py", line 178, in _process
    buffer=buffer, effective_url=effective_url, error=error,
  File "/venv/lib/python3.6/site-packages/vine/promises.py", line 177, in __call__
    svpending(*ca, **ck)
  File "/venv/lib/python3.6/site-packages/vine/promises.py", line 170, in __call__
    return self.throw()
  File "/venv/lib/python3.6/site-packages/vine/promises.py", line 167, in __call__
    retval = fun(*final_args, **final_kwargs)
  File "/venv/lib/python3.6/site-packages/vine/funtools.py", line 100, in _transback
    return callback(ret)
  File "/venv/lib/python3.6/site-packages/vine/promises.py", line 170, in __call__
    return self.throw()
  File "/venv/lib/python3.6/site-packages/vine/promises.py", line 167, in __call__
    retval = fun(*final_args, **final_kwargs)
  File "/venv/lib/python3.6/site-packages/vine/funtools.py", line 100, in _transback
    return callback(ret)
  File "/venv/lib/python3.6/site-packages/vine/promises.py", line 170, in __call__
    return self.throw()
  File "/venv/lib/python3.6/site-packages/vine/promises.py", line 167, in __call__
    retval = fun(*final_args, **final_kwargs)
  File "/venv/lib/python3.6/site-packages/vine/funtools.py", line 98, in _transback
    callback.throw()
  File "/venv/lib/python3.6/site-packages/vine/funtools.py", line 96, in _transback
    ret = filter_(*args + (ret,), **kwargs)
  File "/venv/lib/python3.6/site-packages/kombu/transport/SQS.py", line 371, in _on_messages_ready
    callbacks[qname](msg_parsed)
  File "/venv/lib/python3.6/site-packages/kombu/transport/virtual/base.py", line 633, in _callback
    return callback(message)
  File "/venv/lib/python3.6/site-packages/kombu/messaging.py", line 624, in _receive_callback
    return on_m(message) if on_m else self.receive(decoded, message)
  File "/venv/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 567, in on_task_received
    callbacks,
  File "/venv/lib/python3.6/site-packages/celery/worker/strategy.py", line 136, in task_message_handler
    if body is None and 'args' not in message.payload:
  File "/venv/lib/python3.6/site-packages/kombu/message.py", line 208, in payload
    return self._decoded_cache if self._decoded_cache else self.decode()
  File "/venv/lib/python3.6/site-packages/kombu/message.py", line 193, in decode
    self._decoded_cache = self._decode()
  File "/venv/lib/python3.6/site-packages/kombu/message.py", line 198, in _decode
    self.content_encoding, accept=self.accept)
  File "/venv/lib/python3.6/site-packages/kombu/serialization.py", line 253, in loads
    raise self._for_untrusted_content(content_type, 'untrusted')
kombu.exceptions.ContentDisallowed: Refusing to deserialize untrusted content of type json (application/json)

I have no idea why it happens. Somehow kombu puts a JSON to SQS In rare cases.

Env: celery 4.3.0 kombu 4.6.3

UPD: Added more error logging and got this traceback:


Traceback (most recent call last):
  File "/venv/lib/python3.6/site-packages/celery/worker/worker.py", line 205, in start
    self.blueprint.start(self)
  File "/venv/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/venv/lib/python3.6/site-packages/celery/bootsteps.py", line 369, in start
    return self.obj.start()
  File "/venv/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 317, in start
    blueprint.start(self)
  File "/venv/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/venv/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 593, in start
    c.loop(*c.loop_args())
  File "/venv/lib/python3.6/site-packages/celery/worker/loops.py", line 91, in asynloop
    next(loop)
  File "/venv/lib/python3.6/site-packages/kombu/asynchronous/hub.py", line 362, in create_loop
    cb(*cbargs)
  File "/venv/lib/python3.6/site-packages/kombu/asynchronous/http/curl.py", line 111, in on_readable
    return self._on_event(fd, _pycurl.CSELECT_IN)
  File "/venv/lib/python3.6/site-packages/kombu/asynchronous/http/curl.py", line 124, in _on_event
    self._process_pending_requests()
  File "/venv/lib/python3.6/site-packages/kombu/asynchronous/http/curl.py", line 130, in _process_pending_requests
    self._process(curl)
  File "/venv/lib/python3.6/site-packages/kombu/asynchronous/http/curl.py", line 178, in _process
    buffer=buffer, effective_url=effective_url, error=error,
  File "/venv/lib/python3.6/site-packages/vine/promises.py", line 177, in __call__
    svpending(*ca, **ck)
  File "/venv/lib/python3.6/site-packages/vine/promises.py", line 170, in __call__
    return self.throw()
  File "/venv/lib/python3.6/site-packages/vine/promises.py", line 167, in __call__
    retval = fun(*final_args, **final_kwargs)
  File "/venv/lib/python3.6/site-packages/vine/funtools.py", line 100, in _transback
    return callback(ret)
  File "/venv/lib/python3.6/site-packages/vine/promises.py", line 170, in __call__
    return self.throw()
  File "/venv/lib/python3.6/site-packages/vine/promises.py", line 167, in __call__
    retval = fun(*final_args, **final_kwargs)
  File "/venv/lib/python3.6/site-packages/vine/funtools.py", line 100, in _transback
    return callback(ret)
  File "/venv/lib/python3.6/site-packages/vine/promises.py", line 170, in __call__
    return self.throw()
  File "/venv/lib/python3.6/site-packages/vine/promises.py", line 167, in __call__
    retval = fun(*final_args, **final_kwargs)
  File "/venv/lib/python3.6/site-packages/vine/funtools.py", line 98, in _transback
    callback.throw()
  File "/venv/lib/python3.6/site-packages/vine/funtools.py", line 96, in _transback
    ret = filter_(*args + (ret,), **kwargs)
  File "/venv/lib/python3.6/site-packages/kombu/transport/SQS.py", line 371, in _on_messages_ready
    callbacks[qname](msg_parsed)
  File "/venv/lib/python3.6/site-packages/kombu/transport/virtual/base.py", line 633, in _callback
    return callback(message)
  File "/venv/lib/python3.6/site-packages/kombu/messaging.py", line 624, in _receive_callback
    return on_m(message) if on_m else self.receive(decoded, message)
  File "/venv/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 567, in on_task_received
    callbacks,
  File "/venv/lib/python3.6/site-packages/celery/worker/strategy.py", line 136, in task_message_handler
    if body is None and 'args' not in message.payload:
  File "/venv/lib/python3.6/site-packages/kombu/message.py", line 208, in payload
    return self._decoded_cache if self._decoded_cache else self.decode()
  File "/venv/lib/python3.6/site-packages/kombu/message.py", line 193, in decode
    self._decoded_cache = self._decode()
  File "/venv/lib/python3.6/site-packages/kombu/message.py", line 198, in _decode
    self.content_encoding, accept=self.accept)
  File "/venv/lib/python3.6/site-packages/kombu/serialization.py", line 253, in loads
    raise self._for_untrusted_content(content_type, 'untrusted')
kombu.exceptions.ContentDisallowed: Refusing to deserialize untrusted content of type json (application/json)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/venv/lib/python3.6/site-packages/celery/bootsteps.py", line 151, in send_all
    fun(parent, *args)
  File "/venv/lib/python3.6/site-packages/celery/bootsteps.py", line 373, in stop
    return self.obj.stop()
  File "/venv/lib/python3.6/site-packages/celery/concurrency/base.py", line 122, in stop
    self.on_stop()
  File "/venv/lib/python3.6/site-packages/celery/concurrency/prefork.py", line 140, in on_stop
    self._pool.join()
  File "/venv/lib/python3.6/site-packages/billiard/pool.py", line 1578, in join
    stop_if_not_current(self._result_handler)
  File "/venv/lib/python3.6/site-packages/billiard/pool.py", line 143, in stop_if_not_current
    thread.stop(timeout)
  File "/venv/lib/python3.6/site-packages/billiard/pool.py", line 500, in stop
    self.on_stop_not_started()
  File "/venv/lib/python3.6/site-packages/celery/concurrency/asynpool.py", line 332, in on_stop_not_started
    check_timeouts()
  File "/venv/lib/python3.6/site-packages/billiard/pool.py", line 744, in handle_event
    next(self._it)
  File "/venv/lib/python3.6/site-packages/billiard/pool.py", line 697, in handle_timeouts
    cache = copy.deepcopy(self.cache)
  File "/usr/local/lib/python3.6/copy.py", line 150, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.6/copy.py", line 240, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.6/copy.py", line 180, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/usr/local/lib/python3.6/copy.py", line 280, in _reconstruct
    state = deepcopy(state, memo)
  File "/usr/local/lib/python3.6/copy.py", line 150, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.6/copy.py", line 240, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.6/copy.py", line 180, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/usr/local/lib/python3.6/copy.py", line 280, in _reconstruct
    state = deepcopy(state, memo)
  File "/usr/local/lib/python3.6/copy.py", line 150, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.6/copy.py", line 240, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.6/copy.py", line 180, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/usr/local/lib/python3.6/copy.py", line 280, in _reconstruct
    state = deepcopy(state, memo)
  File "/usr/local/lib/python3.6/copy.py", line 150, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.6/copy.py", line 240, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.6/copy.py", line 169, in deepcopy
    rv = reductor(4)
TypeError: can't pickle _thread.lock objects
thedrow commented 5 years ago

Have you set the result_serializer setting to use your serializer? I'm guessing you didn't :).

ebeseda commented 5 years ago

@thedrow I did. It's in the code snippet above (settings.py)

thedrow commented 5 years ago

Can you tell what's the content of the message?

ebeseda commented 5 years ago
b'{"body":"base64_sensitive_data_here=", "content-encoding": "utf-8", "content-type": "application/json", "headers": {"lang": "py", "task": "backend.tasks.restaurant.push_restaurant", "id": "7774f2c5-b527-4ebe-9985-22de48ee8c4f", "shadow": null, "eta": null, "expires": null, "group": null, "retries": 0, "timelimit": [null, null], "root_id": "7774f2c5-b527-4ebe-9985-22de48ee8c4f", "parent_id": null, "argsrepr": "(\'tb\', \'33615\')", "kwargsrepr": "{}", "origin": "gen11@de-ae-7dc8f985b-8bbmg", "__celery_context_data__": {"email": "some_email@gmail.com", "meta_request_id": "16397099-ca80-433d-8db9-afdc37ad291d", "meta_trace_id": "16397099-ca80-433d-8db9-afdc37ad291d", "platform": "tb"}}, "properties": {"correlation_id": "7774f2c5-b527-4ebe-9985-22de48ee8c4f", "reply_to": "5ef392bb-bb14-3247-9471-95923cb94226", "delivery_mode": 2, "delivery_info": {"exchange": "", "routing_key": "celery"}, "priority": 0, "body_encoding": "base64", "delivery_tag": "ab54832d-3696-499a-b012-b716e5f978ad"}}'
# body decode
b'[["tb", "33615"], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]'

Somehow it puts JSON in the SQS.

thedrow commented 5 years ago

Yes. That's why https://github.com/celery/kombu/commit/7ed7df0ca1acd5ec1cfe0d988a0a8a86291af50e#diff-3cc1ff6df0f488077057a603b8f5e5d4 was committed. Not all SQS messages are base64 encoded. I don't know why though.

thedrow commented 5 years ago

Oh, AWS serializes some messages it generates using JSON. See https://github.com/celery/kombu/pull/937#issue-225587076. Could that be the issue?

ebeseda commented 5 years ago

@thedrow In my case the whole SQS message is base64 encoded. I decoded it and then decoded a message body before posting it here.

thedrow commented 5 years ago

Do the message headers indicate that it is JSON somehow?

ebeseda commented 5 years ago

yes. "content-type": "application/json"

ebeseda commented 5 years ago

FYI

CELERY_ACCEPT_CONTENT = [MSGPACK_ENCODER_NAME, "json"]

helps me to not get a ContentDisallowed error but still, the issue persists.