101arrowz / fflate

High performance (de)compression in an 8kB package
https://101arrowz.github.io/fflate
MIT License

Feature request: flush (Z_SYNC_FLUSH) during streaming deflate #41

Closed: jasongin closed this issue 3 years ago

jasongin commented 3 years ago

What can't you do right now?

It would be helpful to be able to do a partial (sync) flush during a streaming deflate, such that all of the data passed to the deflater so far can be fully inflated at the other end, without ending the stream or fully resetting the compression state. This would enable packet-level compression in a binary stream protocol.

Example code

const deflate = new fflate.Zlib();
deflate.ondata = (data) => writeDataToStream(data);

deflate.push(packet1);
deflate.flush();  // <-- proposed new API
// At this point the flushed data is written to the stream,
// and the other side can fully decompress all packets sent so far.

// Later, more data can be sent, continuing the compressed stream
// without resetting the compression state.
deflate.push(packet2);
deflate.flush();

// Meanwhile the inflater is on the other side of a stream connection,
// receiving a stream of compressed data from the deflater.
const inflate = new fflate.Inflate();
inflate.ondata = (data) => handleReceivedPacket(data);

let receivedData;
while (!!(receivedData = getNextChunkFromStream())) {
  inflate.push(receivedData);
  // The `ondata` callback should be invoked once reaching each flush point
  // (or earlier if there was a lot of data).
}

(How) is this done by other libraries?

The Node.js zlib API supports this in two ways (see the sketch after this list):

  1. Set flush: zlib.constants.Z_SYNC_FLUSH in options for zlib.createDeflate(). Then it will auto-flush after every write.
  2. Or, explicitly call flush(zlib.constants.Z_SYNC_FLUSH) whenever desired after writing some data to the deflate stream.
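
For reference, a minimal Node.js sketch of both approaches (socket and packet1 are assumed placeholders for a writable stream and an input buffer):

const zlib = require('zlib');

// Way 1: set the flush option, so every write is followed by a sync flush.
const autoFlush = zlib.createDeflate({ flush: zlib.constants.Z_SYNC_FLUSH });
autoFlush.pipe(socket);
autoFlush.write(packet1); // the receiver can decode everything written so far

// Way 2: flush explicitly whenever a packet boundary is reached.
const manual = zlib.createDeflate();
manual.pipe(socket);
manual.write(packet1);
manual.flush(zlib.constants.Z_SYNC_FLUSH, () => {
  // all data written so far is now fully decodable on the receiving side
});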

Pako doesn't support this functionality as far as I can see. I don't know about other libraries.

101arrowz commented 3 years ago

In order to minimize code size, the compression streams consume as much data as possible after every push() call, unlike pako or zlib. This is equivalent to flushing after every push() in Zlib. As a result, ondata is always called at block boundaries within the DEFLATE format itself, and no internal state is needed within the compression streams.
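
For instance, here's a small sketch of that behavior (exact byte counts vary with the input):

const fflate = require('fflate');

const sizes = [];
const deflate = new fflate.Zlib();
deflate.ondata = (data) => sizes.push(data.length);

const text = new TextEncoder().encode('hello world');
deflate.push(text, false); // emits a complete, independently decodable block
deflate.push(text, false); // another block, with ~5 bytes of framing overhead

console.log(sizes); // one entry per push, each ending at a block boundary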

Basically, what you're asking for is already done by default, and in fact you can't do it any other way. This practice saves execution time and bundle size at a slight cost in compressed size (roughly 5 bytes are wasted after every push() call). Let me know if you have any other questions.

P.S. It's important to note that unlike the decompression APIs in other libraries, fflate does not always consume all of the data it receives and will occasionally ignore 4 to 6 bytes at the end of the most recent chunk. If you'd like a flush() for the inflation streams (which would forcibly consume all of the data received so far), I might be able to add one.

jasongin commented 3 years ago

When deflating, I do get the full compressed data back via the ondata callback, for every call to push() on the fflate.Zlib instance.

However, when inflating, what I observe is that calling push() on the fflate.Inflate instance with a block of compressed data invokes the ondata callback with a zero-length data array. It does not return any decompressed data yet.

jasongin commented 3 years ago

Here's some sample code to demonstrate what I'm talking about. I tested it in Node.js v14.

const fflate = require('fflate');
const assert = require('assert');

const compressedPackets = [];
const decompressedPackets = [];

const deflate = new fflate.Zlib();
deflate.ondata = (data) => compressedPackets.push(Buffer.from(data));

const packet1 = Buffer.from('this is a test', 'utf8');
console.log(`input packet length = ${packet1.length}`);
deflate.push(packet1, false);
////deflate.flush();  // <-- proposed new API

console.log(`deflated packet count = ${compressedPackets.length}`);
assert(compressedPackets.length === 1);
console.log(`deflated packet length = ${compressedPackets[0].length}`);
assert(compressedPackets[0].length > 0);

const inflate = new fflate.Inflate();
inflate.ondata = (data) => decompressedPackets.push(Buffer.from(data));

inflate.push(compressedPackets[0], false);

console.log(`output packet count = ${decompressedPackets.length}`);
assert(decompressedPackets.length === 1);
console.log(`output packet length = ${decompressedPackets[0].length}`);
assert(decompressedPackets[0].length > 0); // <-- assertion fails

Output:

input packet length = 14
deflated packet count = 1
deflated packet length = 21
output packet count = 1
output packet length = 0
assert.js:383
    throw err;
    ^

AssertionError [ERR_ASSERTION]: The expression evaluated to a falsy value:

  assert(decompressedPackets[0].length > 0)

jasongin commented 3 years ago

If you'd like a flush() for the inflation streams (which would forcibly consume all of the data received so far), I might be able to do so.

I think that's what I'm asking for... though I'd expect that to be the default behavior unless there's a good reason it shouldn't be. I was assuming the reason that wasn't happening was that there was no flush on the deflate side, but that may have been a wrong assumption.

101arrowz commented 3 years ago

Alright, I can change the inflation behavior to automatically read as much as possible, but I'd like to warn you that zero-length arrays can only happen for very tiny buffers, at which point you shouldn't even call push(): push() will often waste 5 or more bytes and basically defeat the purpose of compressing that block. If you are dealing with such small amounts of data, concatenate the input buffers until they reach a size you are comfortable with, then call push().
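
For example, a minimal batching sketch along those lines (the 8 KiB threshold and sendToStream are assumed placeholders):

const fflate = require('fflate');

const deflate = new fflate.Zlib();
deflate.ondata = (data) => sendToStream(data);

let pending = [];
let pendingSize = 0;
const THRESHOLD = 8192; // push only once this much data has accumulated

function write(chunk) { // chunk: Uint8Array
  pending.push(chunk);
  pendingSize += chunk.length;
  if (pendingSize < THRESHOLD) return;

  // Concatenate the buffered chunks and compress them in a single push.
  const merged = new Uint8Array(pendingSize);
  let offset = 0;
  for (const c of pending) {
    merged.set(c, offset);
    offset += c.length;
  }
  deflate.push(merged, false);
  pending = [];
  pendingSize = 0;
}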

Also, just as a heads-up: your example actually works on the current version of fflate when replacing Inflate with Unzlib. Inflate is the wrong decompressor here. Use Inflate for Deflate, Unzlib for Zlib, Gunzip for Gzip, and Decompress for anything.
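
Spelled out as a sketch of the matching pairs:

const fflate = require('fflate');

const deflate = new fflate.Zlib();   // compressor: zlib format
const inflate = new fflate.Unzlib(); // the matching decompressor

// Pairs: Deflate -> Inflate, Zlib -> Unzlib, Gzip -> Gunzip;
// Decompress auto-detects any of the three formats.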

jasongin commented 3 years ago

I really appreciate all the help!

That test code is obviously contrived; in reality I am using larger input buffers. But regardless of size, I need to be able to decompress all of the complete chunks of data that were compressed so far, without ending the stream.

your example actually works on the current version of fflate when replacing Inflate with Unzlib.

Oh, I don't know how I missed that! My previous example does pass after fixing that. However, I still find that in some cases Unzlib passes a zero-length buffer to ondata. Here's an updated example:

const fflate = require('fflate');
const assert = require('assert');
const fetch = require('node-fetch');

const compressedPackets = [];
const decompressedPackets = [];

const deflate = new fflate.Zlib();
deflate.ondata = (data) => compressedPackets.push(Buffer.from(data));

const inflate = new fflate.Unzlib();
inflate.ondata = (data) => decompressedPackets.push(Buffer.from(data));

const testFile = 'https://raw.githubusercontent.com/101arrowz/fflate/9e442e99c5805270b0f96d3b18bf78685fff06d5/src/worker.ts';
fetch(testFile).then((response) => response.buffer()).then(test);

function test(input) {
    console.log(`input length = ${input.length}`);
    deflate.push(input, false);

    console.log(`compressed packet count = ${compressedPackets.length}`);
    assert(compressedPackets.length > 0);
    console.log(`first compressed packet length = ${compressedPackets[0].length}`);
    assert(compressedPackets[0].length > 0);

    compressedPackets.forEach((p) => inflate.push(p, false));

    console.log(`output packet count = ${decompressedPackets.length}`);
    assert(decompressedPackets.length > 0);
    const output = Buffer.concat(decompressedPackets);
    console.log(`output length = ${output.length}`);
    assert(output.length === input.length); // <-- assertion fails
}

Output:

input length = 399
compressed packet count = 1
first compressed packet length = 276
output packet count = 1
output length = 0
(node:19172) UnhandledPromiseRejectionWarning: AssertionError [ERR_ASSERTION]: The expression evaluated to a falsy value:

  assert(output.length === input.length)

101arrowz commented 3 years ago

400 bytes is still quite a small chunk size, so you will still get zero-sized chunks. However, I have managed to fix this locally and will publish the new version if performance isn't hurt too much.

input length = 399
compressed packet count = 1
first compressed packet length = 276
output packet count = 1
output length = 399

101arrowz commented 3 years ago

I've published version 0.6.4, which should resolve the issue where Inflate did not consume all available data. Let me know if you have any other questions. Thanks for the feature request!

jasongin commented 3 years ago

I can confirm that 0.6.4 resolved the issue. It's working great in all my testing so far. Thank you!