3Hren / msgpack-rust

MessagePack implementation for Rust / msgpack.org[Rust]
MIT License

Support for streaming #306

Closed hn-sl closed 2 years ago

hn-sl commented 2 years ago

Hello, is there any plan for supporting streaming? For network communication using the msgpack protocol without any additional framing layer, a streaming interface can be useful. Without it, the user needs to feed the complete message to the library, but it is hard for the user to know where a message ends. Another benefit of streaming is that data can be parsed progressively as each chunk arrives, without having to wait until the full message has been received.

The Python msgpack-python library has a nice streaming interface, shown below.

import msgpack

obj = {
    "helo": "world"
}
data = msgpack.dumps(obj)
s = len(data)

upk = msgpack.Unpacker()
upk.feed(data[:int(s*0.9)]) # feed first 90% of data

try:
    upk.unpack() # parse 90% of the data and maintain the result internally and throw OutOfData exception
except msgpack.OutOfData:
    pass

try:
    upk.unpack() # throw OutOfData exception again but this time it is done quickly since it knows there is no new data
except msgpack.OutOfData:
    pass

upk.feed(data[int(s*0.9):]) # feed remaining 10% of data
result = upk.unpack() # should be done very fast since 90% of data was already parsed.
assert result == obj

kornelski commented 2 years ago

Streaming of output is not supported, i.e. you can't see partial data of an incomplete message.

Streaming of input is supported via io::Read trait. It will read and decode data as it becomes available.

hn-sl commented 2 years ago

Streaming of input is supported via io::Read trait. It will read and decode data as it becomes available.

But this requires that the user already has the complete sequence of data, and it doesn't partially parse an incomplete message, does it? Suppose, for example, a server steadily sends a stream of objects encoded with the msgpack protocol to clients. The client wants to deserialize the data (partially, if a message is incomplete) as soon as it receives a chunk, without knowing where each object ends in the stream. Is that possible with this library?

kornelski commented 2 years ago

Your question is ambiguous. You ask about "parsing", but I suspect you mean accessing the (partially) parsed data.

This library does parse incrementally, and does start parsing messages before they are fully received. If the server sends 3 bytes, this library will read the first byte immediately, then decide whether it needs to read more bytes, and ask for more if needed, and parse those bytes as soon as they're received without waiting for more bytes that aren't relevant. In this aspect it's the best scenario you can imagine.

However, even though parsing happens incrementally on partially-received data, the Rust API based on function calls and structs has no way of returning a partially-parsed struct. In this aspect it's the worst case there can be. Those incrementally not-yet-complete structs exist only privately in the library, and there's no way to see them until they are complete.

hn-sl commented 2 years ago

Sorry for the ambiguity. My question was about the library's ability to parse incrementally, not about exposing intermediate results to the user. My point is that the user should be able to get an object from the stream as soon as it is ready. But I forgot to mention that what I'm interested in is the availability of incremental parsing on each method call by the user, with a buffer that doesn't involve blocking I/O (e.g. bytes::BytesMut). I know that from_read does incremental parsing from an I/O stream, but the problem is that this function is sync, which means it blocks on reading from the I/O object when no data is available, and therefore it is not very useful in async applications.

I can read bytes from the socket asynchronously whenever new data arrives and feed them to a decoder (synchronously), which is a common pattern in async applications, but it seems this library doesn't work well for that kind of usage. The only way to make it work is for the user to wait (asynchronously) until the complete message is ready and then feed the complete message to the library, but it is difficult for the user to know where an object ends in the stream without parsing it.

But I guess it is not easy to make a serde-compatible parsing API that supports incremental parsing on each call. I thought simply supporting an async API in this library could be an easy solution, but I just realized that this can't be done without changes to serde itself, so this issue is really about serde. Maybe they can add an async trait to serde in the future. I found there was a discussion about this (https://github.com/serde-rs/json/issues/575).

kornelski commented 2 years ago

It is possible to use channels to feed data from an async source into a reader that implements io::Read, so you can parse data incrementally bit by bit, without excessive buffering, as soon as it arrives from the network from an async reader.

You will need to dedicate a thread for the parser.
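The channel-plus-thread idea above can be sketched with only the standard library. `ChannelReader` and `pump` are hypothetical names for illustration; in a real application the sending side would be the async network task, and the parser thread would call rmp_serde::from_read on the reader instead of read_to_end:

```rust
use std::io::{self, Read};
use std::sync::mpsc::{channel, Receiver};
use std::thread;

// An io::Read implementation fed by byte chunks arriving over a channel.
struct ChannelReader {
    rx: Receiver<Vec<u8>>,
    buf: Vec<u8>,
    pos: usize,
}

impl Read for ChannelReader {
    fn read(&mut self, out: &mut [u8]) -> io::Result<usize> {
        // Block until a non-empty chunk arrives; a closed channel means EOF.
        while self.pos == self.buf.len() {
            match self.rx.recv() {
                Ok(chunk) => {
                    self.buf = chunk;
                    self.pos = 0;
                }
                Err(_) => return Ok(0),
            }
        }
        let n = out.len().min(self.buf.len() - self.pos);
        out[..n].copy_from_slice(&self.buf[self.pos..self.pos + n]);
        self.pos += n;
        Ok(n)
    }
}

// Feed chunks through the channel and collect whatever the dedicated
// parser thread reads off the stream.
fn pump(chunks: Vec<Vec<u8>>) -> Vec<u8> {
    let (tx, rx) = channel();
    let parser = thread::spawn(move || {
        let mut rd = ChannelReader { rx, buf: Vec::new(), pos: 0 };
        let mut all = Vec::new();
        rd.read_to_end(&mut all).expect("read");
        all
    });
    for chunk in chunks {
        tx.send(chunk).expect("send");
    }
    drop(tx); // closing the channel signals EOF to the reader
    parser.join().expect("join")
}

fn main() {
    let out = pump(vec![b"msg".to_vec(), b"pack".to_vec()]);
    assert_eq!(out, b"msgpack");
}
```

The blocking `recv` is what dedicates the thread: the parser sits inside `read` until the async side sends more bytes, so decoding proceeds chunk by chunk without buffering whole messages.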

hn-sl commented 2 years ago

Fair enough, I'll close this.