fingltd / 4mc

4mc - splittable lz4 and zstd in hadoop/spark/flink

FourMcOutputStream.compress doesn't handle incompressible data correctly #12

Closed. advancedxy closed this issue 8 years ago.

advancedxy commented 8 years ago

Suppose the input is a 4MB buffer of incompressible data. The corresponding compressed size would be lz4_compressBound(4MB) > 4MB.
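
For reference, lz4's documented worst-case bound is isize + isize/255 + 16 (the LZ4_COMPRESSBOUND macro in lz4.h), so for a 4MB block it works out as in this minimal sketch (the formula from lz4.h, not 4mc's actual Lz4Compressor.compressBound implementation):

    // Worst-case lz4 output size, per the LZ4_COMPRESSBOUND macro in lz4.h:
    static int lz4CompressBound(int isize) {
        return isize + isize / 255 + 16;
    }
    // lz4CompressBound(4 * 1024 * 1024) == 4,210,768 > 4,194,304 (4MB)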

FourMcOutputStream.compress() triggers the Lz4Compressor to compress the input, and 4MB comes back as len in

        int len = compressor.compress(buffer, 0, buffer.length);

FourMcOutputStream detects that it should use the uncompressed data directly. But after it fetches the raw uncompressed data, there is still compressed data left in compressor.compressedDirectBuf, so compressor.finished() returns false. That triggers another call to FourMcOutputStream.compress() from FourMcOutputStream.finish(), which tries to fetch the uncompressed data again and results in a BufferUnderflowException.

    @Override
    public void finish() throws IOException {
        if (!compressor.finished()) {
            compressor.finish();
            while (!compressor.finished()) {
                compress();
            }
        }
    }
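
A hypothetical repro sketch (the codec setup and path are illustrative, not from the 4mc test suite; random bytes stand in for incompressible data):

    // Write one 4MB block of random (effectively incompressible) bytes.
    // close() calls finish(), whose second compress() call hits the
    // BufferUnderflowException described above.
    byte[] block = new byte[4 * 1024 * 1024];
    new java.util.Random(42).nextBytes(block);

    FourMcCodec codec = new FourMcCodec();
    codec.setConf(new org.apache.hadoop.conf.Configuration());
    try (java.io.OutputStream out =
             codec.createOutputStream(new java.io.FileOutputStream("/tmp/repro.4mc"))) {
        out.write(block);
    } // BufferUnderflowException raised on close()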

A quick fix to this problem would be to allocate a big enough buffer when creating FourMcOutputStream. When the buffer is big enough to hold all the compressed data, line 196

        int len = compressor.compress(buffer, 0, buffer.length);

in FourMcOutputStream will transfer all the data from compressedDirectBuf into buffer, which makes compressor.finished() return true.

The fix would be as follows in FourMcCodec.java:

    @Override
    public CompressionOutputStream createOutputStream(OutputStream out, Compressor compressor) throws IOException {
        int bufferPlusOverhead = Lz4Compressor.compressBound(FOURMC_MAX_BLOCK_SIZE);
        return new FourMcOutputStream(out, compressor, bufferPlusOverhead);
    }
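
(With FOURMC_MAX_BLOCK_SIZE = 4MB and the standard lz4 bound above, bufferPlusOverhead comes out to about 4,210,768 bytes, i.e. roughly 16KB of overhead on top of the 4MB block.)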

If you believe this fix is OK to accept, I will send a PR later. However, I do think there should be a better way to solve this problem, as we really don't need a buffer larger than 4MB.

carlomedas commented 8 years ago

Thanks for spotting this. Your workaround is smart, but what about fixing it with a compressor reset() performed in the specific case where the block is better left uncompressed?

        if (compressor.getBytesRead() <= compressor.getBytesWritten()) {
            // write uncompressed data block
            byte[] uncompressed = ((Lz4Compressor) compressor).uncompressedBytes();
            rawWriteInt(uncompressed.length);
            int checksum = Lz4Compressor.xxhash32(uncompressed, 0, uncompressed.length, 0);
            rawWriteInt(checksum);
            out.write(uncompressed, 0, uncompressed.length);

            compressor.reset(); // reset compressor buffers
        }

The last line should do the trick. If you can reproduce this corner case, would you mind trying that?

advancedxy commented 8 years ago

Hi, calling compressor.reset() also resets the finish flag to false, which makes compressor.finished() return false again. A call to compressor.finish() should therefore be made after compressor.reset().
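
In other words, the end-of-block sequence should look like this (a sketch; the flag behavior follows the usual Hadoop Compressor convention):

    compressor.reset();   // clears the buffers, but also clears the finish flag
    compressor.finish();  // re-arm it so finished() can return true and the
                          // while (!compressor.finished()) loop in finish() exits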

My current fix: use maxInputSize instead of FOURMC_BLOCK_SIZE

    // Max input size to feed to the compressor. Called MAX_INPUT_SIZE in other
    // CompressorStreams; renamed to maxInputSize to pass the style check.
    final int maxInputSize;

    // In the constructor: derive it from the buffer size instead of using
    // FourMcCodec.FOURMC_BLOCK_SIZE directly.
    int overhead = Lz4Compressor.compressBound(bufferSize) - bufferSize;
    maxInputSize = bufferSize - overhead;

    // In write():
    if (len + limlen > maxInputSize && limlen > 0) {
        finish();
        compressor.reset();
    }

    if (len > maxInputSize) {
        do {
            int bufLen = Math.min(len, maxInputSize);
            // It's essential that the input length (bufLen) <= FOURMC_BLOCK_SIZE.
            // Otherwise we get a wrong decision on whether to use the
            // uncompressed data directly, which can also trigger a
            // BufferOverflowException if we read too much uncompressed data.
            compressor.setInput(b, off, bufLen);
            finish();
            compressor.reset();
            off += bufLen;
            len -= bufLen;
        } while (len > 0);
        return;
    }
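
For example, with bufferSize = 4MB and the standard lz4 bound, overhead = 4,210,768 - 4,194,304 = 16,464 bytes, so maxInputSize = 4,177,840; then compressBound(maxInputSize) = 4,194,239, which still fits in the 4MB buffer.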

I will try your fix and report back.

advancedxy commented 8 years ago

And yes, this fixes the problem. You can use testCodec to reproduce this case:

        if (compressor.getBytesRead() <= compressor.getBytesWritten()) {
            // write uncompressed data block
            byte[] uncompressed = ((Lz4Compressor) compressor).uncompressedBytes();
            rawWriteInt(uncompressed.length);
            int checksum = Lz4Compressor.xxhash32(uncompressed, 0, uncompressed.length, 0);
            rawWriteInt(checksum);
            out.write(uncompressed, 0, uncompressed.length);

            compressor.reset(); // reset compressor buffers
            compressor.finish(); // set compressor to be finished
        }

carlomedas commented 8 years ago

Perfect, I'll commit our fix as well for the upcoming release. Thanks again.

advancedxy commented 8 years ago

Ah, I forgot to mention there is another case in the code: createCompressor in FourMcCodec.java should use a fixed directBufferSize (4MB) rather than a user-configurable size. (I already used a fixed size when testing.)

If the configured block size is 1MB, there will be 4 calls to FourMcOutputStream.compress() for a 4MB input. For incompressible data, the (compressor.getBytesRead() <= compressor.getBytesWritten()) check will always be false in the first 3 calls, so compressed data larger than the original data gets written (unexpected). The last call will try to read all the input (4MB), which also raises a BufferUnderflowException (the uncompressedBuf has only 1MB capacity, while bytesRead is 4MB).

And if the configured block size is near 4MB, the data is incompressible, and lz4_compressBound(configured_block_size) >= 4MB, then the fix we proposed above will lose (4MB - configured_block_size) bytes of incompressible data.
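
(A worked instance of that condition, assuming the standard lz4 bound: with configured_block_size = 4,190,208 bytes, lz4_compressBound gives 4,206,656 >= 4MB, and 4,194,304 - 4,190,208 = 4,096 bytes would be lost.)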

advancedxy commented 8 years ago

@carlomedas ping

carlomedas commented 8 years ago

OK, so the fix to this would just be to remove the read from configuration and hardcode it to the max size of 4MB? Sounds reasonable, since that size is best for big data anyway. To be honest, I don't fully understand the issue you reported, because it has been a long time since I looked at that code, so a proper fix for this use case would take much longer and introduce non-trivial risk... That's why your proposed fix sounds very good. Let me know and I'll commit that as well for the upcoming release of 4mc (with embedded libraries). TIA.

advancedxy commented 8 years ago

"OK so the fix to this would just be to remove the read from configuration and hardcode it to the max size of 4MB?" Yes, that's the fix: createCompressor in FourMcCodec uses a hardcoded maximum 4MB size. It's indeed a corner case.
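
A sketch of that change (the actual Lz4Compressor constructor in 4mc may differ; a directBufferSize-style parameter is assumed here):

    @Override
    public Compressor createCompressor() {
        // Always size the compressor's direct buffers for the maximum
        // 4MB block, ignoring the user-configured buffer size.
        return new Lz4Compressor(FOURMC_MAX_BLOCK_SIZE);
    }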

carlomedas commented 8 years ago

Done in 4mc 1.4.0