mcollina / split2

Split Streams3 style
ISC License
280 stars 24 forks source link

An exception thrown within the mapper is not caught by the stream's "error" event mechanism #38

Open raz-viber opened 4 years ago

raz-viber commented 4 years ago

example:

// Reproduction: read an object as a stream, split it into lines, and parse
// each line as JSON inside the mapper passed to the line splitter.
// NOTE(review): `splitLine` is presumably the split2 export — confirm against the caller's imports.
pipeline(
    this.s3StreamReader.openStream(this.bucket, key),
    splitLine(line => {
        try {
            return JSON.parse(line.toString());
        } catch (error) {
            // Re-thrown from inside the mapper; this is the exception the issue is about.
            throw new TypeError(`line: "${line}" is not in json format`);
        }
    }),
    // Completion callback handed to pipeline().
    (error?) => {
        if (!error) {
            this.logger.debug(`done streaming file: '${this.bucket}/${key}'`);
        }
        else {
            this.logger.error(`error streaming file: '${this.bucket}/${key}', ${error}`);
        }
    }
).on("error", (err) => {
    // Listener attached to the value returned by pipeline().
    this.logger("WILL NEVER BE CALLED")
});

The "error" listener registered with `.on("error", ...)` will never be called.

Suggested fix for the transform function:

/**
 * Transform step: decodes a chunk, splits it on `this.matcher`, and hands each
 * complete line to `this.mapper`, forwarding the mapped value via `push`.
 * The trailing, not-yet-terminated fragment is kept in `this[kLast]` for the
 * next call.
 *
 * Any exception thrown by the mapper is reported through `cb(error)` instead
 * of escaping synchronously, and processing of the current chunk stops there
 * so that `cb` is invoked exactly once.
 *
 * @param {*} chunk - data passed to `this[kDecoder].write`.
 * @param {*} enc - unused by this function.
 * @param {Function} cb - completion callback; called with an Error on mapper
 *   failure or when the line buffer exceeds `this.maxLength` (and
 *   `this.skipOverflow` is falsy), otherwise called with no arguments.
 */
function transform (chunk, enc, cb) {
  var list
  if (this.overflow) { // Line buffer is full. Skip to start of next line.
    var buf = this[kDecoder].write(chunk)
    list = buf.split(this.matcher)

    if (list.length === 1) return cb() // Line ending not found. Discard entire chunk.

    // Line ending found. Discard trailing fragment of previous line and reset overflow state.
    list.shift()
    this.overflow = false
  } else {
    this[kLast] += this[kDecoder].write(chunk)
    list = this[kLast].split(this.matcher)
  }

  // The last element is the incomplete tail; keep it buffered for the next chunk.
  this[kLast] = list.pop()

  for (var i = 0; i < list.length; i++) {
    try {
      push(this, this.mapper(list[i]))
    } catch (error) {
      // Return immediately: continuing would map further lines and call cb a second time.
      return cb(error)
    }
  }

  this.overflow = this[kLast].length > this.maxLength
  if (this.overflow && !this.skipOverflow) return cb(new Error('maximum buffer reached'))

  cb()
}

Basically, wrap the mapper call in try/catch and call cb(error) in the catch block.

mcollina commented 4 years ago

Would you like to send a Pull Request to address this issue? Remember to add unit tests.