Arwalk / zig-protobuf

a protobuf 3 implementation for zig.
MIT License
188 stars 20 forks source link

Incremental decoding / Streaming API #30

Open Rubeer opened 6 months ago

Rubeer commented 6 months ago

At a low level, the protobuf encoding allows you to decode one field at a time. But usually it is more convenient to have an API where you decode one message at a time.

To do this, most protobuf libraries will require the entire encoded message to be in contiguous memory before decoding it. This is fine in most cases, but sometimes you want to be able to decode messages incrementally. This saves on copying out to intermediate buffers.

Example use case in an embedded system, or a kernel driver: Data is received into a ringbuffer or FIFO from some hardware peripheral. This buffer could be smaller than the size of an encoded protobuf message, and depending on the head/tail position, some messages may also wrap around from the end to the start of the buffer. Normally it would need to be defragmented/copied to an intermediate buffer before decoding, which takes more memory, or you need to resort to manually parsing one field at a time. In any case some manual logic / state machine is required.

Proposal: a "throw bytes at this function" API, where the library calls you back every time it has fully decoded a message. The decoding state machine is generated by the library. Example usage code:

// Only need to call this once. Possibly comptime.
var decode_state = protobuf.pb_decoder_init(.{ 
    .message_type = MyMessageType,
    .allocator = allocator,
    .delimiter_parser = protobuf.VarIntDelimiterParser, // Library provided implementation, or could be a custom implementation
    .decode_callback = message_decoded_callback,
});
// Somewhere else in the codebase: we receive some bytes.
// This is a slice with any number of bytes, i.e. half a message, or three entire messages.
// message_decoded_callback is called whenever a MyMessageType is fully decoded,
// across any number of calls to pb_decode_bytes.
// This function should only use a buffer large enough to decode one field.
// It only calls the allocator when decoding dynamic data like strings or repeated fields.
try protobuf.pb_decode_bytes(&decode_state, bytes_from_somewhere());
fn message_decoded_callback(decoded: *const MyMessageType) void {
    defer decoded.deinit();
    // do stuff with decoded message 
}

I am not familiar enough with zig-protobuf internals to know how hard this to implement, but I imagine it would be a good chunk of code. From an API design perspective the hard part is going to be the delimiter parser, which would be some interface to parse the delimiters/headers. Ideally it would be flexible enough that you can implement things like checksums/CRC with it as well.

A streaming encoder would be nice to have as well, but it is not as important in my opinion.

I ended up typing out quite the story here, but it is just some draft/idea, I understand if it is outside of the scope of this project.

Arwalk commented 6 months ago

Hey there @Rubeer , thanks for the issue.

Right now, the way zig-protobuf works is centered around the "message as a zig struct" idea. Once you have generated the structures with the protoc plugin, the api is centered around the methods available in the MessageMixins type. Namely, once you have defined a MyMessage message, you can then just

var m : MyMessage = MyMessage.decode(buffer, allocator);
var e : []u8 = m.encode(allocator);

internally, we pass around slices of the buffer while decoding, iterating over the (sub-)elements of the buffer in the WireDecoderIterator structure.

I guess we could rework the WireDecoderIterator to be able to work with some simple "bytestream" that gives bytes one by one, and add a new decoding API that is able ot use such a streamer. This would be some work but it doesn't seem impossible to me.

The main downside to this, for me, is that we would have to buffer internally everything that has a variable length, meaning more use of the allocator for arrays and submessages.

The new api would be something along the lines of

var m = MyMessage.decode(bytestream, allocator);

where bytestream is anything that can be called with var byte : ?u8 = bytestream.next() (maybe a bit more if we want to handle error cases). I'm even tempted to just rely on zig's comptime duck-typing for this, out of pure laziness.

Would that kind of solution be along the lines of your expectations?

Rubeer commented 6 months ago

Just to preface - a bytestreamer api might reduce the performance for regular use cases. It could make many optimization techniques much harder. I would try to keep the decoders separate so people that don't use this feature are not affected.

Looking at the wire format, I agree that memory allocation would be problematic for dynamic data. I assume that if you have the full message in memory you can do a pre-pass to figure out how to allocate things efficiently?

Now back to your proposal: this is something that nanopb also offers, a byte reader/writer interface. The problem with it is that it implies some blocking behaviour inside of bytestream.next(). i.e. the control flow is handed to the library, and not given back until the message is fully encoded (correct me if I am wrong). This implies the need for an entire OS thread for each active connection.

With such an API design I would go back to the "old" way of doing things since it would use less memory in total. I want to be able to multiplex multiple connections on a single thread.

I wrote my example in such a way that no blocking logic would be required. I agree this does put more burden on the library. It will need some state machine to remember how to continue decoding the message from where it left off.

Rubeer commented 6 months ago

Maybe a feature like this should wait until zig has async suspend/resume implemented again.

Arwalk commented 6 months ago

It seems like it would be better to look at this when async is back on the menu. I'll leave it at that for now.