msgpack / msgpack-java

MessagePack serializer implementation for Java / msgpack.org[Java]
http://msgpack.org/
Apache License 2.0

Supporting event-driven I/O in unpackXXX #92

Open xerial opened 10 years ago

xerial commented 10 years ago

This topic is an excerpt of the discussion in https://github.com/msgpack/msgpack-java/pull/82#discussion_r12501596

In event-driven I/O, we call unpackXXX after we receive new data from a network (or another resource). This means unpackXXX may be called even when an insufficient amount of packed data is available.

Because of this, the behavior of unpackXXX (e.g., unpackInt, unpackString, etc.) would be as follows:

  1. It returns the unpacked value and advances the cursor on success.
  2. If it fails due to a lack of incoming data, it throws an InsufficientDataException. The cursor does not move in this case, so the user can call unpackXXX again once a sufficient amount of data becomes available.
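
The two rules above can be illustrated with a minimal sketch. Everything here is hypothetical: RetryableIntUnpacker, feed, and cursor are illustration names, InsufficientDataException is modeled as an IOException subclass (the thread does not define it), and only the int 32 format (0xd2 followed by a 4-byte big-endian value) is handled.

```java
import java.io.IOException;
import java.util.Arrays;

// Hypothetical exception type; the thread names it but does not define it.
class InsufficientDataException extends IOException {
    InsufficientDataException(String message) { super(message); }
}

// Toy unpacker over a growable byte array. It only understands int 32.
class RetryableIntUnpacker {
    private byte[] data = new byte[0];
    private int cursor = 0;

    // Called by the event loop whenever new bytes arrive.
    void feed(byte[] chunk) {
        int oldLength = data.length;
        data = Arrays.copyOf(data, oldLength + chunk.length);
        System.arraycopy(chunk, 0, data, oldLength, chunk.length);
    }

    int cursor() { return cursor; }

    int unpackInt() throws IOException {
        // int 32 is one header byte (0xd2) plus a 4-byte big-endian value.
        int available = data.length - cursor;
        if (available < 5) {
            // Rule 2: throw without touching the cursor so the call can be retried.
            throw new InsufficientDataException("need 5 bytes, have " + available);
        }
        if ((data[cursor] & 0xff) != 0xd2) {
            throw new IOException("not an int 32 value");
        }
        int value = ((data[cursor + 1] & 0xff) << 24)
                  | ((data[cursor + 2] & 0xff) << 16)
                  | ((data[cursor + 3] & 0xff) << 8)
                  |  (data[cursor + 4] & 0xff);
        cursor += 5;  // Rule 1: advance only after the whole value has been read.
        return value;
    }
}
```

This version keeps all received bytes in one array, which makes "the cursor does not move" trivial. The rest of the thread deals with the harder case where the input is consumed buffer by buffer and cannot be rewound.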
xerial commented 10 years ago

Here is a summary of the comment from @frsyuki: https://github.com/msgpack/msgpack-java/pull/82#discussion_r12675357

xerial commented 10 years ago

@frsyuki's idea is to cache the header and data length information of the MessagePack-encoded data so that unpackXXX can be called multiple times without complicating the implementation of MessageBufferInput.
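
As a rough illustration of header caching, the sketch below keeps the last head byte in a field until the value has been fully read. The class HeadCachingUnpacker, the byte queue standing in for MessageBufferInput, the int sentinel value 256, and the method unpackSmallUnsigned are all assumptions made for this example. It handles only positive fixint (0x00 to 0x7f) and uint 8 (0xcc followed by one byte).

```java
import java.io.IOException;
import java.util.ArrayDeque;

// Hypothetical exception type, modeled here as an IOException subclass.
class InsufficientDataException extends IOException {
    InsufficientDataException() { super("more input is required"); }
}

class HeadCachingUnpacker {
    // Sentinel outside 0..255, so it cannot collide with a real head byte.
    private static final int READ_NEXT_HEAD_BYTE = 256;

    // Stand-in for the input: bytes are consumed and cannot be re-read.
    private final ArrayDeque<Byte> input = new ArrayDeque<>();
    private int head = READ_NEXT_HEAD_BYTE;

    void feed(int... bytes) {
        for (int b : bytes) {
            input.add((byte) b);
        }
    }

    private int readByte() throws IOException {
        Byte b = input.poll();
        if (b == null) {
            throw new InsufficientDataException();
        }
        return b & 0xff;
    }

    // Returns the cached head byte, reading a new one only when none is cached.
    private int getHeadByte() throws IOException {
        if (head == READ_NEXT_HEAD_BYTE) {
            head = readByte();
        }
        return head;
    }

    int unpackSmallUnsigned() throws IOException {
        int b = getHeadByte();
        if (b <= 0x7f) {            // positive fixint: the head byte is the value
            head = READ_NEXT_HEAD_BYTE;
            return b;
        }
        if (b == 0xcc) {            // uint 8
            int value = readByte(); // may throw; head stays cached for the retry
            head = READ_NEXT_HEAD_BYTE;
            return value;
        }
        throw new IOException("unsupported head byte: 0x" + Integer.toHexString(b));
    }
}
```

If the payload byte has not arrived yet, the call throws and a later retry resumes from the cached head byte instead of consuming a new one.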

Question:

frsyuki commented 10 years ago

Here is my idea to support event-driven IO efficiently:

public short unpackShort() throws IOException {
    final byte b = getHeadByte();
    if (Code.isFixInt(b)) {
        this.head = READ_NEXT_HEAD_BYTE;
        return (short) b;
    }
    switch(b) {
    case Code.INT32:
        int value = readInt();  // may throw InsufficientDataException (retryable)
        // readInt succeeded. Don't have to be retryable any more
        this.head = READ_NEXT_HEAD_BYTE;
        // ...
        break;
    // more cases ...
    }
}

private MessageBuffer extraBuffer = MessageBuffer.wrap(new byte[8]);
private int extraBufferFilledSize = 0;

private int readInt() throws IOException {
    if (extraBufferFilledSize > 0) {
        if (extraBufferFilledSize == 4) {
            int value = extraBuffer.getInt(0);  // extraBuffer needs at most 8 bytes for readInt()
            extraBufferFilledSize = 0;
            return value;
        }
    } else if (buffer.size() - position >= 4) {
        // this is the fast-path which should be inlined
        int value = buffer.getInt(position);
        position += 4;
        return value;
    }

    // fill extraBuffer from buffer...
    if (buffer.size() <= position) {
        // buffer is empty
        requireNextBuffer();  // may throw InsufficientDataException or IOException
    }

    // pseudo method MessageBuffer.get(MessageBuffer, srcOffset, dstOffset, size)
    // copies at most size bytes from the receiver to the given buffer.
    int n = buffer.get(extraBuffer, position, extraBufferFilledSize, 4 - extraBufferFilledSize);
    position += n;
    extraBufferFilledSize += n;

    return readInt();  // tail call
}
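
The extraBuffer idea can be tried out with plain java.nio.ByteBuffer. This is only a sketch under several assumptions: BoundaryIntReader and feed are hypothetical names, a queue of ByteBuffer chunks stands in for MessageBufferInput, and the recursion in the listing above is replaced by a loop.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

// Hypothetical exception type, modeled here as an IOException subclass.
class InsufficientDataException extends IOException {
    InsufficientDataException() { super("more input is required"); }
}

class BoundaryIntReader {
    private final ArrayDeque<ByteBuffer> pending = new ArrayDeque<>();
    private ByteBuffer buffer = ByteBuffer.allocate(0);  // never null
    // Collects the bytes of a value that is split across buffers.
    private final ByteBuffer extraBuffer = ByteBuffer.allocate(8);

    // Each call models one chunk delivered by the event loop.
    void feed(int... bytes) {
        ByteBuffer chunk = ByteBuffer.allocate(bytes.length);
        for (int b : bytes) {
            chunk.put((byte) b);
        }
        chunk.flip();
        pending.add(chunk);
    }

    private void requireNextBuffer() throws IOException {
        ByteBuffer next = pending.poll();
        if (next == null) {
            throw new InsufficientDataException();
        }
        buffer = next;
    }

    int readInt() throws IOException {
        // Fast path: nothing carried over and the current buffer holds all 4 bytes.
        if (extraBuffer.position() == 0 && buffer.remaining() >= 4) {
            return buffer.getInt();  // big-endian by default, as in MessagePack
        }
        // Slow path: copy bytes into extraBuffer until 4 are available.
        while (extraBuffer.position() < 4) {
            if (!buffer.hasRemaining()) {
                requireNextBuffer();  // may throw; extraBuffer keeps the partial value
            }
            int n = Math.min(4 - extraBuffer.position(), buffer.remaining());
            for (int i = 0; i < n; i++) {
                extraBuffer.put(buffer.get());
            }
        }
        int value = extraBuffer.getInt(0);
        extraBuffer.clear();  // ready for the next split value
        return value;
    }
}
```

Because the partially copied bytes live in extraBuffer rather than in the consumed input, a failed call loses nothing and the next call continues where the previous one stopped.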

To make unpackShort repeatable, Unpacker needs a buffer of 1 (head) + 8 (extraBuffer) bytes, as commented in the code. This can be a single 9-byte MessageBuffer as you suggest (actually 13 bytes, because it also needs to store size later), but I guess a byte field plus a MessageBuffer is easier to implement.

By the way, we can optimize readInt as follows:

// this method should be inlined by JIT
private int readInt() throws IOException {
    // requireBuffer() guarantees that buffer.size() == position if extraBufferFilledSize > 0.
    // requireBuffer() guarantees that buffer is never null (use private static final EMPTY_BUFFER = MessageBuffer.wrap(new byte[0])), so there is no need to check buffer != null here.
    if (buffer.size() - position >= 4) {
        int value = buffer.getInt(position);
        position += 4;
        return value;
    } else {
        return readIntOverBufferBoundary();
    }
}

// this method, including the tail call, doesn't have to be inlined because it is rarely called.
private int readIntOverBufferBoundary() throws IOException {
    if (extraBufferFilledSize == 4) {
        int value = extraBuffer.getInt(0);
        extraBufferFilledSize = 0;
        return value;
    }

    // fill extraBuffer from buffer...
    if (buffer.size() <= position) {
        // buffer is empty
        requireNextBuffer();  // may throw InsufficientDataException or IOException
    }

    //// optional: possibly improve performance
    //if(extraBufferFilledSize == 0 && buffer.size() - position >= 4) {
    //    int v = buffer.getInt(position);
    //    position += 4;
    //    return v;
    //}

    // pseudo method MessageBuffer.get(MessageBuffer, srcOffset, dstOffset, size)
    // copies at most size bytes from the receiver to the given buffer.
    int n = buffer.get(extraBuffer, position, extraBufferFilledSize, 4 - extraBufferFilledSize);
    position += n;
    extraBufferFilledSize += n;

    return readIntOverBufferBoundary();  // tail call
}

Another example is unpackArrayHeader:

public int unpackArrayHeader() throws IOException {
    final byte b = getHeadByte();
    if (Code.isFixedArray(b)) {
        this.head = READ_NEXT_HEAD_BYTE;
        return b & 0x0f;
    }
    switch (b) {
    case Code.ARRAY32:
        int u32 = readInt();  // may throw InsufficientDataException
        // readInt succeeded. Don't have to be retryable any more
        this.head = READ_NEXT_HEAD_BYTE;
        if (u32 < 0) {
            throw overflowU32Size(u32);  // throws MessageFormatException (not retryable) but never throws InsufficientDataException (retryable)
        }
        return u32;
    // ...
    }
}

Unpacker doesn't need to store size to make unpackArrayHeader, unpackStringHeader, or unpackBinaryHeader repeatable. size is necessary to make unpackExtendedTypeHeader, unpackString, and unpackRawString repeatable. Here is an example implementation of unpackExtendedTypeHeader:

public ExtendedTypeHeader unpackExtendedTypeHeader() throws IOException {
    final byte b = getHeadByte();
    switch (b) {
    case Code.EXT32:
        int extSize = getNextSize32();  // may throw InsufficientDataException (retryable)
        byte extType = readByte();  // may throw InsufficientDataException (retryable)
        // readByte succeeded. Don't have to be retryable any more
        this.head = READ_NEXT_HEAD_BYTE;
        this.size = READ_NEXT_SIZE;
        return new ExtendedTypeHeader(extType, extSize);
    // ...
    }
}

private int getNextSize32() throws IOException {
    if(size != READ_NEXT_SIZE) {
        return size;
    }

    int u32 = readInt();  // may throw InsufficientDataException (retryable)
    if(u32 < 0) {
        throw overflowU32Size(u32);
    }

    this.size = u32;
    return u32;
}
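
To see why size has to be cached, here is a small runnable sketch of the ext 32 header (0xc9, a 4-byte big-endian size, then a 1-byte type). The names ExtHeader, ExtHeaderUnpacker, and feed are hypothetical, the sentinel values are arbitrary, and the byte queue lets readInt check availability up front, so the extraBuffer handling is deliberately left out.

```java
import java.io.IOException;
import java.util.ArrayDeque;

// Hypothetical exception type, modeled here as an IOException subclass.
class InsufficientDataException extends IOException {
    InsufficientDataException() { super("more input is required"); }
}

class ExtHeader {
    final byte type;
    final int size;
    ExtHeader(byte type, int size) { this.type = type; this.size = size; }
}

class ExtHeaderUnpacker {
    private static final int READ_NEXT_HEAD_BYTE = 256;  // outside 0..255
    private static final int READ_NEXT_SIZE = -1;        // valid sizes are >= 0

    private final ArrayDeque<Byte> input = new ArrayDeque<>();
    private int head = READ_NEXT_HEAD_BYTE;
    private int size = READ_NEXT_SIZE;

    void feed(int... bytes) {
        for (int b : bytes) input.add((byte) b);
    }

    private int readByte() throws IOException {
        Byte b = input.poll();
        if (b == null) throw new InsufficientDataException();
        return b & 0xff;
    }

    private int readInt() throws IOException {
        if (input.size() < 4) throw new InsufficientDataException();
        int value = 0;
        for (int i = 0; i < 4; i++) {
            value = (value << 8) | (input.poll() & 0xff);
        }
        return value;
    }

    private int getHeadByte() throws IOException {
        if (head == READ_NEXT_HEAD_BYTE) head = readByte();
        return head;
    }

    // Reads the size once; later calls return the cached value.
    private int getNextSize32() throws IOException {
        if (size != READ_NEXT_SIZE) return size;
        int u32 = readInt();
        if (u32 < 0) throw new IOException("size exceeds Integer.MAX_VALUE");
        size = u32;
        return u32;
    }

    ExtHeader unpackExtHeader() throws IOException {
        if (getHeadByte() != 0xc9) throw new IOException("not an ext 32 value");
        int extSize = getNextSize32();  // may throw; head stays cached
        byte extType = (byte) readByte();  // may throw; head and size stay cached
        head = READ_NEXT_HEAD_BYTE;
        size = READ_NEXT_SIZE;
        return new ExtHeader(extType, extSize);
    }
}
```

Without the cached size, a retry after the type byte was missing would try to read four more bytes as the size and misinterpret the stream.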

unpackRawString also needs size, but it is trickier than unpackExtendedTypeHeader because it needs to read variable-length data. There are 2 possible implementations, as I suggested before:

Here is an example implementation of idea 1):

private RawString readingRawString = null;  // RawString might be MessageBuffer or byte[]
private int readingRawStringSize;

public RawString unpackRawString() throws IOException {
    final byte b = getHeadByte();
    switch (b) {
    case Code.STR32:
        int strSize = getNextSize32();  // may throw InsufficientDataException (retryable)
        if(readingRawString == null) {
            // Perhaps users might use a custom MessageBuffer allocator here.
            // Then unpackRawString() will be unpackRawString(BufferAllocator)
            readingRawString = MessageBuffer.allocate(strSize);
            readingRawStringSize = 0;
        }
        RawString value = readRawStringCopy(strSize);
        // readRawStringCopy succeeded. Don't have to be retryable any more
        this.head = READ_NEXT_HEAD_BYTE;
        this.size = READ_NEXT_SIZE;
        return value;
    // ...
    }
}

private RawString readRawStringCopy(int size) throws IOException {
    int remaining = size - readingRawStringSize;

    // pseudo method readPayloadAtMost(size, RawString) reads at most the given number of bytes
    // from the underlying buffer or input stream into RawString.
    int readBytes = readPayloadAtMost(remaining, readingRawString);
    if(readBytes >= remaining) {
        // completed
        // This is the fast-path. We might be able to create an optimized method like
        // readInt() + readIntOverBufferBoundary().
        RawString v = readingRawString;
        readingRawString = null;
        return v;
    }
    readingRawStringSize += readBytes;

    // now buffer is empty

    requireNextBuffer();  // may throw InsufficientDataException (retryable)

    return readRawStringCopy(size);  // tail call
}
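
The copying approach can be sketched with a byte[] as the destination. PayloadAccumulator, feed, and readPayload are hypothetical names, a queue of ByteBuffer chunks stands in for the input, and the recursion is written as a loop. The caller is assumed to pass the same size on every retry, which is what the cached size guarantees in the listing above.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

// Hypothetical exception type, modeled here as an IOException subclass.
class InsufficientDataException extends IOException {
    InsufficientDataException() { super("more input is required"); }
}

class PayloadAccumulator {
    private final ArrayDeque<ByteBuffer> pending = new ArrayDeque<>();
    private ByteBuffer buffer = ByteBuffer.allocate(0);

    // State that survives a failed call.
    private byte[] readingPayload = null;
    private int readingPayloadSize;

    void feed(byte[] chunk) {
        pending.add(ByteBuffer.wrap(chunk.clone()));
    }

    private void requireNextBuffer() throws IOException {
        ByteBuffer next = pending.poll();
        if (next == null) {
            throw new InsufficientDataException();
        }
        buffer = next;
    }

    byte[] readPayload(int size) throws IOException {
        if (readingPayload == null) {
            readingPayload = new byte[size];
            readingPayloadSize = 0;
        }
        while (true) {
            // Copy as much as the current buffer can provide.
            int n = Math.min(size - readingPayloadSize, buffer.remaining());
            buffer.get(readingPayload, readingPayloadSize, n);
            readingPayloadSize += n;
            if (readingPayloadSize == size) {
                byte[] v = readingPayload;
                readingPayload = null;  // completed: reset for the next payload
                return v;
            }
            // The current buffer is exhausted here.
            requireNextBuffer();  // may throw; the partial payload is kept
        }
    }
}
```

A failed call leaves the bytes copied so far in readingPayload, so the retry only needs the remaining bytes.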

Zero-copy (unpackRawStringReference) will be like this:

public RawStringReference unpackRawStringReference() throws IOException {
    final byte b = getHeadByte();
    switch (b) {
    case Code.STR32:
        int strSize = getNextSize32();  // may throw InsufficientDataException (retryable)
        if(readingRawString == null && buffer.size() - position >= strSize) {
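            RawStringReference v = new RawStringReference(buffer.slice(position, strSize));
            position += strSize;
            // the value is complete, so reset the cached state for the next value
            this.head = READ_NEXT_HEAD_BYTE;
            this.size = READ_NEXT_SIZE;
            return v;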
        }
        return unpackRawStringCopy();  // this is valid because getHeadByte and getNextSize32 are repeatable
    // ...
    }
}

unpackString can use a different buffering mechanism:

public String unpackString() throws IOException {
    final byte b = getHeadByte();
    switch (b) {
    case Code.STR32:
        int strSize = getNextSize32();  // may throw InsufficientDataException (retryable)
        if(decodingStringBuffer == null) {
            decodingStringBuffer = new StringBuilder();
            decodingStringByteSize = 0;
        }
        String value = readString(strSize);
        // readString succeeded. Don't have to be retryable any more
        this.head = READ_NEXT_HEAD_BYTE;
        this.size = READ_NEXT_SIZE;
        return value;
    // ...
    }
}

private StringBuilder decodingStringBuffer = null;
private int decodingStringByteSize;

private String readString(int size) throws IOException {
    int remaining = size - decodingStringByteSize;

    // pseudo method readPayloadAtMostAndDecodeUtf8(MessageBuffer, size, StringBuilder) reads at most
    // the given number of bytes from MessageBuffer and decodes them into StringBuilder.
    int decodedBytes = readPayloadAtMostAndDecodeUtf8(buffer, remaining, decodingStringBuffer);
    if(decodedBytes >= remaining) {
        // decoding completed
        String v = decodingStringBuffer.toString();
        decodingStringBuffer = null;
        return v;
    }
    decodingStringByteSize += decodedBytes;

    // now buffer is empty

    requireNextBuffer();  // may throw InsufficientDataException (retryable)

    return readString(size);  // tail call
}
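
One detail the pseudo method hides is that a multi-byte UTF-8 character may be split across two buffers. The sketch below shows one way to handle that with the standard java.nio.charset.CharsetDecoder, which leaves an incomplete trailing sequence unconsumed when endOfInput is false. IncrementalUtf8Decoder, append, and result are hypothetical names, and this is a substitute for readPayloadAtMostAndDecodeUtf8, not its actual implementation.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.nio.charset.StandardCharsets;

class IncrementalUtf8Decoder {
    // newDecoder() reports malformed input instead of replacing it.
    private final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
    private final StringBuilder out = new StringBuilder();
    // Bytes of a character whose remaining bytes have not arrived yet.
    private ByteBuffer leftover = ByteBuffer.allocate(0);

    void append(byte[] chunk, boolean endOfInput) throws CharacterCodingException {
        // Prepend the undecoded tail of the previous chunk.
        ByteBuffer in = ByteBuffer.allocate(leftover.remaining() + chunk.length);
        in.put(leftover).put(chunk);
        in.flip();

        // UTF-8 never produces more chars than it consumes bytes.
        CharBuffer chars = CharBuffer.allocate(in.remaining());
        CoderResult result = decoder.decode(in, chars, endOfInput);
        if (result.isError()) {
            result.throwException();
        }
        if (endOfInput) {
            decoder.flush(chars);
            decoder.reset();  // ready for the next string
        }
        chars.flip();
        out.append(chars);
        leftover = in;  // whatever was not consumed is an incomplete character
    }

    String result() {
        return out.toString();
    }
}
```

In terms of the listing above, the number of bytes consumed by each call (chunk length plus old leftover minus new leftover) is what would be added to decodingStringByteSize.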

If Unpacker supports unpackExtendedType (which reads an ExtendedTypeHeader and its payload), Unpacker needs to store type as well as head and size.

frsyuki commented 10 years ago

My concept is:

Generally, calling getXxx 0 or more times and then readXxx is valid. In other words, unpackXxx methods call readXxx at the end. So we can create methods like readIntAndResetHeadByte():

int readIntAndResetHeadByte() throws IOException {
    int v = readInt();  // may throw InsufficientDataException (retryable)
    head = READ_NEXT_HEAD_BYTE;
    return v;
}
okumin commented 8 years ago

Do you have a plan to release this feature?

I'm developing a library that consumes msgpack streams (Fluentd's forward requests), but I have noticed that ChannelBufferInput doesn't work well with non-blocking I/O. I would really like a streaming deserializer.