I just realized that there is in fact a fallback function for encoding: https://github.com/jcrist/msgspec/commit/35db69d7257e983e438d76625795b1fb27da2328
However, I don't see a corresponding fallback function for decoding? As it is now, I would have to search through the decoded data in order to reconstruct the original unsupported objects?
How would that work on decoding? A MsgPack message is composed of nested tagged types (nil, bool, int, float, map, array, ...). Some of these can correspond with multiple Python types (for example, tuples and lists both serialize as the array type). The default hook on encoding is called on types that we don't support by default but that can map to the set of builtin msgpack types. For example, you might serialize a numpy array as an array (this would be inefficient, it's just an example).
In [1]: import msgspec
In [2]: import numpy as np
In [3]: def default(x):
   ...:     if isinstance(x, np.ndarray):
   ...:         return x.tolist()  # convert to msgpack supported types
   ...:     raise TypeError
   ...:
In [4]: encoder = msgspec.Encoder(default=default)
In [5]: encoder.encode([1, 2, 3]) # Serialize a list of ints
Out[5]: b'\x93\x01\x02\x03'
In [6]: encoder.encode(np.array([1, 2, 3])) # Serialize a numpy array
Out[6]: b'\x93\x01\x02\x03'
In [7]: _ == __ # Message bytes are the same for both types
Out[7]: True
Since the default hook converts to standard msgpack types, the message bytes for a list of ints and a numpy array of ints are the same; there's no way to distinguish them in the decoder without more context (this is shown in In [7] above).
There are two ways around this that I can think of; neither is currently implemented in msgspec, but both could be:
1) Extension types are part of the MsgPack protocol and allow defining new custom serialized message types (by adding more "tags" to the format). In this case the serialized message would include a designator marking the object as a certain extension with a custom serializer/deserializer, allowing for schema-less deserialization. I'd want to think about how we'd expose this at an API level, but a rudimentary interface might be:
import pickle
import msgspec

exts = msgspec.ExtensionRegistry()

@exts.register_encoder(object, 42)
def serialize(x: object) -> bytes:
    """Default fallback to pickle bytes with tag code 42"""
    return pickle.dumps(x)

@exts.register_decoder(42)
def deserialize(x: bytes) -> object:
    """Deserialize pickle bytes to an object"""
    return pickle.loads(x)

encoder = msgspec.Encoder(extensions=exts)
decoder = msgspec.Decoder(extensions=exts)
This would work with both typed and untyped messages, but has the downside that it's effectively a custom protocol - other msgpack libraries would have to implement your extension to work properly, since the message is no longer composed of only standard types.
2) An alternative mechanism would be to extend typed serialization to support custom deserialization hooks. We already support deserializing the same message into different types by specifying a type to deserialize as:
In [1]: import msgspec
In [2]: msg = msgspec.encode([1, 2, 3]) # serialize a list of ints
In [3]: msgspec.decode(msg) # deserializes using default types by default
Out[3]: [1, 2, 3]
In [4]: from typing import Set
In [5]: msgspec.decode(msg, type=Set[int]) # deserialize into a set instead by specifying the type
Out[5]: {1, 2, 3}
We could extend this to support custom types, so only certain parts of the message schema could serialize nonstandard types. This might look like:
import msgspec
import cloudpickle
from typing import Annotated, Any, Callable, List, Optional, Tuple

pickled = msgspec.Options(encode=cloudpickle.dumps, decode=cloudpickle.loads)

class Task(msgspec.Struct):
    function: Annotated[Callable, pickled]     # always pickled (even if it's a msgpack compatible type); to msgpack this looks like a binary blob
    args: Annotated[Tuple[Any, ...], pickled]  # same here
    name: str                                  # this is serialized via msgpack as normal
    resources: Optional[List[str]] = None

def add(x, y):
    return x + y

task = Task(
    function=add,
    args=(1, 2),
    name="add",
)

msg = msgspec.encode(task)
assert isinstance(msg, bytes)

out = msgspec.Decoder(Task).decode(msg)
assert task == out
This is nice because the serialized message is only composed of standard msgpack types (so any msgpack library can deserialize the message); when we decode, we tell the decoder how to view those objects to reconstruct the Python objects you want (in this case by calling cloudpickle.loads).
My gut reaction is that the 2nd is better for what you want, but it would be helpful to get more context as to how you intend to use this.
I think the first approach, Extension Types, is what we want.
In Dask/Distributed we need an asymmetric encode/decode scheme where the encoder might encode more object types than the decoder decodes (if that makes sense). We need to support a flow like this:
1) Encode an object and call a default function when encountering unsupported objects.
2) The default function serializes the object using Dask's own serialization infrastructure and handles the communication of the data out-of-band; it only returns a token that msgspec encodes.
3) The msgspec encoder returns some bytes that Dask can send over the wire alongside the out-of-band data.
4) The receiving end decodes the bytes and invokes a default function on each object that was encoded by a default function. In our case, the object is a token that enables us to fetch the data from the out-of-band channel. Note that in some cases we will deserialize the out-of-band data, but in other cases it will stay as a serialized blob of data.
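A rough sketch of what this flow might look like, assuming an extension-type interface along the lines discussed later in this thread (the code 42, the token format, and the out_of_band side channel are made up for illustration, not Dask's actual implementation):

import pickle
import msgspec

out_of_band = []  # hypothetical side channel; frames travel alongside the msgpack bytes

def default(obj):
    # Step 2: hand the unsupported object to Dask's own serialization, keep the
    # payload out-of-band, and return only a small token for msgspec to encode.
    out_of_band.append(pickle.dumps(obj))  # stand-in for Dask's serialize()
    token = len(out_of_band) - 1
    return msgspec.ExtType(42, token.to_bytes(4, "little"))

def ext_hook(code, data):
    # Step 4: on the receiving end, the token lets us fetch (and optionally
    # deserialize) the corresponding out-of-band frame.
    if code == 42:
        return pickle.loads(out_of_band[int.from_bytes(data, "little")])
    raise ValueError(f"Unknown Ext Type code {code}")

encoder = msgspec.Encoder(default=default)    # steps 1-3: produce bytes plus out-of-band frames
decoder = msgspec.Decoder(ext_hook=ext_hook)  # step 4: decode with a fallback hook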
Either mechanism could work for this. For messages that include out-of-band data, are the objects that can be serialized out-of-band always in some fixed part of a larger schema? For example (pseudocode):
class Task(msgspec.Struct):
    """Here we define a message schema for a Task.

    Both `function` and `args` include objects that will _always_ be serialized out-of-band,
    while `resources` will only ever use the builtin types.
    """
    function: Annotated[Callable, OutOfBand]
    args: Tuple[Annotated[Any, OutOfBand], ...]
    resources: Optional[List[str]]
If you have structured schemas, where you know where out-of-band objects will be, then the 2nd approach seems better to me, as you avoid the use of custom extension types (which are then msgpack-implementation-specific). However, if your messages are all unstructured/schemaless, or you don't know where an out-of-band-serializable object may appear in the message, then extension types would be better.
Another thing to think about is consistency/predictability of when/where your custom serializer is used. For example, given the following type:
import msgspec
import numpy as np
from typing import Any, Callable, List, Tuple

class Task(msgspec.Struct):
    function: Callable
    args: Tuple[Any, ...]
    resources: List[str]

my_task = Task(
    function=sum,
    args=(1, np.array([1, 2, 3]), 3.0),
    resources=["some", "strings"],
)
With extension types, sum and np.array([1, 2, 3]) would be serialized using your custom serializer (since msgpack doesn't support those types by default), while the other args (1 and 3.0) would be serialized as standard msgpack types. This seems potentially problematic to me, as your custom serializer could be used at any point in the object, even for fields where it shouldn't be.
With custom type hooks, function and args would be marked to always use your custom hooks. This means that even for 1 and 3.0 (which are valid msgpack types) your custom hooks would still be called. But you also know that resources will never use your custom hooks, and can always be deserialized using standard msgpack types.
However, if your messages are all unstructured/schemaless or you don't know where an out-of-band-serializable object may appear in the message then extension types would be better.
This is the case unfortunately :/
At some point we should make Dask/Distributed properly structured in this regard, but for now I think we should use msgspec to simplify the code and get some speedup. Then later we can look at making it more structured.
Note that I haven't forgotten about this! I think the above is doable - it'd be about a week of work on my part I'd guess. Is this a path you think is worth going down? If this ends up getting used by distributed I think it'd be worth spending time on, but if it's just part of an experiment that might be scrapped I'm less likely to spend time now on it. Getting a rough estimate of how sure you are this is a valid solution to your problem would be useful.
I think there is a very good chance this will get in if it makes a significant performance difference.
Another thing, is it possible to have the decoder maintain object types when not specifying any decode type? E.g. lists are decoded as lists and sets are decoded as sets?
Sure. I think it would help to know what kind of performance difference you're looking for, and what operation(s) you're hoping will be faster? Lower latency for serializing a single message? Or deserializing? Does this matter for small messages or only large ones? An example of a message and code to serialize/deserialize it that I could use as a benchmark would be useful.
is it possible to have the decoder maintain object types when not specifying any decode type? E.g. lists are decoded as lists and sets are decoded as sets?
No, not without type info. As mentioned above, msgpack only contains a few typecodes. For array-like things (lists, tuples, sets, ...), msgspec can only mark these as array types. On decode, without decode type information we can only pick the default array type to use (msgspec uses list here).
In [1]: import msgspec
In [2]: msg = msgspec.encode([1, 2, 3])
In [3]: msg2 = msgspec.encode((1, 2, 3))
In [4]: msg == msg2 # there's no way to distinguish between tuples & lists in the serialized message
Out[4]: True
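Continuing the session above, decoding either message without type information gives back a list (msgspec's default array type):

In [5]: msgspec.decode(msg2)  # arrays decode as lists by default
Out[5]: [1, 2, 3]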
After further thought, I'm thinking the interface provided by the other msgpack library might make sense to mirror (with some additions on our side). We'd need to:
1) Add a msgspec.ExtType(code: int, data: bytes) type representing an already encoded extension object. The existing default hook can return an object of this type, which will then be serialized appropriately. Users can build up more complicated dispatch behavior as needed using e.g. functools.singledispatch.
2) Add an ext_hook argument to msgspec.Decoder and msgspec.decode. This takes a callable of signature ext_hook(code: int, data: bytes) -> object, which is called whenever an extension type is encountered in a message stream. As with the above, the user is free to develop their own dispatch mechanism if desired.
I like this since it still supports building a registry mechanism on top of it, but doesn't require it. It also should be pretty quick to implement.
Usage would look like:
import pickle
import msgspec

def default(obj: object) -> msgspec.ExtType:
    data = pickle.dumps(obj)
    return msgspec.ExtType(42, data)

def ext_hook(code: int, data: bytes) -> object:
    if code == 42:
        return pickle.loads(data)
    raise ValueError(f"Unknown Ext Type code {code}")

encoder = msgspec.Encoder(default=default)
decoder = msgspec.Decoder(ext_hook=ext_hook)
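A hypothetical round trip using the encoder and decoder defined above (Point is just a stand-in for a type msgpack doesn't support natively):

class Point:
    def __init__(self, x, y):
        self.x = x
        self.y = y

# The Point instance falls back to pickle via default(); everything else is
# encoded with standard msgpack types.
msg = encoder.encode({"name": "origin", "point": Point(0, 0)})
out = decoder.decode(msg)
assert out["name"] == "origin"
assert out["point"].x == 0 and out["point"].y == 0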
Sure. I think it would help to know what kind of performance difference you're looking for, and what operation(s) you're hoping will be faster? Lower latency for serializing a single message? Or deserializing? Does this matter for small messages or only large ones?
The goal here is to minimize the overhead of serialization and everything associated with serialization. Right now the complete serialization process in Dask/Distributed is:
1) (In Python) Walk through all outbound messages and make sure they are msgpack serializable, handle implicit list->tuple conversion, and mark objects we want serialized out-of-band.
2) (In Cython) Serialize the messages using msgpack.
3) (In Python) Walk through all inbound messages and unpack non-msgpack serializable objects (if not on the scheduler), and handle implicit list->tuple conversion.
My hope is that we can eliminate step (1) and (3) and do everything in a single pass in Cython/C. I see two approaches:
1) Use a library like msgspec or quickle that handles as much as possible, including all basic Python types (no implicit conversion), and supports a fallback/default function. This way, we only have to do a single pass and most of that pass is implemented in C. In this approach we are not sensitive to the performance of small messages. AFAIK, a safe, fast serialization library that supports basic Python types and a default encoding and decoding function doesn't exist.
2) Write the serialization process in Cython (Pure Python Mode) and encode the basic Python objects using msgspec. In this approach, we are very much sensitive to the performance of encoding single objects.
An example of a message and code to serialize/deserialize it that I could use as a benchmark would be useful.
Can you provide this please?
handle implicit list->tuple conversion
Can you talk about when/why this is necessary? Msgspec will currently deserialize arrays as lists by default (unless the decoder has type info provided). I'm unlikely to change this. However, a small PR could switch to use tuples in contexts where the array type will be hashed (e.g. keys in dicts, sets). So we'd deserialize array-likes as lists by default unless they were keys in dicts/values in sets.
This would mean that roundtripping (1, 2, 3) would still deserialize as [1, 2, 3] without type info on the decoder end (but {(1, 2, 3): [4, 5, 6]} would roundtrip appropriately). There's not much we could do about that - msgpack only has one array type. We could push more types into extension types, but at that point you're really inventing your own protocol and may be better served by a different serializer.
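Concretely, that behavior would look something like this (a sketch, assuming the hashed-context change and no decode type supplied):

import msgspec

# Tuples still come back as lists by default (msgpack has only one array type)...
assert msgspec.decode(msgspec.encode((1, 2, 3))) == [1, 2, 3]

# ...but arrays used as dict keys decode as tuples, so this message roundtrips:
assert msgspec.decode(msgspec.encode({(1, 2, 3): [4, 5, 6]})) == {(1, 2, 3): [4, 5, 6]}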
Use a library like msgspec or quickle
Is using quickle actually an option here? It'd be better for some things, but has a distinct set of tradeoffs:
I'm happy to add an extension mechanism to quickle as well if that'd be useful, so either library could support falling back to pickle when needed (if configured). But if distributed may move to message schemas in the future (or clients in other languages) then I think msgspec would be the better choice here.
@madsbk - I have some mild availability to work on this now. Any response to the above?
However, a small PR could switch to use tuples in contexts where the array type will be hashed (e.g. keys in dicts, sets). So we'd deserialize array-likes as lists by default unless they were keys in dicts/values in sets.
This is done in #29.
@jcrist - sorry for the radio silence. I am trying to get a better understanding of how we want the overall design of messages and serialization in Distributed. This week I will try to use msgspec as-is for all our high frequency messages that are easy to standardize and see how far I can get. My gut feeling is that we will need a secure pickle library like Quickle (with default support) for the task graph, but I am not sure. I will get back when I know more :)
However, a small PR could switch to use tuples in contexts where the array type will be hashed (e.g. keys in dicts, sets). So we'd deserialize array-likes as lists by default unless they were keys in dicts/values in sets.
This is done in #29.
Nice, thanks!
No worries. I went ahead and added the extension hooks anyway in #31 (had a bit of free time, and this seemed interesting).
Feature request: make it possible to specify a fallback function that is called when the encoder encounters an unsupported object. The fallback function can then handle the unsupported object using only msgspec compatible types. When decoding, a corresponding fallback function is then called to handle the decoding of the object.
Motivation
In Dask/Distributed we are discussing the replacement of msgpack. The library should be: … msgpack but with a performance penalty)