Closed cruhland closed 9 years ago
@BenWhitehead done making code changes. I'd like to add some comments to the new classes if you think they look good.
The biggest changes since last time are the `ConcatenatedInputStream`, which is a more robust equivalent of `SequenceInputStream`, and the use of `nio.ByteBuffer` to handle parsing the message and message size without fiddling around directly with byte arrays and indexes.
Overall there is more code, but (I hope) each class/method is fairly simple when looked at individually.
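
To illustrate the `ByteBuffer` point, here is a minimal sketch, not the code from this PR. It assumes the RecordIO framing is an ASCII decimal size, a newline, then that many payload bytes; the class and method names are hypothetical. The buffer tracks the read position itself, so the parser never touches an array index.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, for illustration only.
final class ByteBufferFrameSketch {

    /**
     * Reads one "<size>\n<payload>" record from buf and advances its position.
     * Throws java.nio.BufferUnderflowException if the buffer ends mid-record.
     * No upper bound is applied to the size field in this sketch.
     */
    static byte[] readRecord(ByteBuffer buf) {
        int size = 0;
        while (true) {
            byte b = buf.get();               // relative get: position moves forward by one
            if (b == '\n') {
                break;                        // end of the size field
            }
            if (b < '0' || b > '9') {
                throw new IllegalStateException("non-digit in size field: " + b);
            }
            size = size * 10 + (b - '0');
        }
        byte[] payload = new byte[size];
        buf.get(payload);                     // bulk get: copies exactly `size` bytes
        return payload;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap("5\nhello3\nabc".getBytes(StandardCharsets.US_ASCII));
        System.out.println(new String(readRecord(buf), StandardCharsets.US_ASCII));
        System.out.println(new String(readRecord(buf), StandardCharsets.US_ASCII));
    }
}
```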
Spoke with @cruhland offline, and we're going to close this without merge.
Rationale: RxJava is built on the concept of push-based events and reacting to those events. The approach implemented here is more of a pull approach: it waits to read everything before moving on to the next step, rather than consuming data as it arrives and passing along a message as soon as it is complete.
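
For contrast, a push-style decoder might look roughly like the sketch below. It is only an illustration under assumptions: the framing is taken to be an ASCII decimal size, a newline, then the payload; `PushRecordDecoder` and its methods are hypothetical names; and a plain `Consumer` stands in for an RxJava subscriber so the example has no dependencies. Each chunk is processed as it arrives, and a message is passed downstream the moment its last byte is seen.

```java
import java.io.ByteArrayOutputStream;
import java.util.function.Consumer;

// Hypothetical push-based decoder, for illustration only.
final class PushRecordDecoder {
    private final Consumer<byte[]> downstream;
    private final ByteArrayOutputStream payload = new ByteArrayOutputStream();
    private boolean readingSize = true; // true while inside the size field
    private int size = 0;               // size digits accumulated so far
    private int remaining = 0;          // payload bytes still expected

    PushRecordDecoder(Consumer<byte[]> downstream) {
        this.downstream = downstream;
    }

    /** Feeds one chunk of bytes; never blocks waiting for more input. */
    void accept(byte[] chunk) {
        int i = 0;
        while (i < chunk.length) {
            if (readingSize) {
                byte b = chunk[i++];
                if (b == '\n') {
                    readingSize = false;
                    remaining = size;
                    size = 0;
                    if (remaining == 0) {
                        emit();         // zero-length record
                    }
                } else if (b >= '0' && b <= '9') {
                    size = size * 10 + (b - '0');
                } else {
                    throw new IllegalStateException("non-digit in size field: " + b);
                }
            } else {
                // Copy as much of the payload as this chunk contains.
                int n = Math.min(remaining, chunk.length - i);
                payload.write(chunk, i, n);
                i += n;
                remaining -= n;
                if (remaining == 0) {
                    emit();
                }
            }
        }
    }

    /** Passes the completed message downstream and resets for the next record. */
    private void emit() {
        downstream.accept(payload.toByteArray());
        payload.reset();
        readingSize = true;
    }
}
```

A record split across several chunks is held in the decoder's state between calls, so the caller never has to wait for a complete message before handing over data.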
This is a tentative change. I have a few more ideas to simplify the remaining large chunk of code, but I'd like to know if I'm going in the right direction.
I've created a `MessageStream` class that makes it slightly easier to parse the RecordIO messages from the incoming stream of `ByteBuf`s from Netty. It hides the individual `ByteBuf`s behind an `InputStream`, so that the parsing code does not need to worry about their methods or switching from one `ByteBuf` to the next.
I've also made some minor code simplifications which I hope improve readability. And there is now a length limit for the "message size" field, to prevent us from using an unbounded amount of memory when parsing it.
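
The two ideas above can be sketched as follows. This is not the `MessageStream` class from the PR: it uses `java.nio.ByteBuffer` in place of Netty's `ByteBuf` so the example is self-contained, it assumes the size field is ASCII decimal digits terminated by a newline, and the class names and the 10-digit limit are hypothetical.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in: presents a queue of buffers as one InputStream.
final class BufferQueueInputStream extends InputStream {
    private final Deque<ByteBuffer> buffers = new ArrayDeque<>();

    void append(ByteBuffer buffer) {
        buffers.addLast(buffer);
    }

    @Override
    public int read() {
        while (!buffers.isEmpty()) {
            ByteBuffer head = buffers.peekFirst();
            if (head.hasRemaining()) {
                return head.get() & 0xFF;   // unsigned byte value, per the InputStream contract
            }
            buffers.removeFirst();          // exhausted: move on to the next buffer
        }
        return -1;                          // nothing queued
    }
}

// Hypothetical parser for the size field, with a bound on its length.
final class SizeFieldReader {
    static final int MAX_SIZE_DIGITS = 10;  // assumed limit, chosen for illustration

    static long readSize(InputStream in) throws IOException {
        long size = 0;
        for (int digits = 0; ; digits++) {
            int b = in.read();
            if (b == '\n' && digits > 0) {
                return size;
            }
            if (b < '0' || b > '9') {
                throw new IOException("unexpected byte or end of stream in size field");
            }
            if (digits == MAX_SIZE_DIGITS) {
                // Stop before accumulating an arbitrarily long field.
                throw new IOException("size field longer than " + MAX_SIZE_DIGITS + " digits");
            }
            size = size * 10 + (b - '0');
        }
    }
}
```

The parser only sees an `InputStream`, so a size field that straddles two buffers needs no special handling in the parsing code.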