jcrist / msgspec

A fast serialization and validation library, with builtin support for JSON, MessagePack, YAML, and TOML
https://jcristharif.com/msgspec/
BSD 3-Clause "New" or "Revised" License
2.37k stars, 72 forks

[Question] how to recursively deserialize/decode #375

Closed spacemanspiff2007 closed 1 year ago

spacemanspiff2007 commented 1 year ago

I am thinking about using msgspec as a WebSocket deserializer for HABApp. However, the openHAB project uses an unconventional JSON-in-JSON approach (see the payload field). I tried modeling it accordingly, but as expected I'm getting an error:

import msgspec

class ItemStateEvent(msgspec.Struct):
    type: str
    value: str

class BaseMsg(msgspec.Struct):
    type: str
    topic: str
    payload: ItemStateEvent

input = "{\"type\":\"ItemStateEvent\"," \
        "\"topic\":\"openhab/items/DTR/state\"," \
        "\"payload\":\"{\\\"type\\\":\\\"Quantity\\\",\\\"value\\\":\\\"5MB/s\\\"}\"}"

decoder = msgspec.json.Decoder(BaseMsg)
ret = decoder.decode(input)

msgspec.ValidationError: Expected `object`, got `str` - at `$.payload`

Is there a way to indicate that the payload field should be deserialized after or during the BaseMsg deserialization?

AlexanderMartynoff commented 1 year ago

In your input, the payload key has a string value because it is wrapped in double quotes. Just remove the double quotes around the payload value.

spacemanspiff2007 commented 1 year ago

But input is exactly how I receive the data from the openHAB application, so it's not possible to edit the data by hand.

So I'm wondering whether there is a way to indicate that the value of payload is a JSON string that should be deserialized, too.
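
To make the nesting concrete, here is a minimal sketch that uses only the standard library `json` module (not msgspec) on the message from the example above. The first pass leaves `payload` as a `str`, and only a second pass over that string yields the inner object. The variable names are illustrative.

```python
import json

# The message as it arrives from openHAB: the payload value is itself
# a JSON document that has been encoded as a JSON string.
raw = (
    '{"type":"ItemStateEvent","topic":"openhab/items/DTR/state",'
    '"payload":"{\\"type\\":\\"Quantity\\",\\"value\\":\\"5MB/s\\"}"}'
)

# First pass: decodes the envelope, payload is still text.
outer = json.loads(raw)
payload_text = outer["payload"]

# Second pass: decodes the text held in the payload field.
inner = json.loads(payload_text)

print(type(payload_text).__name__)
print(inner)
```

This two-pass shape is what any solution has to perform somewhere; the question is only whether it can be expressed in the model instead of in consuming code.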

AlexanderMartynoff commented 1 year ago

In that case I would try customizing the decoding with the extension hooks: https://jcristharif.com/msgspec/extending.html

jcrist commented 1 year ago

Hi, apologies for the delayed reply here. This is definitely doable using the existing extension support. One way would be to define a new generic type for handling JSON-in-JSON, then use it to wrap the payload values. Something like:

import msgspec
from typing import Generic, TypeVar

T = TypeVar("T")

class JSONStr(Generic[T]):
    """A wrapper type for handling JSON-in-JSON values"""
    value: T

    def __init__(self, value: T):
        self.value = value

    def __repr__(self) -> str:
        return f"JSONStr({self.value})"

class ItemStateEvent(msgspec.Struct):
    type: str
    value: str

class BaseMsg(msgspec.Struct):
    type: str
    topic: str
    payload: JSONStr[ItemStateEvent]

def enc_hook(x):
    if isinstance(x, JSONStr):
        return msgspec.json.encode(x.value).decode("utf-8")
    raise TypeError(f"{type(x).__name__} is not supported")

def dec_hook(type, value):
    if getattr(type, "__origin__", None) is JSONStr:
        inner_type = type.__args__[0]
        return JSONStr(msgspec.json.decode(value, type=inner_type))
    raise TypeError(f"{type} is not supported")

encoder = msgspec.json.Encoder(enc_hook=enc_hook)
decoder = msgspec.json.Decoder(BaseMsg, dec_hook=dec_hook)

msg = (
    b'{"type":"ItemStateEvent","topic":"openhab/items/DTR/state",'
    b'"payload":"{\\"type\\":\\"Quantity\\",\\"value\\":\\"5MB/s\\"}"}'
)

res = decoder.decode(msg)
print(res)
#> BaseMsg(
#>   type='ItemStateEvent',
#>   topic='openhab/items/DTR/state',
#>   payload=JSONStr(ItemStateEvent(type='Quantity', value='5MB/s'))
#> )

msg2 = encoder.encode(res)
assert msg == msg2
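
The `dec_hook` above dispatches on the `__origin__` and `__args__` attributes of the parametrized type. As a small standalone illustration of what those hold, the sketch below uses `typing.get_origin` and `typing.get_args`, the public helpers for the same information. `Payload` is a hypothetical stand-in class, and `JSONStr` is redefined here only so the snippet runs on its own.

```python
from typing import Generic, TypeVar, get_args, get_origin

T = TypeVar("T")


class JSONStr(Generic[T]):
    """Stand-in for the wrapper class defined above."""

    def __init__(self, value: T):
        self.value = value


class Payload:
    """Hypothetical payload type, used only for this illustration."""


alias = JSONStr[Payload]

# The parametrized alias remembers both the generic class and its argument.
origin = get_origin(alias)
args = get_args(alias)

# The bare class is not a parametrized alias, so it has no origin.
bare_origin = get_origin(JSONStr)

print(origin, args, bare_origin)
```

This is why the hook checks the origin rather than comparing `type is JSONStr`: the field annotation is `JSONStr[ItemStateEvent]`, which is an alias object and not the class itself.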

spacemanspiff2007 commented 1 year ago

> Hi, apologies for the delayed reply here.

No worries - you made it worth the wait with your detailed answer with a working example. Thank you very much for that!

I tried playing around with the dec_hook, too. From the docs I would have expected something like this to work:

import msgspec

class ItemStateEvent(msgspec.Struct):
    type: str
    value: str

class BaseMsg(msgspec.Struct):
    type: str
    topic: str
    payload: ItemStateEvent

def dec_hook(type, value):
    if type is ItemStateEvent:
        return ItemStateEvent(msgspec.json.decode(value, type=ItemStateEvent))
    raise TypeError(f"{type} is not supported")

decoder = msgspec.json.Decoder(BaseMsg, dec_hook=dec_hook)

msg = (
    '{"type":"ItemStateEvent","topic":"openhab/items/DTR/state",'
    '"payload":"{\\"type\\":\\"Quantity\\",\\"value\\":\\"5MB/s\\"}"}'
)

res = decoder.decode(msg)
print(res)

However, this again raises the exception:

msgspec.ValidationError: Expected `object`, got `str` - at `$.payload`

The solution you proposed unfortunately does not work for me. Since there are lots of different kinds of messages, using .value doesn't provide much benefit: I would have to narrow the type based on the type field of the base message. That would mean implementing the corresponding logic everywhere I intend to consume the events. Maybe I should have made that clearer, sorry. There are many events, they are all wrapped in the type/topic/payload JSON, and I would love to put as much logic as possible into the message definition.

Do you have any more ideas? It would have been really nice to have a way to indicate, on the model or on the decoder, that the payload field should be deserialized. Then I could have used tagged unions, since the type information is in msg.type, e.g.

class ItemStateEventPayload(msgspec.Struct, tag=False):
    type: str
    value: str

class ItemStateEventMsg(msgspec.Struct, tag='ItemStateEvent'):
    topic: str
    payload: ItemStateEventPayload

class AnotherMsg(msgspec.Struct, tag='AnotherMsg'):
    ...

class AThirdMsg(msgspec.Struct, tag='AThirdMsg'):
    ...

decoder = msgspec.json.Decoder(ItemStateEventMsg | AnotherMsg | AThirdMsg)

msg = (
    '{"type":"ItemStateEvent","topic":"openhab/items/DTR/state",'
    '"payload":"{\\"type\\":\\"Quantity\\",\\"value\\":\\"5MB/s\\"}"}'
)

res = decoder.decode(msg)
print(res)

That way I could have offloaded the whole deserialization logic onto msgspec, which I hoped would be much faster and less error-prone than my Python code.

jcrist commented 1 year ago

Sure. There are two ways I can think of to handle this kind of structure, depending on how you want to work with the output data.

I like option 2 the best as it's simpler, but they're both functional.

1. A different type per event

In this method you have a different top-level type for each event. This means twice as many types to define (one per "payload" type, with an additional wrapper "event" type for each). To get type annotations to work properly in this version you have to do a bit of magic, especially if you want to hide the existence of the JSONStr wrapper class. Whether this magic is worth it is up to you.

Note that this version relies on Generic Struct types, which exist on the main branch but haven't been released yet.

Example Code

```python
# In this version, every Event type has a different class
#
# I kinda went wild with type annotations to make `mypy`/`pyright` happy with
# using this code. You definitely don't _need_ all of this to get something
# working.

import msgspec
from typing import Generic, TypeVar, Union, Type, Any

T = TypeVar("T")
C = TypeVar("C", bound=msgspec.Struct)


class JSONStr(Generic[T]):
    """A wrapper type for handling JSON-in-JSON values"""
    value: T

    def __init__(self, value: T):
        self.value = value

    def __eq__(self, other: Any) -> bool:
        return type(other) is JSONStr and self.value == other.value

    def __repr__(self) -> str:
        return f"JSONStr({self.value})"


class Event(msgspec.Struct, Generic[T], tag=True):
    """A base class for all events."""
    topic: str
    # We use a hidden field to store the nested JSONStr[T] type, then expose
    # the wrapped Payload type to downstream code through the payload property
    # below.
    _payload: JSONStr[T] = msgspec.field(name="payload")

    @property
    def payload(self) -> T:
        """A property to hide the existence of the JSONStr `_payload` field"""
        return self._payload.value

    def __repr__(self):
        """Override repr to hide the existence of JSONStr"""
        return f"{type(self).__name__}<topic={self.topic!r}, payload={self.payload!r}>"

    @classmethod
    def from_payload(cls: Type[C], topic: str, payload: T) -> C:
        """A classmethod constructor, hiding the existence of the JSONStr wrapper"""
        return cls(topic, JSONStr(payload))


# Defining a new event requires:
# - Defining a payload for its contents
# - Defining an Event subclass, parametrized by the payload class.

class ItemStateEventPayload(msgspec.Struct):
    type: str
    value: str


class ItemStateEvent(Event[ItemStateEventPayload]):
    pass


class AnotherEventPayload(msgspec.Struct):
    fizz: int
    buzz: str


class AnotherEvent(Event[AnotherEventPayload]):
    pass


# `enc_hook` and `dec_hook` implementations for handling encoding/decoding of
# the `JSONStr` types.
def enc_hook(x):
    if isinstance(x, JSONStr):
        return msgspec.json.encode(x.value).decode("utf-8")
    raise TypeError(f"{type(x).__name__} is not supported")


def dec_hook(type, value):
    if getattr(type, "__origin__", None) is JSONStr:
        inner_type = type.__args__[0]
        return JSONStr(msgspec.json.decode(value, type=inner_type))
    raise TypeError(f"{type} is not supported")


_encoder = msgspec.json.Encoder(enc_hook=enc_hook)

# We use `__subclasses__` to find all subclasses of `Event` automatically. This
# means all event subclasses have to be defined _before_ creating the
# `Decoder`. If this is too magical, you could also explicitly write out a
# union of all possible Event types.
_decoder = msgspec.json.Decoder(Union[tuple(Event.__subclasses__())], dec_hook=dec_hook)


# Functions for JSON encoding & decoding Event types.
def encode(x: Event) -> bytes:
    return _encoder.encode(x)


def decode(msg: bytes) -> Event:
    return _decoder.decode(msg)


# The main example - decoding and encoding a message
msg = (
    b'{"type":"ItemStateEvent","topic":"openhab/items/DTR/state",'
    b'"payload":"{\\"type\\":\\"Quantity\\",\\"value\\":\\"5MB/s\\"}"}'
)

res = decode(msg)
print(res)
# ItemStateEvent<
#   topic='openhab/items/DTR/state',
#   payload=ItemStateEventPayload(type='Quantity', value='5MB/s')
# >

expected = ItemStateEvent.from_payload(
    "openhab/items/DTR/state", ItemStateEventPayload("Quantity", "5MB/s")
)
assert res == expected

msg2 = encode(res)
assert msg == msg2
```
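
The note about defining all event subclasses before creating the `Decoder` can be seen in isolation with plain classes. In this minimal sketch, `Event`, `First`, `Second`, and `Late` are hypothetical stand-ins that do not use msgspec at all; it only shows what `Union[tuple(Event.__subclasses__())]` captures at the moment it is evaluated.

```python
from typing import Union, get_args


class Event:
    """Stand-in base class (plain Python, no msgspec)."""


class First(Event):
    pass


class Second(Event):
    pass


# The union is a snapshot of the direct subclasses that exist right now.
EventUnion = Union[tuple(Event.__subclasses__())]


class Late(Event):
    """Defined after the snapshot, so it is not part of EventUnion."""


members = get_args(EventUnion)
print(members)
print(Late in members)
```

Note also that `__subclasses__()` returns direct subclasses only, so an event class derived from another event class would need to be collected separately or listed explicitly.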

2. A single top-level type, and a per-payload type

This version uses a single top-level Message type, parametrized by one of a number of Payload types. I like this version a lot better than the first, as it's simpler and requires less type magic to make mypy/pyright happy.

Example Code

```python
# In this version there's a different class per payload, but only a single
# top-level (generic) class for wrapping the payload.

import msgspec
from typing import Generic, TypeVar, TypedDict, Literal


class Payload(msgspec.Struct):
    pass


# Every payload type needs a single definition. As currently written the class
# name needs to match the corresponding `type` value, but that can be changed
# if needed.
class ItemStateEvent(Payload):
    type: str
    value: str


class AnotherEvent(Payload):
    fizz: int
    buzz: str


T = TypeVar("T", bound=Payload)


class Message(Generic[T]):
    """A generic Message wrapper used for all payload types"""
    topic: str
    payload: T

    def __init__(self, topic: str, payload: T) -> None:
        self.topic = topic
        self.payload = payload

    def __eq__(self, other):
        return (
            type(other) is Message
            and self.topic == other.topic
            and self.payload == other.payload
        )

    def __repr__(self):
        return f"Message(topic={self.topic!r}, payload={self.payload!r})"


# A lookup table of all possible Payload types. This uses `__subclasses__` to
# automatically find all `Payload` subclasses. If this is too magical, you
# could explicitly write out this mapping instead.
_payload_class_lookup = {cls.__name__: cls for cls in Payload.__subclasses__()}


class _MessageSchema(TypedDict):
    """A schema used for validating Message objects in `dec_hook` below"""
    type: Literal[tuple(_payload_class_lookup)]  # type: ignore
    topic: str
    payload: str


# `enc_hook` and `dec_hook` implementations for handling encoding/decoding of
# the `Message` type.
def enc_hook(x):
    if isinstance(x, Message):
        return {
            "type": type(x.payload).__name__,
            "topic": x.topic,
            "payload": msgspec.json.encode(x.payload).decode("utf-8"),
        }
    raise TypeError(f"{type(x).__name__} is not supported")


def dec_hook(type, data):
    if type is Message:
        # Use `from_builtins` to validate `data` matches the expected
        # MessageSchema. This isn't strictly necessary, but does make
        # it easier to raise a nicer error on an invalid `Message`.
        # In newer msgspec releases this function is available as
        # `msgspec.convert`.
        msg = msgspec.from_builtins(data, _MessageSchema)
        payload_cls = _payload_class_lookup[msg["type"]]
        payload = msgspec.json.decode(msg["payload"], type=payload_cls)
        return Message(msg["topic"], payload)
    raise TypeError(f"{type} is not supported")


_encoder = msgspec.json.Encoder(enc_hook=enc_hook)
_decoder = msgspec.json.Decoder(Message, dec_hook=dec_hook)


# Functions for JSON encoding & decoding a Message
def encode(x: Message) -> bytes:
    return _encoder.encode(x)


def decode(msg: bytes) -> Message:
    return _decoder.decode(msg)


# The main example - decoding and encoding a Message
msg = (
    b'{"type":"ItemStateEvent","topic":"openhab/items/DTR/state",'
    b'"payload":"{\\"type\\":\\"Quantity\\",\\"value\\":\\"5MB/s\\"}"}'
)

res = decode(msg)
print(res)
# Message(
#   topic='openhab/items/DTR/state',
#   payload=ItemStateEvent(type='Quantity', value='5MB/s')
# )

expected = Message("openhab/items/DTR/state", ItemStateEvent("Quantity", "5MB/s"))
assert res == expected

msg2 = encode(res)
assert msg == msg2
```
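
The comment in the example mentions that the class name only has to match the `type` value "as currently written". One possible way to relax that, sketched here with plain stand-in classes rather than `msgspec.Struct` subclasses: an optional class attribute overrides the name used on the wire. The `wire_name` attribute, the `ItemStatePredicted` class, and the `type_name_for` helper are all hypothetical and not part of msgspec or the code above.

```python
class Payload:
    """Stand-in for the Payload base class (plain Python, no msgspec)."""

    # Hypothetical attribute: when set, used instead of the class name.
    wire_name = None


class ItemStateEvent(Payload):
    pass


class ItemStatePredicted(Payload):
    # The wire value differs from the Python class name.
    wire_name = "ItemStatePredictedEvent"


def type_name_for(cls):
    """Return the `type` value that identifies `cls` in a message."""
    return cls.wire_name or cls.__name__


# Decoding direction: wire name -> payload class.
lookup = {type_name_for(cls): cls for cls in Payload.__subclasses__()}

print(sorted(lookup))
```

In the example above, the same helper would also have to replace `type(x.payload).__name__` in `enc_hook`, so that encoding and decoding agree on the names.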

jcrist commented 1 year ago

Closing as stale/resolved. Please comment/open a new issue if you have more questions.