pierrec / node-lz4

LZ4 fast compression algorithm for NodeJS
MIT License
438 stars 98 forks source link

uncaught exception when using the synchronous "decode" function #117

Open williamstein opened 10 months ago

williamstein commented 10 months ago

I'm using nodejs.

  1. Create pretty much any little lz4 compressed file a.lz4
  2. Using vim or whatever, delete the last few characters of the file.
  3. Try to decompress it with decode, and it throws an Uncaught Error, as you can see below.

Unfortunately, this makes decode unusable in applications that must be robust against corrupt input. I think the underlying code forgets to listen for the stream's "error" event, so the error cannot be caught with try/catch.

> try { require('lz4').decode(require('fs').readFileSync('a.lz4')) } catch(err) { console.log("oops")}
Uncaught Error: Unexpected end of LZ4 stream @0
    at Decoder.emit_Error (/projects/6659c2e3-ff5e-4bb4-9a43-8830aa951282/websocketfs/node_modules/.pnpm/lz4@0.6.5/node_modules/lz4/lib/decoder_stream.js:64:22)
    at Decoder.check_Size (/projects/6659c2e3-ff5e-4bb4-9a43-8830aa951282/websocketfs/node_modules/.pnpm/lz4@0.6.5/node_modules/lz4/lib/decoder_stream.js:70:32)
    at Decoder.read_DataBlockData (/projects/6659c2e3-ff5e-4bb4-9a43-8830aa951282/websocketfs/node_modules/.pnpm/lz4@0.6.5/node_modules/lz4/lib/decoder_stream.js:213:12)
    at Decoder._main (/projects/6659c2e3-ff5e-4bb4-9a43-8830aa951282/websocketfs/node_modules/.pnpm/lz4@0.6.5/node_modules/lz4/lib/decoder_stream.js:310:25)
    at Decoder._flush (/projects/6659c2e3-ff5e-4bb4-9a43-8830aa951282/websocketfs/node_modules/.pnpm/lz4@0.6.5/node_modules/lz4/lib/decoder_stream.js:280:7)
    at Decoder.final [as _final] (node:internal/streams/transform:132:10)
    at callFinal (node:internal/streams/writable:698:12)
    at prefinish (node:internal/streams/writable:710:7)
    at finishMaybe (node:internal/streams/writable:720:5)
    at Writable.end (node:internal/streams/writable:634:5) {
  domainEmitter: Decoder {
    _readableState: ReadableState {
      objectMode: false,
      highWaterMark: 16384,
      buffer: BufferList { head: null, tail: null, length: 0 },
      length: 0,
      pipes: [],
      flowing: true,
      ended: false,
      endEmitted: false,
      reading: false,
      constructed: true,
      sync: false,
      needReadable: false,
      emittedReadable: false,
      readableListening: false,
      resumeScheduled: true,
      errorEmitted: false,
      emitClose: true,
      autoDestroy: true,
      destroyed: false,
      errored: null,
      closed: false,
      closeEmitted: false,
      defaultEncoding: 'utf8',
      awaitDrainWriters: null,
      multiAwaitDrain: false,
      readingMore: false,
      dataEmitted: false,
      decoder: null,
      encoding: null,
      [Symbol(kPaused)]: false
    },
    _events: [Object: null prototype] {
      prefinish: [Function: prefinish],
      data: [Function (anonymous)]
    },
    _eventsCount: 2,
    _maxListeners: undefined,
    _writableState: WritableState {
      objectMode: false,
      highWaterMark: 16384,
      finalCalled: true,
      needDrain: false,
      ending: true,
      ended: false,
      finished: false,
      destroyed: false,
      decodeStrings: true,
      defaultEncoding: 'utf8',
      length: 0,
      writing: false,
      corked: 0,
      sync: true,
      bufferProcessing: false,
      onwrite: [Function: bound onwrite],
      writecb: null,
      writelen: 0,
      afterWriteTickInfo: [Object],
      buffered: [],
      bufferedIndex: 0,
      allBuffers: true,
      allNoop: true,
      pendingcb: 2,
      constructed: true,
      prefinished: false,
      errorEmitted: false,
      emitClose: true,
      autoDestroy: true,
      errored: null,
      closed: false,
      closeEmitted: false,
      [Symbol(kOnFinished)]: []
    },
    allowHalfOpen: true,
    options: {},
    binding: {
      compressBound: [Function (anonymous)],
      compress: [Function (anonymous)],
      compressLimited: [Function (anonymous)],
      compressHC: [Function (anonymous)],
      compressHCLimited: [Function (anonymous)],
      uncompress: [Function (anonymous)],
      uncompress_fast: [Function (anonymous)]
    },
    buffer: <Buffer f7 02 23 20 77 65 62 73 6f 63 6b 65 74 66 73 0a 0a 2a 2a 0f 00 f1 10 3a 2a 2a 20 6c 69 6b 65 20 73 73 68 66 73 2c 20 62 75 74 20 6f 76 65 72 20 61 20 ... 2685 more bytes>,
    pos: 0,
    descriptor: {
      blockIndependence: true,
      blockChecksum: false,
      blockMaxSize: 65536,
      streamSize: false,
      streamChecksum: true,
      dict: false,
      dictId: 0
    },
    state: 6,
    notEnoughData: true,
    descriptorStart: 4,
    streamSize: null,
    dictId: null,
    currentStreamChecksum: null,
    dataBlockSize: 3058,
    skippableSize: 0,
    [Symbol(kCapture)]: false,
    [Symbol(kCallback)]: null
  },
  domainThrown: false
}
> <Buffer >
williamstein commented 10 months ago

I used the following async code as a workaround, for anybody who sees this. This took me a while to work out, but does properly allow for dealing with error cases...


import { createDecoderStream, createEncoderStream } from "lz4";
import { createReadStream, createWriteStream } from "fs";
import { PassThrough, Readable } from "stream";

/**
 * Read an lz4-compressed file and return its decompressed contents.
 *
 * The file is streamed through the lz4 decoder instead of using the
 * synchronous `decode`, so that a truncated or corrupt file surfaces as a
 * rejected promise rather than an uncaught exception.
 *
 * @param path - Path of the lz4-compressed file to read.
 * @returns The decompressed bytes.
 * @throws Rejects with the underlying stream error if the file cannot be
 *   read or the lz4 data cannot be decoded.
 */
export async function readFileLz4(path: string): Promise<Buffer> {
  const input = createReadStream(path);
  const decoder = createDecoderStream();
  const output = new PassThrough();
  const chunks: Buffer[] = [];

  await new Promise<void>((resolve, reject) => {
    // pipe() neither forwards errors downstream nor tears the chain down,
    // so every stage needs its own listener, and on failure all stages are
    // destroyed to release the underlying file descriptor.
    const fail = (err: Error) => {
      input.destroy();
      decoder.destroy();
      output.destroy();
      reject(err);
    };
    input.on("error", fail);
    decoder.on("error", fail);
    output.on("error", fail);
    output.on("data", (chunk: Buffer) => {
      chunks.push(chunk);
    });
    // "end" (readable side fully consumed) rather than "finish" (writable
    // side done): only "end" guarantees the last chunk was delivered to the
    // "data" listener above.
    output.on("end", () => resolve());
    input.pipe(decoder).pipe(output);
  });

  return Buffer.concat(chunks);
}

/**
 * Compress a string with lz4 and write the result to a file.
 *
 * A stream is used instead of blocking in-process compression because this
 * code is likely to run in the project's daemon, and blocking here would
 * block interactive functionality such as terminals.
 *
 * @param path - Destination path of the lz4-compressed file.
 * @param contents - Text to compress and write.
 * @returns A promise that resolves once the output file stream has finished.
 * @throws Rejects with the underlying stream error if compression or
 *   writing fails.
 */
export async function writeFileLz4(
  path: string,
  contents: string,
): Promise<void> {
  // Readable stream that yields the whole input once and then ends.
  const input = new Readable({
    read() {
      this.push(contents);
      this.push(null);
    },
  });
  // lz4 compression encoder
  const encoder = createEncoderStream();
  const output = createWriteStream(path);

  await new Promise<void>((resolve, reject) => {
    // pipe() does not propagate errors or clean up, so listen on every
    // stage and destroy them all on failure; otherwise the write stream's
    // file descriptor would stay open after an encoder error.
    const fail = (err: Error) => {
      input.destroy();
      encoder.destroy();
      output.destroy();
      reject(err);
    };
    input.on("error", fail);
    encoder.on("error", fail);
    output.on("error", fail);
    // "finish" on the file stream means all compressed data was flushed.
    output.on("finish", () => resolve());
    // start writing
    input.pipe(encoder).pipe(output);
  });
}