101arrowz / fflate

High performance (de)compression in an 8kB package
https://101arrowz.github.io/fflate
MIT License

Streaming backpressure #48

Closed feross closed 3 years ago

feross commented 3 years ago

The streaming API seems to lack support for stream backpressure. Without backpressure, there's no way to ensure we aren't pushing data into the stream faster than it can handle. If we push data too quickly and the fflate stream can't keep up, that data can sit in memory for longer than we want.

Stream backpressure lets us detect this condition and stop calling push() until the backed-up chunks have been processed.

The example toNativeStream code used on the website doesn't support backpressure. Is it possible to add support for this?

Or, at the very least, expose the necessary information (i.e. the outstanding chunk count, plus a way to register a user-provided callback that fires when that count falls below a threshold so we can start pushing again)?

Separately, it might be helpful to just offer a WHATWG TransformStream interface to the user so they don't need to implement it themselves.
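
As a rough illustration of what such a wrapper might look like (a sketch only, not fflate's API; it assumes an environment with a global TransformStream, and AsyncGzip stands in for any async fflate stream):

import { AsyncGzip } from 'fflate';

// Expose an fflate async stream as a WHATWG TransformStream; the writable
// side's queuing strategy then provides backpressure to whoever pipes in.
const gzipTransformStream = () => {
  const gzip = new AsyncGzip();
  let ctrl;
  let onFinal;
  gzip.ondata = (err, chunk, final) => {
    if (err) return ctrl.error(err);
    ctrl.enqueue(chunk);
    if (final) onFinal();
  };
  return new TransformStream({
    start(controller) { ctrl = controller; },
    transform(chunk) { gzip.push(chunk); },
    flush() {
      // Resolve only after the trailing chunk has been enqueued
      return new Promise(resolve => {
        onFinal = resolve;
        gzip.push(new Uint8Array(0), true);
      });
    }
  });
};

// e.g. response.body.pipeThrough(gzipTransformStream())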

101arrowz commented 3 years ago

Thanks for the feature request!

I'm not very experienced with data streaming, but for compression, every ondata callback corresponds directly to the completed compression of a single pushed chunk. (My rationale for this is a bit long, but I can elaborate if you'd like.) Therefore, you can pull more chunks into the stream whenever ondata is called, and manually count the number of chunks (or bytes) pushed and output to determine when the backpressure falls below a certain threshold. If fflate offered more seamless integration, it would come at the cost of bundle size, which is a high priority for this project.
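
As a rough sketch of that counting approach (illustrative only: AsyncGzip stands in for any async fflate stream, and the file paths and maxPending threshold are placeholders):

const { AsyncGzip } = require('fflate');
const { createReadStream, createWriteStream } = require('fs');

const source = createReadStream('input.bin');     // placeholder input path
const sink = createWriteStream('output.bin.gz');  // placeholder output path
const gzip = new AsyncGzip();

// Chunks pushed into the compressor that have not yet come back via ondata
let pending = 0;
const maxPending = 4; // placeholder threshold

gzip.ondata = (err, chunk, final) => {
  if (err) throw err;
  sink.write(chunk);
  if (final) sink.end();
  // One ondata call per pushed chunk, so one chunk comes off the count
  if (--pending < maxPending && source.isPaused()) source.resume();
};

source.on('data', chunk => {
  gzip.push(chunk);
  if (++pending >= maxPending) source.pause();
});
source.on('end', () => gzip.push(new Uint8Array(0), true));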

As for why I don't support TransformStream: for the sake of legacy projects, I don't want to kill IE11 support (even though I probably should). It's the same reason I don't use Promise anywhere. The entire codebase is ES3 with a few ES5 features, and it runs even without polyfills.

Of course, if you have a solution for either of these problems, I'd be happy to implement it.

101arrowz commented 3 years ago

With regard to backpressure handling in the website demo, I'll implement that since you've requested it (though I'm not exactly sure how to do so for pako; if you're willing to create a PR to handle pako backpressure, you can edit the stream-adapter.tsx file).

feross commented 3 years ago

Thanks! I think that counting the number of ondata callbacks will work perfectly for implementing backpressure.

jhiesey commented 3 years ago

Counting calls to push and ondata probably works fine when compressing with a standalone class like AsyncZipDeflate, but what about when each of those streams is added to a Zip output file?

Suppose I have something like this:


import { Zip, AsyncZipDeflate } from 'fflate'

const zip = new Zip()
zip.ondata = (err, chunk, final) => {
  // handle output data here
}

const file1 = new AsyncZipDeflate('file1.txt')
zip.add(file1)
// stream data into file1 here, finishing with file1.push(new Uint8Array(0), true)

const file2 = new AsyncZipDeflate('file2.txt')
zip.add(file2)
// stream data into file2 here, finishing with file2.push(new Uint8Array(0), true)

zip.end()

I have empirically found that the number of zip.ondata calls is the number of file.push calls, plus the number of files times two, plus one. But this seems very fragile and not a safe way of implementing backpressure.

Here is my full test code: https://gist.github.com/jhiesey/b4b076af249dac2056d3e46a6bdcb02a

Is there a better way of knowing when it's safe to push into AsyncZipDeflate when its output is being added to a Zip?

101arrowz commented 3 years ago

If you'd like, you can create a function that sneaks your own callback into the middle of the ZIP file wrappers (AsyncZipDeflate, ZipPassThrough, etc.).

const onBackpressure = (stream, cb) => {
  let backpressure = [];
  let backpressureBytes = 0;
  const push = stream.push;
  stream.push = (dat, final) => {
    backpressure.push(dat.length);
    cb(backpressureBytes += dat.length);
    push.call(stream, dat, final);
  }
  let ondata = stream.ondata;
  const ondataPatched = (err, dat, final) => {
    ondata.call(stream, err, dat, final);
    cb(backpressureBytes -= backpressure.shift());
  }
  if (ondata) {
    stream.ondata = ondataPatched;
  } else {
    // You can remove this condition if you make sure to
    // call zip.add(file) before calling onBackpressure
    Object.defineProperty(stream, 'ondata', {
      get: () => ondataPatched,
      set: cb => ondata = cb
    });
  }
}

Then you can use this to watch for backpressure changes. Here's an example for Node.js (you can convert this to a ReadableStream quite easily).

const { Zip, AsyncZipDeflate } = require('fflate');
const { createWriteStream, createReadStream } = require('fs');
const onBackpressure = (stream, cb) => { ... }
const writeStream = createWriteStream('out.zip');
const zip = new Zip((err, dat, final) => {
  writeStream.write(dat);
  if (final) writeStream.end();
});
const txt = createReadStream('test/data/text');

const file1 = new AsyncZipDeflate('ex.txt');
zip.add(file1);
const backpressureThreshold = 65536;
onBackpressure(file1, backpressure => {
  if (backpressure > backpressureThreshold) {
    txt.pause();
  } else if (txt.isPaused()) {
    txt.resume()
  }
});
txt.on('data', chunk => file1.push(chunk));
txt.on('end', () => file1.push(new Uint8Array(0), true));

zip.end();
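
As a rough sketch of the ReadableStream conversion mentioned above (only the output side is shown; zipToReadableStream is a hypothetical helper, not part of fflate):

const { Zip } = require('fflate');

// Emit the ZIP output through a WHATWG ReadableStream instead of a Node
// write stream; the rest of the example stays the same.
const zipToReadableStream = () => {
  let ctrl;
  const zip = new Zip((err, dat, final) => {
    if (err) return ctrl.error(err);
    ctrl.enqueue(dat);
    if (final) ctrl.close();
  });
  const readable = new ReadableStream({
    start(controller) { ctrl = controller; }
  });
  return { zip, readable };
};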

I recognize that this looks like monkey-patching, but if you used a subclass for a specific ZIP file stream, you could do this in a cleaner way. The API for push and ondata is stable because both are user-facing, so you won't run into compatibility issues down the line using this function.
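
As a rough sketch of that subclass approach (BackpressureZipDeflate is hypothetical; the accounting simply mirrors onBackpressure above and assumes the file is constructed before zip.add is called):

const { AsyncZipDeflate } = require('fflate');

// Same byte counting as onBackpressure, but wrapped in a subclass so no
// existing instance needs to be patched after the fact.
class BackpressureZipDeflate extends AsyncZipDeflate {
  constructor(filename, onChange, opts) {
    super(filename, opts);
    const queued = [];
    let queuedBytes = 0;
    let userOndata;
    const origPush = this.push.bind(this);
    this.push = (dat, final) => {
      queued.push(dat.length);
      onChange(queuedBytes += dat.length);
      origPush(dat, final);
    };
    // zip.add() assigns ondata later, so intercept it with an accessor
    const patched = (err, dat, final) => {
      userOndata.call(this, err, dat, final);
      onChange(queuedBytes -= queued.shift());
    };
    Object.defineProperty(this, 'ondata', {
      get: () => patched,
      set: cb => { userOndata = cb; }
    });
  }
}

// Usage mirrors the example above:
// const file1 = new BackpressureZipDeflate('ex.txt', backpressure => { ... });
// zip.add(file1);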

101arrowz commented 3 years ago

@jhiesey Did you have any issues with the backpressure code?

jhiesey commented 3 years ago

@101arrowz I haven't had a chance to look at it yet, sorry. I'll probably get back to this tomorrow.

jhiesey commented 3 years ago

@101arrowz I finally got back to this today. Thanks for being so responsive!

Your solution should work well if fflate's AsyncZipDeflate is the limiting factor in the stream flow rate. But it doesn't handle the situation where the final output stream (writeStream in your example) is slow and starts buffering data internally.

I think a slight variation on your solution will work in that case too, but it gets a little tricky. To make it very explicit, your example pipeline looks like this, from input to output:

txt (fs.ReadStream) -> file1 (AsyncZipDeflate) -> zip (Zip) -> writeStream (fs.WriteStream)

A proper backpressure implementation needs to slow down the input if data is getting buffered anywhere in the chain. Your suggestion properly prevents an unbounded amount of data from getting buffered at the interface between file1 and zip, but it does not prevent data from being buffered inside writeStream when the output speed is the limiting factor.

To handle both, I need something like this (not thoroughly tested, but seems to work):

const { Zip, AsyncZipDeflate } = require('fflate');
const { createWriteStream, createReadStream } = require('fs');

const onBackpressure = (stream, outputStream, cb) => {
  const runCb = () => {
    // Pause if either output or internal backpressure should be applied
    cb(applyOutputBackpressure || backpressureBytes > backpressureThreshold);
  }

  // Internal backpressure (for when AsyncZipDeflate is slow)
  const backpressureThreshold = 65536;
  let backpressure = [];
  let backpressureBytes = 0;
  const push = stream.push;
  stream.push = (dat, final) => {
    backpressure.push(dat.length);
    backpressureBytes += dat.length;
    runCb();
    push.call(stream, dat, final);
  }
  let ondata = stream.ondata;
  const ondataPatched = (err, dat, final) => {
    ondata.call(stream, err, dat, final);
    backpressureBytes -= backpressure.shift();
    runCb();
  }
  if (ondata) {
    stream.ondata = ondataPatched;
  } else {
    // You can remove this condition if you make sure to
    // call zip.add(file) before calling onBackpressure
    Object.defineProperty(stream, 'ondata', {
      get: () => ondataPatched,
      set: cb => ondata = cb
    });
  }

  // Output backpressure (for when outputStream is slow)
  let applyOutputBackpressure = false;
  const write = outputStream.write;
  outputStream.write = (data) => {
    const outputNotFull = write.call(outputStream, data);
    applyOutputBackpressure = !outputNotFull;
    runCb();
    // Preserve the original return value for any other callers of write()
    return outputNotFull;
  }
  outputStream.on('drain', () => {
    applyOutputBackpressure = false;
    runCb();
  })
}

const writeStream = createWriteStream('out.zip');
const zip = new Zip((err, dat, final) => {
  writeStream.write(dat);
  if (final) writeStream.end();
});
const txt = createReadStream('test/data/text');

const file1 = new AsyncZipDeflate('ex.txt');
zip.add(file1);
onBackpressure(file1, writeStream, shouldApplyBackpressure => {
  if (shouldApplyBackpressure) {
    txt.pause();
  } else if (txt.isPaused()) {
    txt.resume()
  }
});
txt.on('data', chunk => file1.push(chunk));
txt.on('end', () => file1.push(new Uint8Array(0), true));

zip.end();

I totally understand not wanting to embed this logic in your module for bundle size reasons (and because there are multiple types of streams to support), but it would be helpful to have a wrapper to handle it since it's a bit subtle and tricky to get right.

101arrowz commented 3 years ago

Yep, what you wrote looks about right for handling backpressure on the way out. You bring up a good point: it's not simple to use advanced stream features with fflate. And you're correct about the catch-22: I can't commit to supporting just Node.js streams or just ReadableStream, because there are many more ways you might want to consume fflate's output than those two.

I'll probably add a few more wiki entries regarding advanced usage, including backpressure support. I'll include your snippet, since it seems to work well in my tests. I may also create another package with this code (if you're alright with that) so it can be used more seamlessly. Thanks for your efforts!

101arrowz commented 8 months ago

A few years late, but v0.8.2 now offers introspection details that can help implement backpressure on the async, non-ZIP APIs. A custom reimplementation of AsyncZipDeflate would likely be enough to make it work on a ZIP stream, but I haven't done that here for bundle size reasons.