ezmsg-org / ezmsg

Pure-Python DAG-based high-performance SHM-backed pub-sub and multi-processing pattern
https://ezmsg.readthedocs.io/en/latest/
MIT License
9 stars 5 forks source link

zmq - Fixes #49 by making wait for subscribers optional. #51

Closed cboulay closed 7 months ago

cboulay commented 8 months ago

See #49 for motivation. Also added support for json-encodable message-types.

griffinmilsap commented 7 months ago

@pperanich - I'd appreciate your review here :D

pperanich commented 7 months ago

I agree that we should make waiting for subscribers optional.

On the topic of json-encodeable message types, I'm not convinced we should have the default behavior json encode messages that aren't of ZMQMessage type. I would be in support of adding an optional Setting for specifying the serialization format of the incoming messages. E.g.

class ZMQSenderSettings(ez.Settings):
    ...
    ser: Optional[Callable]

...

class ZMQSender(ez.Unit):
    ...
    async def zmq_subscriber(self, message: ZMQMessage):
        ...
        if self.SETTINGS.ser is not None:
            message = self.SETTINGS.ser(message)

So for your use case:

ZMQSenderSettings(ser=lambda m: json.dumps(m).encode('utf-8'))

Possibly worth migrating this topic to a separate issue. I am good with the core purpose of this PR.

cboulay commented 7 months ago

Yeah the JSON thing was just a way for me to test it and I thought it might be useful generally. But I think this kind of message-type-conversion probably belongs in its own Unit. I reverted that change.