ezmsg-org / ezmsg

Pure-Python DAG-based high-performance SHM-backed pub-sub and multi-processing pattern
https://ezmsg.readthedocs.io/en/latest/
MIT License

[Request] FormatMessage(ez.Unit) #57

Open cboulay opened 7 months ago

cboulay commented 7 months ago

Inspired by @pperanich's comment, I had a go at creating a reusable FormatMessage Unit. The code snippet is below.

I'd appreciate any feedback and a nudge if you would like this in a PR. I'm sure you have better ideas of how to implement something like this so I don't mind at all if you make this obsolete with your own implementation.

from typing import Optional, Callable, AsyncGenerator, Any
from dataclasses import asdict
import json

import ezmsg.core as ez
from ezmsg.util.messages.axisarray import AxisArray
from ezmsg.zmq.units import ZMQMessage

def aa2dict(aa_msg: AxisArray, data_as_list=True):
    out = asdict(aa_msg)
    if data_as_list:
        out["data"] = out["data"].tolist()
    return out

class FormatMessageSettings(ez.Settings):
    fun: Optional[Callable] = None

class FormatMessage(ez.Unit):
    SETTINGS: FormatMessageSettings

    INPUT = ez.InputStream(Any)
    OUTPUT = ez.OutputStream(Any)

    def initialize(self) -> None:
        if self.SETTINGS.fun is None:
            # Default fun will convert AxisArray to dict without ndarray,
            # then json-encoded-string, then encode to bytes.
            # Finally, it packages it in a ZMQMessage
            self.SETTINGS = FormatMessageSettings(lambda m: ZMQMessage(json.dumps(aa2dict(m)).encode("utf-8")))

    @ez.subscriber(INPUT)
    @ez.publisher(OUTPUT)
    async def on_message(self, message: Any) -> AsyncGenerator:
        yield self.OUTPUT, self.SETTINGS.fun(message)

pperanich commented 7 months ago

I like what I see here. This would be a nice drop-in unit to serialize messages which could then be used by a number of potential transports, such as ZMQ. I'd propose the following changes:

from typing import Optional, Callable, AsyncGenerator, Any
from dataclasses import asdict
import json
import numpy as np

import ezmsg.core as ez

class NumpyArrayEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()  # Convert numpy array to list
        return json.JSONEncoder.default(self, obj)

class FormatMessageSettings(ez.Settings):
    fun: Callable = lambda msg: json.dumps(asdict(msg), cls=NumpyArrayEncoder).encode('utf-8')

class FormatMessage(ez.Unit):
    SETTINGS: FormatMessageSettings

    INPUT = ez.InputStream(Any)
    OUTPUT = ez.OutputStream(bytes)

    @ez.subscriber(INPUT)
    @ez.publisher(OUTPUT)
    async def on_message(self, message: Any) -> AsyncGenerator:
        yield self.OUTPUT, self.SETTINGS.fun(message)

Namely:

I'll add that it would be nice to have the inverse of this unit for deserialization. @griffinmilsap any thoughts?
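To make the idea of an inverse concrete, here is a minimal sketch of a serialize/deserialize pair, written without ezmsg so it runs on its own. `ExampleMessage` and `deserialize_msg` are hypothetical names for illustration, not part of ezmsg, and the sketch assumes the payload is UTF-8 encoded JSON as in the snippet above.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any, Dict, List


@dataclass
class ExampleMessage:
    # Hypothetical stand-in for a dataclass-based message.
    data: List[float]
    label: str


def serialize_msg(msg: Any) -> bytes:
    # Dataclass -> dict -> JSON string -> UTF-8 bytes.
    return json.dumps(asdict(msg)).encode("utf-8")


def deserialize_msg(payload: bytes) -> Dict[str, Any]:
    # Inverse direction: UTF-8 bytes -> JSON string -> dict.
    # The original message class is not recovered, only its fields.
    return json.loads(payload.decode("utf-8"))


msg = ExampleMessage(data=[1.0, 2.5], label="ch0")
restored = deserialize_msg(serialize_msg(msg))
```

Note that the round trip yields a plain dict: rebuilding the original message type would need extra type information in the payload, which this sketch does not attempt.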

cboulay commented 7 months ago

I originally tried putting a lambda in the Settings but then I get

_pickle.PicklingError: Can't pickle <function FormatMessageSettings.<lambda> at 0x16bef11c0>: attribute lookup FormatMessageSettings.<lambda> on ezmsg.scratch.formatmsg failed

It does work, however, if it's a regular function. Any concern with making it a function instead of a lambda?
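For anyone who wants to reproduce the difference without ezmsg, here is a small standalone sketch. It assumes the settings object gets pickled at some point (as the traceback suggests); `LambdaSettings`, `FunctionSettings`, `to_bytes` and `can_pickle` are hypothetical names used only for this illustration.

```python
import pickle
from dataclasses import dataclass
from typing import Any, Callable


def to_bytes(msg: Any) -> bytes:
    # A module-level function: pickle stores it by qualified name.
    return repr(msg).encode("utf-8")


@dataclass(frozen=True)
class LambdaSettings:
    # The lambda's qualified name is "LambdaSettings.<lambda>",
    # which pickle cannot look up again by name.
    fun: Callable = lambda msg: repr(msg).encode("utf-8")


@dataclass(frozen=True)
class FunctionSettings:
    fun: Callable = to_bytes


def can_pickle(obj: Any) -> bool:
    # Report whether pickle.dumps succeeds for this object.
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError):
        return False
    return True


lambda_ok = can_pickle(LambdaSettings())
function_ok = can_pickle(FunctionSettings())
```

Both settings objects behave the same when `fun` is called; only the one holding a named, importable function survives pickling.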

isinstance(obj, np.ndarray) is a bit slow. It's usually faster (and more "pythonic") to try/except.

If the intention is to use this as a serializer, and a different unit would be a deserializer, then the name should be changed. How about serializemsg.SerializeMessage?

from typing import Callable, AsyncGenerator, Any
from dataclasses import asdict
import json

import ezmsg.core as ez

class NumpyArrayEncoder(json.JSONEncoder):
    def default(self, obj):
        try:
            return obj.tolist()  # Likely numpy array to list
        except AttributeError:
            return json.JSONEncoder.default(self, obj)

def serialize_msg(msg):
    return json.dumps(asdict(msg), cls=NumpyArrayEncoder).encode('utf-8')

class SerializeMessageSettings(ez.Settings):
    fun: Callable = serialize_msg

class SerializeMessage(ez.Unit):
    SETTINGS: SerializeMessageSettings

    INPUT = ez.InputStream(Any)
    OUTPUT = ez.OutputStream(Any)

    @ez.subscriber(INPUT)
    @ez.publisher(OUTPUT)
    async def on_message(self, message: Any) -> AsyncGenerator:
        yield self.OUTPUT, self.SETTINGS.fun(message)

If you're happy with this in principle then I'll create a PR. Where would it go? ezmsg.util?

pperanich commented 7 months ago

My bad on the lambda causing a pickling error: I didn't run that code beforehand to check it was valid.

I'm fine with the default serialization being a function.

I am generally not in favor of using try/except blocks as a method of generic control flow. Do you have reason to believe isinstance will be too slow for your particular use case?

I'm happy with the name change and agree this belongs in the utils folder as well. Feel free to create a PR.

cboulay commented 7 months ago

isinstance can be fast on some objects, but for most objects it is quite slow because it goes through many lines of pure Python code before reaching its decision. This line could conceivably be hit 100k times / sec so I think it's worth a bit of premature optimization.
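The relative cost is easy to measure rather than guess. Below is a rough `timeit` sketch comparing the three checks discussed in this thread. It uses the standard library's `array.array` as a stand-in for `np.ndarray` (both expose `tolist`), so the absolute numbers are an assumption-laden proxy and should be re-measured against real numpy arrays; `Plain`, `benchmark` and the `check_*` helpers are hypothetical names.

```python
import timeit
from array import array


class Plain:
    # An object without a tolist method, i.e. the fallback case.
    pass


def check_isinstance(obj) -> bool:
    return isinstance(obj, array)


def check_hasattr(obj) -> bool:
    return hasattr(obj, "tolist")


def check_try(obj) -> bool:
    # try/except variant: attempt the attribute lookup directly.
    try:
        obj.tolist
    except AttributeError:
        return False
    return True


def benchmark(number: int = 100_000) -> dict:
    # Time every check on an object with and without tolist.
    samples = {"has_tolist": array("d", [1.0]), "no_tolist": Plain()}
    results = {}
    for label, obj in samples.items():
        for check in (check_isinstance, check_hasattr, check_try):
            elapsed = timeit.timeit(lambda: check(obj), number=number)
            results[(label, check.__name__)] = elapsed
    return results


if __name__ == "__main__":
    for key, seconds in benchmark().items():
        print(key, f"{seconds:.4f} s")
```

Timings depend on the Python version and on the types involved, so no expected numbers are given here.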

In this case, as we don't really care about the class, we only care that it has the method we're calling, an alternative to the try/except that's faster than isinstance is if hasattr(obj, "tolist"). It also has the benefit that it will work with other objects that happen to have tolist. Will that do?
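As a sketch of what the `hasattr` variant could look like, the encoder below converts any object exposing `tolist` and defers everything else to the base class. `ToListEncoder` is a hypothetical name, and `array.array` stands in for a numpy array so the snippet has no third-party dependency.

```python
import json
from array import array


class ToListEncoder(json.JSONEncoder):
    def default(self, obj):
        # Duck-typed check: anything with a tolist method is converted.
        if hasattr(obj, "tolist"):
            return obj.tolist()
        # Otherwise let the base class raise the usual TypeError.
        return super().default(obj)


encoded = json.dumps({"data": array("d", [1.0, 2.0])}, cls=ToListEncoder)
```

One trade-off of the duck-typed check: an unrelated object that happens to define `tolist` will also be converted, which is the benefit mentioned above but could be a surprise in other contexts.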

griffinmilsap commented 7 months ago

I have some code in ezmsg.util.messagecodec that could be useful here. It serializes a message into and out of a json dict so we don't need to replicate that. We could even just have that be the default FormatMessage behavior and turn MessageLogger into a collection that has FormatMessage with a file logger that just writes incoming strings to a file.

Thoughts?

Edit: anticipating performance concerns with MessageEncoder; optimizations made there would benefit more than one code path ;)