ironSource / parquetjs

fully asynchronous, pure JavaScript implementation of the Parquet file format
MIT License

Memory Leaks? #60

Closed lynxaegon closed 4 years ago

lynxaegon commented 6 years ago

Hi,

I'm using elasticsearchJS to export a whole index from ES in batches of 4096. The tool uses about 500MB of RAM while dumping the ES index to parquet format (nodeJS has a 2GB memory limit set).

If I lower or increase the batch size (or sometimes seemingly at random), memory usage jumps to 2-3GB and the process gets killed. The quickest way to reproduce is to increase the batch size it has to process. The generated parquet file is usually ~5.4GB.

Is there anything I can do to debug this further?

Thanks!

P.S.: I'm using git+ssh://git@github.com/ironSource/parquetjs.git#1fa58b589d9b6451379f1558214e9ae751909596 as the parquetJS package.

asmuth commented 6 years ago

I assume you have already seen the section of the README about setting the row group/buffer size? https://github.com/ironSource/parquetjs#buffering--row-group-size -- If you keep increasing the batch size, an OOM error will eventually be the expected failure mode.

The default row group size is 4096; is it possible that your records are on the order of ~500KB each? Have you tried lowering the parquetjs row group size (via setRowGroupSize) yet?
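
For reference, a minimal sketch of the knob in question (openFile and setRowGroupSize as shown in the README linked above; the schema and file name here are just placeholders):

const parquet = require('parquetjs');

// placeholder schema and output path
const schema = new parquet.ParquetSchema({
  name: { type: 'UTF8' }
});

async function openWriter() {
  const writer = await parquet.ParquetWriter.openFile(schema, 'dump.parquet');

  // buffer fewer rows in memory before each row group is flushed to disk
  // (the default is 4096 rows per row group)
  writer.setRowGroupSize(512);

  return writer;
}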

lynxaegon commented 6 years ago

I saw that section, but even with a rowGroupSize of 1024 it sometimes (very rarely) throws an OOM error. I think the records are ~10-25KB each. I will keep trying different rowGroupSize values. Thanks!

ku-s-h commented 5 years ago

I am facing a similar issue with my app. It writes incoming data to parquet, rotates the files every 1 minute and pushes them to S3. The application starts with ~400MB of memory and keeps on increasing, eventually crossing 10GB in a few hours. Each record is a small JSON and the parquet file generated each minute is ~40MB.

Parquet

var parquet = require('parquetjs');
var S3 = require('./s3-out');
var dir = 'parquet/';
var writer;
var out_file;
var interval;
var timeoutHandle = null;

function startTimeout() {
  stopTimeout();
  timeoutHandle = setTimeout(rotateFile, 60000);
  console.log("start timeout");
}

function stopTimeout() {
  clearTimeout(timeoutHandle);
  console.log("clear timeout");
}

var schema = new parquet.ParquetSchema({
    // schema
});

function getFileName() {
    var d = new Date();
    return `${process.env.POD_NAME}-${d.getHours()}-${d.getMinutes()}-${d.getSeconds()}.parquet`
}

async function setupWriter() {
    startTimeout();
    // interval = setInterval(rotateFile, 60000);
    out_file = getFileName();
    writer = await parquet.ParquetWriter.openFile(schema, dir + out_file);
    writer.setRowGroupSize(100);
    console.log("opened " + out_file);
}

async function writeToParquet(data) {
    if(data && data[0]) {
        for(var i = 0; i < data.length; i++) {
            var time = data[i].at;
            if(time) {
                time = time.toString();
            }
            await writer.appendRow({
                // schema
            });
        }
    } else {
        console.log("Bad data to write to parquet");
    }
}

async function rotateFile() {
    const used = process.memoryUsage();
    for (let key in used) {
        console.log(`${key} ${Math.round(used[key] / 1024 / 1024 * 100) / 100} MB`);
    }
    var old_writer = writer;
    var old_out_file = out_file;
    out_file = getFileName();
    var new_writer = await parquet.ParquetWriter.openFile(schema, dir + out_file);
    new_writer.setRowGroupSize(100);
    console.log("opened " + out_file);
    writer = new_writer;
    try {
        await old_writer.close().then(function() {
        console.log("closed " + old_out_file);
        old_writer = null;
        var date = new Date();
        var s3_dir = 'logs/' + date.getFullYear() + '/' + (date.getMonth() + 1) + '/' + date.getDate() + '/'; 
        S3.pushToS3(dir, old_out_file, s3_dir + old_out_file, function() {
            startTimeout();
        });
        });
    } catch(err) {
        console.log("Writing error " + err);
        startTimeout();
    }
}

exports.setupWriter = setupWriter;
exports.writeToParquet = writeToParquet;

S3

var fs = require('fs');
var AWS = require('aws-sdk');
var s3;

function setupS3Client() {
    s3 = new AWS.S3({
        region: 'ap-south-1',
        maxRetries: 3
    })
}

function pushToS3(dir, filename, key, callback) {
    fs.readFile(dir + filename, function(err, data) {
        if(err) {
            console.log("file read error: " + err);
            callback();
        } else {
            var params = {
                Bucket: 'parquet',
                Key: key,
                Body: data
            }
            s3.upload(params, function(err, data) {
                if (err) {
                    console.log("Error uploading to s3 " + err);
                    callback();
                } else {
                    console.log("file uplaoded");
                    callback();
                }
            })
        }
    })
}

exports.setupS3Client = setupS3Client;
exports.pushToS3 = pushToS3;

Changing the row group size also does not alter this behaviour.

arnabguptadev commented 4 years ago

I think I hit this same issue. I will try to post a working sample to demonstrate, but here's what I did:

Debugging through the library, it seems that if the flushing happens only inside the close method (here: https://github.com/ironSource/parquetjs/blob/master/lib/writer.js#L108), everything is fine and you get the smallest output.

But if, due to your row group size, it is also triggered in appendRow (here: https://github.com/ironSource/parquetjs/blob/master/lib/writer.js#L96), then you end up with duplicate rows. For large numbers of rows that keeps building up until it exhausts memory.

I tried the following as a quick-and-dirty workaround, and it seems to work: I changed the above lines in writer.js to:

      let to_write = this.rowBuffer;
      let rowCount = this.rowBuffer.rowCount;
      this.rowBuffer.rowCount = 0;
      this.rowBuffer = {};
      to_write.rowCount = rowCount;
      await this.envelopeWriter.writeRowGroup(to_write);

With this it does seem to keep the count integrity in place.

I think having the await before resetting the buffer (this.rowBuffer = {};) is the issue.
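
For reference, the flush inside appendRow is roughly this shape, which leaves a window during the await in which concurrent appendRow calls still see (and can re-flush) the old buffer:

// simplified shape of the flush in appendRow (the writer.js line linked above)
if (this.rowBuffer.rowCount >= this.rowGroupSize) {
  // while this await is pending, other appendRow calls keep shredding rows
  // into this.rowBuffer and can hit this branch again, writing the same
  // rows a second time
  await this.envelopeWriter.writeRowGroup(this.rowBuffer);
  this.rowBuffer = {}; // the buffer is only reset after the write resolves
}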

Does this sound right?

Regards, Arnab.

asmuth commented 4 years ago

OK, thanks for figuring this out! I think I see what is going on here now. There is an undocumented assumption that appendRow is not called concurrently, i.e. that, at any given time, there is only one outstanding call to the method.

For now, the following workaround should solve the problem: Ensure that calls to appendRow are sequenced, i.e. only start a new call to appendRow once the previous one has returned. This should ideally be done by using the "await" keyword on any call to appendRow.
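
A minimal sketch of the difference (assuming writer is an open ParquetWriter and records is the batch of rows to write):

async function writeBatch(writer, records) {
  // problematic: this fires off all appendRow calls at once, so several of
  // them can be in flight concurrently and race on the internal row buffer
  // records.forEach(record => writer.appendRow(record));

  // workaround: await each call so only one appendRow is outstanding at a time
  for (const record of records) {
    await writer.appendRow(record);
  }
}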

As for a long-term solution, I am not enough of a JavaScript expert to tell whether something needs to be fixed in parquetjs or not. In my eyes, our current behaviour is similar to the behaviour of the "fs" module built into nodejs: e.g. if you call fs.appendFile concurrently, the result would be equally undefined. However, we might at least consider adding a better error message for users who hit this issue.

Also see the discussion in https://github.com/ironSource/parquetjs/pull/105

asmuth commented 4 years ago

Closing this as resolved.