nodejs / node

Node.js JavaScript runtime ✨🐢🚀✨
https://nodejs.org

Streams memory leak #8955

Closed devmetal closed 8 years ago

devmetal commented 8 years ago

Hi, I have a problem with streams. I am trying to "streamify" some database selects, so I created a pager. The pager's job is simple: fetch some records, then push them in object mode into a stream. I will show the relevant parts of the code and the debug results. I hope this is not a real issue and I am just missing something about streams.

My workflow is simple: [ { /* result objects from database */ }, {}, {} ] -> (push objects into a PassThrough) {}, {} -> (create array rows) [ 'values', 'from', 'objects' ] -> (CSV writer) -> file or HTTP response

// This creates an array from the specified keys of each incoming object
const recordStream = (fields) => new Transform({
  readableObjectMode: true,
  writableObjectMode: true,

  transform(chunk, enc, done) {
    if (!chunk) {
      return this.end();
    }

    const record = [];
    for (const field of fields) {
      record.push(chunk[field]);
    }

    this.push(record);
    done();
  }
});

// this is a csv writer instance from https://www.npmjs.com/package/csv-write-stream
const csvStream = (headers) => csvWrite({ headers });

// For more maintainable code, I use generators with promises

/**
 * Based on this
 * https://www.promisejs.org/generators/
 */
const gen = function generatorAsync(iterator) {
  return function generatorHandler() {
    const internal = iterator.apply(this, arguments);

    function handle(result) {
      if (result.done) return Promise.resolve(result.value);

      return Promise.resolve(result.value).then(
        res => handle(internal.next(res)),
        err => handle(internal.throw(err))
      );
    };

    try {
      return handle(internal.next());
    } catch (e) {
      return Promise.reject(e);
    }
  }
}
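
For illustration, here is a minimal usage sketch of this helper (the function name and values are hypothetical, not part of the original code): each yielded promise is awaited by handle(), and its resolved value is sent back into the generator.

const addTwiceAsync = gen(function* (value) {
  // each yield hands a promise to handle(), which resumes the generator
  // with the resolved value once the promise settles
  const first = yield Promise.resolve(value + 1);
  const second = yield Promise.resolve(first + 1);
  return second;
});

addTwiceAsync(1).then(result => console.log(result)); // logs 3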

// The pager gets a stream instance and pushes every record into it
const pager = gen(function* (stream) {
  let skip = 0;
  const query = 'SELECT FROM E SKIP :skip LIMIT :limit';
  let results = yield db.query(query, { params: { skip, limit: 5000 } });

  while (results && !!results.length) {
    for (const row of results)  {
      stream.push(row);
    }

    skip += results.length;
    results = yield db.query(query, { params: { skip, limit: 5000 } });
  }

  return stream.end();
});

const records = recordStream(fields);
const csv = csvStream(fields);
const through = new PassThrough({ objectMode: true });

through
  .pipe(records)
  .pipe(csv)
  .pipe(fs.createWriteStream('./out.csv'));

through.on('end', () => console.log('end'));

pager(through);

// Debug
setInterval(function () {
  var mem = process.memoryUsage();
  var fmt = v => (v / (1024 * 1024)).toFixed(0) + 'MB';
  console.log('RSS = ' + fmt(mem.rss), 'Heap = ' + fmt(mem.heapUsed));
}, 1000);

There are a lot of records in the database, 30,000+, but it runs really fast. The problem is the leak. I ran the server via pm2 and I can see in the monitor that the memory is not released.

When I run this code, the output always looks similar to this:

RSS = 80MB Heap = 29MB
RSS = 92MB Heap = 50MB
RSS = 102MB Heap = 55MB
RSS = 108MB Heap = 60MB
RSS = 101MB Heap = 28MB
RSS = 101MB Heap = 41MB
end
RSS = 101MB Heap = 41MB
RSS = 101MB Heap = 41MB
RSS = 101MB Heap = 41MB
RSS = 101MB Heap = 41MB
RSS = 101MB Heap = 41MB
RSS = 101MB Heap = 41MB
RSS = 101MB Heap = 41MB
RSS = 101MB Heap = 41MB
...

If you want, I can post the whole code here; it is just some requires and the database connection. I have to use OrientDB with orientjs.

Thank you for your help.

bnoordhuis commented 8 years ago

Do you have a test case that doesn't depend on third-party modules or external resources like a database? In other words, a standalone test case that only uses built-in modules?

devmetal commented 8 years ago

@bnoordhuis

Hi, I created an example that uses only built-in Node.js modules. The output is the same.

First, I created a quick script (based on the documentation) that generates JSON records into a file, to simulate some input.

function writeFile(writer, encoding, callback) {
  let i = 200000;
  write();
  function write() {
    var ok = true;
    do {
      i--;
      if (i === 0) {
        // last time!
        writer.write(`{"id":"${i}", "data":"randomcontent"}\n`, encoding, callback);
      } else {
        // see if we should continue, or wait
        // don't pass the callback, because we're not done yet.
        ok = writer.write(`{"id":"${i}", "data":"randomcontent"}\n`, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      // had to stop early!
      // write some more once it drains
      writer.once('drain', write);
    }
  }
}
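
The snippet above is shown without its invocation; presumably it is driven by something like the following sketch (./input matches what the second script reads, the rest is an assumption):

// hypothetical driver for writeFile() above
const fs = require('fs');
const writer = fs.createWriteStream('./input');
writeFile(writer, 'utf8', () => console.log('finished generating ./input'));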

Second, I wrote the script:

'use strict';

const fs = require('fs');
const stream = require('stream');
const readline = require('readline');
const Transform = stream.Transform;
const PassThrough = stream.PassThrough;

const jsonStream = () => new Transform({
  readableObjectMode: true,

  transform(chunk, enc, done) {
    if (!chunk) {
      this.push(null);
      return done();
    }

    const jsonString = chunk.toString();
    const json = JSON.parse(jsonString);
    this.push(json);
    done();
  }
});

const recordStream = (fields) => new Transform({
  readableObjectMode: false,
  writableObjectMode: true,

  transform(chunk, enc, done) {
    if (!chunk) {
      return this.end();
    }

    const record = [];
    for (const field of fields) {
      record.push(chunk[field]);
    }

    this.push(record.join(',') + '\n');
    done();
  }
});

setInterval(function () {
  var mem = process.memoryUsage();
  var fmt = v => (v / (1024 * 1024)).toFixed(0) + 'MB';
  console.log('RSS = ' + fmt(mem.rss), 'Heap = ' + fmt(mem.heapUsed));
}, 1000);

const os = fs.createWriteStream('./output');
const is = fs.createReadStream('./input');
const json = jsonStream();
const record = recordStream(['id', 'data']);
const through = new PassThrough();

const rl = readline.createInterface({
  input: is
});

rl.on('line', (line) => {
  through.push(line);
});

rl.on('close', () => {
  through.end();
});

through.pipe(json).pipe(record).pipe(os)
.on('end', () => {
  console.log('end');
})
.on('close', () => {
  console.log('close');
});

The result is the same. With 200,000 records it is not so dramatic, but with 500,000 or 1,000,000 the memory usage is huge. The output with 200,000 records:

RSS = 71MB Heap = 25MB
RSS = 74MB Heap = 21MB
RSS = 74MB Heap = 26MB
RSS = 75MB Heap = 31MB
RSS = 78MB Heap = 26MB
RSS = 78MB Heap = 31MB
RSS = 83MB Heap = 28MB
RSS = 84MB Heap = 31MB
RSS = 84MB Heap = 35MB
RSS = 84MB Heap = 37MB
RSS = 87MB Heap = 30MB
RSS = 87MB Heap = 36MB
RSS = 87MB Heap = 28MB
RSS = 87MB Heap = 36MB
RSS = 87MB Heap = 29MB
RSS = 88MB Heap = 39MB
RSS = 88MB Heap = 33MB
RSS = 88MB Heap = 29MB
RSS = 88MB Heap = 41MB
RSS = 88MB Heap = 40MB
RSS = 88MB Heap = 28MB
RSS = 88MB Heap = 41MB
close
RSS = 88MB Heap = 27MB
RSS = 88MB Heap = 27MB
RSS = 88MB Heap = 27MB
RSS = 88MB Heap = 27MB
RSS = 88MB Heap = 27MB
RSS = 88MB Heap = 27MB
RSS = 88MB Heap = 27MB
RSS = 88MB Heap = 27MB
...

With 1000000 records:

RSS = 74MB Heap = 19MB
RSS = 75MB Heap = 26MB
RSS = 75MB Heap = 31MB
RSS = 78MB Heap = 27MB
RSS = 78MB Heap = 32MB
RSS = 82MB Heap = 29MB
RSS = 84MB Heap = 36MB
RSS = 84MB Heap = 31MB
RSS = 84MB Heap = 36MB
RSS = 87MB Heap = 30MB
RSS = 87MB Heap = 35MB
RSS = 87MB Heap = 40MB
RSS = 89MB Heap = 34MB
RSS = 90MB Heap = 39MB
...
RSS = 157MB Heap = 83MB
RSS = 157MB Heap = 85MB
RSS = 158MB Heap = 86MB
RSS = 158MB Heap = 88MB
RSS = 158MB Heap = 90MB
....

And it is still growing. Can you tell me where I went wrong? Thank you :)

devmetal commented 8 years ago

@bnoordhuis

Sorry, I made a huge mistake. I have now replaced readline with my own line-by-line transform stream:

class LineByLine extends Transform {
  constructor(options) {
    super(options);
    this.buff = '';
  }

  _transform(chunk, enc, done) {
    const chr = chunk.toString();
    let i = 0;

    while(i < chr.length) {
      if (chr[i] === '\n') {
        this.push(this.buff);
        this.buff = '';
      } else {
        this.buff += chr[i];
      }

      i++;
    }

    done();
  }

  // emit any trailing data that has no terminating newline
  _flush(done) {
    if (this.buff.length > 0) {
      this.push(this.buff);
      this.buff = '';
    }
    done();
  }
}
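
The updated wiring is not shown; presumably the readline interface and the PassThrough are replaced with something like this sketch (an assumption based on the streams already defined above, not code from the original report):

// hypothetical replacement for the readline + PassThrough wiring above
is.pipe(new LineByLine())
  .pipe(json)
  .pipe(record)
  .pipe(os)
  .on('close', () => console.log('close'));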

I don't think it's the best way to do this, but it is really fast: with 1,000,000 records it finishes in seconds. Sorry about that, but I think there is still a small leak. Here is the output with 1,000,000 records, without readline:

RSS = 57MB Heap = 16MB
RSS = 60MB Heap = 17MB
RSS = 60MB Heap = 16MB
RSS = 60MB Heap = 11MB
RSS = 57MB Heap = 18MB
RSS = 57MB Heap = 7MB
RSS = 57MB Heap = 14MB
RSS = 58MB Heap = 6MB
RSS = 58MB Heap = 12MB
RSS = 58MB Heap = 14MB
RSS = 58MB Heap = 17MB
RSS = 58MB Heap = 10MB
close
RSS = 59MB Heap = 12MB
RSS = 59MB Heap = 12MB
RSS = 59MB Heap = 12MB
RSS = 59MB Heap = 12MB
RSS = 59MB Heap = 12MB
RSS = 59MB Heap = 12MB
RSS = 59MB Heap = 12MB
RSS = 59MB Heap = 12MB

What do you think? Is 2MB significant, or is that normal?

bnoordhuis commented 8 years ago

That doesn't strike me as very significant; it could very well be a garbage collector or memory fragmentation artifact. The litmus test for memory leaks normally is whether the process eventually dies with an out-of-memory error.
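
A quick way to run that litmus test, assuming the repro script is saved as repro.js, is to cap V8's old-space size so a genuine leak reaches the out-of-memory error in seconds rather than hours:

node --max-old-space-size=64 repro.js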

devmetal commented 8 years ago

Okay, I think we can close this. Without third-party libraries it works well, so I have to find the leak in the original code. Maybe I can throw out the csv writer. Do you think the PassThrough stream can cause issues like this? Or the generators? Maybe it's all on them :)

Do you have any suggestions for finding memory leaks?

bnoordhuis commented 8 years ago

You could try https://www.npmjs.com/package/heapdump or the V8 inspector in v6.3.0+.
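
For reference, a minimal sketch of the heapdump approach (assuming the module is installed with npm install heapdump): take a snapshot periodically and diff two of them in Chrome DevTools to see which objects keep accumulating.

// hypothetical instrumentation added to the leaking process
const heapdump = require('heapdump');

setInterval(() => {
  // writes heapdump-<timestamp>.heapsnapshot into the current working directory
  heapdump.writeSnapshot((err, filename) => {
    if (err) console.error(err);
    else console.log('heap snapshot written to', filename);
  });
}, 30000);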

devmetal commented 8 years ago

Thank you for your help :)