ethereum / lahja

Lahja is a generic multi-process event bus implementation written in Python 3.6+ to enable lightweight inter-process communication, based on non-blocking asyncio.
MIT License

Custom event serialization. #173

Open · pipermerriam opened this issue 4 years ago

What is wrong

Currently we use pickle for serialization of events across the bus.

This is not ideal since

How can it be fixed

Let Event implementations specify their own serialization/deserialization.
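A minimal sketch of that interface, assuming events expose an instance method `serialize()` that returns bytes and a classmethod `deserialize()` that rebuilds the event (these two names match the calls used later in this issue). `BaseEvent`, `PeerCountChanged`, and the JSON encoding are hypothetical choices for illustration, not lahja's API:

```python
import json
from typing import Type, TypeVar

TEvent = TypeVar("TEvent", bound="BaseEvent")


class BaseEvent:
    """Hypothetical base class: each event type owns its own wire format."""

    def serialize(self) -> bytes:
        raise NotImplementedError

    @classmethod
    def deserialize(cls: Type[TEvent], data: bytes) -> TEvent:
        raise NotImplementedError


class PeerCountChanged(BaseEvent):
    """Example event that encodes itself as JSON instead of relying on pickle."""

    def __init__(self, peer_count: int) -> None:
        self.peer_count = peer_count

    def serialize(self) -> bytes:
        return json.dumps({"peer_count": self.peer_count}).encode("utf-8")

    @classmethod
    def deserialize(cls, data: bytes) -> "PeerCountChanged":
        return cls(peer_count=json.loads(data.decode("utf-8"))["peer_count"])
```

Nothing in the bus needs to know that this event uses JSON; another event type could use `struct` or any other encoding behind the same two methods.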

This means we'll need a simple message envelope for sending messages across the bus, as well as a way for connected endpoints to negotiate event types so that they reliably agree on which event type each message carries.

Here is a simple starter idea for this.

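Message envelope is:

DATA_LENGTH | DATA
DATA = EVENT_ID | PAYLOAD

where DATA_LENGTH is a 4-byte little-endian unsigned integer giving the size of DATA in bytes, EVENT_ID is a 2-byte little-endian unsigned integer identifying the event type, and PAYLOAD is whatever the event's own serializer produces.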
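Using the same `struct` formats as the send and receive snippets later in this issue (`'<I'` for DATA_LENGTH, `'<H'` for EVENT_ID), the envelope can be packed and unpacked as below. `encode_message` and `decode_message` are hypothetical helper names:

```python
import struct
from typing import Tuple

LENGTH_FIELD = struct.Struct("<I")    # DATA_LENGTH: unsigned 32-bit, little-endian
EVENT_ID_FIELD = struct.Struct("<H")  # EVENT_ID: unsigned 16-bit, little-endian


def encode_message(event_id: int, payload: bytes) -> bytes:
    """Build DATA_LENGTH | DATA, where DATA = EVENT_ID | PAYLOAD."""
    data = EVENT_ID_FIELD.pack(event_id) + payload
    return LENGTH_FIELD.pack(len(data)) + data


def decode_message(message: bytes) -> Tuple[int, bytes]:
    """Split one complete message back into (EVENT_ID, PAYLOAD)."""
    (data_length,) = LENGTH_FIELD.unpack_from(message, 0)
    data = message[LENGTH_FIELD.size:LENGTH_FIELD.size + data_length]
    if len(data) != data_length:
        raise ValueError("truncated message")
    (event_id,) = EVENT_ID_FIELD.unpack_from(data, 0)
    return event_id, data[EVENT_ID_FIELD.size:]
```

For example, `encode_message(7, b"hi")` yields `b"\x04\x00\x00\x00\x07\x00hi"`. Because EVENT_ID is two bytes wide, this layout allows at most 65,536 distinct event types per connection.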

Two connected endpoints will need a way to tell each other their mapping of EVENT_ID -> EventType. This should probably be a new internal message type.
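One possible shape for that internal message, assuming each endpoint names its event types with some string identifier and sends its table as JSON. `EventTypeMapping` and `build_remote_lookup` are hypothetical names, not part of lahja:

```python
import json
from typing import Dict


class EventTypeMapping:
    """Hypothetical internal message announcing EVENT_ID -> event type name."""

    def __init__(self, mapping: Dict[int, str]) -> None:
        self.mapping = mapping

    def serialize(self) -> bytes:
        # JSON object keys must be strings, so the integer IDs are stringified.
        return json.dumps({str(k): v for k, v in self.mapping.items()}).encode("utf-8")

    @classmethod
    def deserialize(cls, data: bytes) -> "EventTypeMapping":
        raw = json.loads(data.decode("utf-8"))
        return cls({int(k): v for k, v in raw.items()})


def build_remote_lookup(mapping: EventTypeMapping, local_types: Dict[str, type]) -> Dict[int, type]:
    """Translate the peer's EVENT_IDs into local classes, skipping unknown names."""
    return {
        event_id: local_types[name]
        for event_id, name in mapping.mapping.items()
        if name in local_types
    }
```

In this sketch each side only decodes IDs that its peer has announced, so the two endpoints need not assign identical IDs; names the receiver does not recognize are left out of its lookup.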

A standing question is the identifier that an endpoint uses to reference an event class. I have two ideas.

  1. Use a string of the dot separated import path of the class.
  2. Require explicit pre-registration of event classes.

The first makes for a simple UX, but it might be ambiguous and might not support dynamically created classes.
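To illustrate the concern with the first option, here is a sketch that derives the identifier from `__module__` and `__qualname__` (one plausible way to build the dotted path; `import_path` and `make_event_class` are hypothetical helpers):

```python
def import_path(cls: type) -> str:
    """Option 1: identify an event class by its dotted import path."""
    return f"{cls.__module__}.{cls.__qualname__}"


def make_event_class() -> type:
    # A class defined inside a function: every call returns a *new* class object.
    class DynamicEvent:
        pass

    return DynamicEvent


first, second = make_event_class(), make_event_class()
# Two distinct classes share one identifier, so a receiver cannot tell them apart,
# and the "<locals>" segment means the path cannot be imported on the other side.
print(first is second)                             # False
print(import_path(first) == import_path(second))  # True
print(import_path(first))
```

The last line prints something ending in `make_event_class.<locals>.DynamicEvent`, which is not a path that another process could import.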

The second carries some coordination cost, but I lean towards it. We can probably provide a simple registration API that reduces boilerplate and maybe even makes registration automatic for common use cases.
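A sketch of what explicit pre-registration could look like, assuming a registry object whose `register` method doubles as a class decorator to cut boilerplate. `EventRegistry`, `Ping`, and `Pong` are hypothetical; `get_event_id` and `get_event_type` mirror the helper names used in the send and receive snippets below, and the two-byte limit comes from the `'<H'` format used there for EVENT_ID:

```python
from typing import Dict


class EventRegistry:
    """Hypothetical explicit registry (option 2): EVENT_ID <-> event class."""

    def __init__(self) -> None:
        self._id_by_type: Dict[type, int] = {}
        self._type_by_id: Dict[int, type] = {}

    def register(self, event_type: type) -> type:
        """Assign the next free ID; returns the class so it works as a decorator."""
        if event_type in self._id_by_type:
            raise ValueError(f"{event_type!r} is already registered")
        event_id = len(self._type_by_id)
        if event_id > 0xFFFF:
            raise OverflowError("EVENT_ID must fit in two bytes")
        self._id_by_type[event_type] = event_id
        self._type_by_id[event_id] = event_type
        return event_type

    def get_event_id(self, event: object) -> int:
        return self._id_by_type[type(event)]

    def get_event_type(self, event_id: int) -> type:
        return self._type_by_id[event_id]


registry = EventRegistry()


@registry.register
class Ping:
    pass


@registry.register
class Pong:
    pass
```

Because IDs here follow registration order, two endpoints only agree on them if they register in the same order, which is one reason the EVENT_ID -> EventType mapping still has to be exchanged between connected endpoints.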

When an endpoint sends an event, it would look up the EVENT_ID for the event's type, call event.serialize() to get the PAYLOAD, and prefix both with the length:

```python
import struct

data = event.serialize()  # PAYLOAD
# DATA_LENGTH covers the 2-byte EVENT_ID as well as the payload
message = struct.pack('<I', len(data) + 2) + struct.pack('<H', get_event_id(event)) + data
```

When an endpoint receives a message:

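```python
import struct

async def receive_event(reader):
    # reader: the connection's asyncio.StreamReader
    data_length_bytes = await reader.readexactly(4)
    data_length = struct.unpack('<I', data_length_bytes)[0]
    data = await reader.readexactly(data_length)  # EVENT_ID | PAYLOAD
    event_id = struct.unpack('<H', data[:2])[0]
    payload = data[2:]
    event_type = get_event_type(event_id)
    return event_type.deserialize(payload)
```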
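To check that the send and receive sides fit together, here is a self-contained round trip. It assumes an in-memory `asyncio.StreamReader` stands in for the connection and uses a hypothetical `Ping` event with a fixed ID of 1 in place of a real registry:

```python
import asyncio
import struct


class Ping:
    """Hypothetical event with a trivial custom wire format (one uint64)."""

    def __init__(self, nonce: int) -> None:
        self.nonce = nonce

    def serialize(self) -> bytes:
        return struct.pack("<Q", self.nonce)

    @classmethod
    def deserialize(cls, data: bytes) -> "Ping":
        return cls(struct.unpack("<Q", data)[0])


# Hypothetical fixed ID table shared by both sides.
EVENT_TYPES = {1: Ping}
EVENT_IDS = {Ping: 1}


def encode(event) -> bytes:
    data = event.serialize()
    return struct.pack("<I", len(data) + 2) + struct.pack("<H", EVENT_IDS[type(event)]) + data


async def receive_event(reader: asyncio.StreamReader):
    data_length = struct.unpack("<I", await reader.readexactly(4))[0]
    data = await reader.readexactly(data_length)
    event_id = struct.unpack("<H", data[:2])[0]
    return EVENT_TYPES[event_id].deserialize(data[2:])


async def main() -> None:
    reader = asyncio.StreamReader()
    # Two messages back to back in one byte stream.
    reader.feed_data(encode(Ping(1)) + encode(Ping(2)))
    reader.feed_eof()
    first = await receive_event(reader)
    second = await receive_event(reader)
    print(first.nonce, second.nonce)  # prints "1 2"


asyncio.run(main())
```

Feeding two messages back to back shows why the DATA_LENGTH prefix matters: the byte stream has no other message boundaries, and reading exactly DATA_LENGTH bytes after the prefix yields one message at a time.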