adaltas / node-csv

Full featured CSV parser with simple api and tested against large datasets.
https://csv.js.org
MIT License

Steady memory use increase when processing not so large files #437

Closed: obones closed this issue 2 months ago

obones commented 2 months ago

Hello,

I'm using csv-parse to process a CSV file and csv-stringify to create an output CSV file from that process, with the following pseudocode:

// Imports implied by the pseudocode: csv-parse, csv-stringify and the
// AWS SDK Upload helper used further down.
const csv_parse = require("csv-parse");
const { stringify } = require("csv-stringify");
const { Upload } = require("@aws-sdk/lib-storage");

// sourceFile, callIndex, outputDelimiter, outputColumns, outputS3Options, s3
// and the helper functions below come from the surrounding production code.
// Note: recordIndex is only declared just before the loop below; in runnable
// code it would need to be initialised before this line.
const parseOptions = { delimiter: ";", columns: true, from: recordIndex };
const fileParser = sourceFile.stream().pipe(csv_parse.parse(parseOptions));
const outputStringifyer = stringify(
    {
        header: (callIndex == 0),
        delimiter: outputDelimiter,
        columns: outputColumns,
        cast:
        {
            // render booleans as their string representation
            boolean:
                function(value)
                {
                    return "" + value;
                }
        }
    }
);

outputS3Options.Body = outputStringifyer;
let upload = new Upload(
    {
        client: s3,
        params: outputS3Options
    }
);

let recordIndex = 0;
for await (const record of fileParser)
{
    console.log(`Processing record ${recordIndex} - ` + JSON.stringify(process.memoryUsage()));

    validateRecord(record);

    const restResults = await queryRestApi(record);

    const outputRecord = processResults(restResults);

    outputStringifyer.write(outputRecord);

    if (mustCancel())
        break;

    recordIndex++;
}

outputStringifyer.end();
console.log("==> output ended, " + errors.length + " errors");

await upload.done();
console.log("==> content file written");

sourceFile is a directory entry from a zip file read with unzipper, which conveniently provides a stream to work with.

Tracing the results from process.memoryUsage(), I get the following graph (the horizontal axis is the record index): [memory usage graph]

If I comment out the call to outputStringifyer.write(), then I get the following graph: [memory usage graph without the write() call]

The memory increase is much smaller, if present at all, in that case.

The examples above are from a 10k-line file; with a 100k-line file I reach the default memory limit in Node.js 18, which then triggers this error: FATAL ERROR: Ineffective mark-compacts near heap limit Allocation failed - JavaScript heap out of memory

Passing the --max-old-space-size=4096 option allows me to process the 100k-line file just fine, but I'm worried that this just pushes the limit further down the line.

I'm quite convinced there is a fundamental misunderstanding in my code, but right now it eludes me. Maybe I would need to change the async iterator approach to a stream approach, but I'm not sure how this would be done, let alone whether it would still allow me to abort in the middle like I do in the example above.

Thanks a lot for any suggestion.

wdavidw commented 2 months ago

I agree that changing --max-old-space-size will just push the problem further instead of fixing it. I doubt the library suffers from a memory leak, but if you disagree, please provide me with a sample script, without any external dependencies, that replicates the behavior. Just to confirm my expectation, I created a script that ran for 24h without any memory increase. You can run it yourself or use it as a source of inspiration.
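
For reference, a minimal sketch of such a long-running check (an illustration only, not the maintainer's actual script; the csv-generate options and record count are arbitrary): it generates records, parses and stringifies them, discards the output, and logs process.memoryUsage() at regular intervals.

const { Transform, Writable } = require("stream");
const { generate } = require("csv-generate");
const { parse } = require("csv-parse");
const { stringify } = require("csv-stringify");

// Count parsed records and print memory usage every 100,000 records.
let count = 0;
const counter = new Transform({
  objectMode: true,
  transform(record, _encoding, callback) {
    if (++count % 100000 === 0) {
      const { rss, heapUsed } = process.memoryUsage();
      console.log(`${count} records, rss ${Math.round(rss / 1e6)} MB, heapUsed ${Math.round(heapUsed / 1e6)} MB`);
    }
    callback(null, record); // pass the record through untouched
  }
});

// Discard the stringified output so nothing accumulates downstream.
const devnull = new Writable({
  write(_chunk, _encoding, callback) { callback(); }
});

generate({ length: 10000000, columns: ["int", "bool"] })
  .pipe(parse())
  .pipe(counter)
  .pipe(stringify())
  .pipe(devnull);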

obones commented 2 months ago

Thanks for the example; it's nice of you to have taken the time to create it. And clearly, I'm not suggesting there is an issue in the library; I'm quite convinced it comes from my own code.

The example does not use the for await construct, and I'm wondering whether the behavior I'm seeing comes from my use of the async iterator. I really need to be able to cancel at any time, and I'm not sure how to do that with the pipeline construct.

wdavidw commented 2 months ago

You can use my code and convert it to for await.

obones commented 2 months ago

Thanks for your patience, here is a test script that gives the following memory usage graph:

[memory usage graph from the test script]

node_csv_mem.zip

I replaced the use of the stringifier stream as the Body of the S3 upload with a pipe to a no-op stream, and I'm still seeing the increase in memory use.
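
For context, the "no-op stream" mentioned above would be something along these lines (a sketch; the stream name is illustrative):

const { Writable } = require("stream");

// A sink that acknowledges every chunk and discards it, so the
// stringifier's output is consumed instead of accumulating in its buffer.
const noop = new Writable({
  write(_chunk, _encoding, callback) {
    callback();
  }
});

outputStringifyer.pipe(noop);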

wdavidw commented 2 months ago

You really need to run the process for a long time, e.g. 10,000,000 records, to make sure the memory is stable. It is normal for the memory to ramp up slightly in the meantime.

obones commented 2 months ago

Hum, ok, running with a 1M record file over the course of a little over 8 hours gave me this:

[memory usage graph over the 8-hour run]

As this is with the test script, I'll have to try again with my real script to see if this replicates, but as I said before, this hints at an issue in my production code.

Thanks for your patience

wdavidw commented 2 months ago

Just to confirm, I created a similar script to yours in commit 9d622199995899b0b4760a1bfa402aac896b37d1 and ran it for some time. Memory is stable; here are 3 memory dumps:

3071 records, usage: {
  rss: '57.05 MB -> Resident Set Size - total memory allocated for the process execution',
  heapTotal: '9.55 MB -> total size of the allocated heap',
  heapUsed: '6.13 MB -> actual memory used during the execution',
  external: '1.81 MB -> V8 external memory'
}
522597 records, usage: {
  rss: '63.27 MB -> Resident Set Size - total memory allocated for the process execution',
  heapTotal: '14.8 MB -> total size of the allocated heap',
  heapUsed: '11.16 MB -> actual memory used during the execution',
  external: '4.27 MB -> V8 external memory'
}
2175923 records, usage: {
  rss: '65.08 MB -> Resident Set Size - total memory allocated for the process execution',
  heapTotal: '15.05 MB -> total size of the allocated heap',
  heapUsed: '6.43 MB -> actual memory used during the execution',
  external: '1.91 MB -> V8 external memory'
}
wdavidw commented 2 months ago

"Maybe I would need to change the async iterator approach to a stream approach but I'm not sure how this would be done,"

Look at bec3f12072f47f2eaf7f67b25d081d292c36de26; it inserts a transformer that stops the pipeline once a certain number of processed records is reached.
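
The commit itself is the reference; as one possible shape of the idea, here is a sketch only, assuming Node's stream/promises pipeline and an AbortController rather than whatever mechanism the commit actually uses:

const { Transform, Writable } = require("stream");
const { pipeline } = require("stream/promises");
const { generate } = require("csv-generate");
const { parse } = require("csv-parse");
const { stringify } = require("csv-stringify");

// Pass records through unchanged and abort the whole pipeline once
// `max` records have been processed.
const stopAfter = (max, controller) => {
  let count = 0;
  return new Transform({
    objectMode: true,
    transform(record, _encoding, callback) {
      callback(null, record);
      if (++count >= max) controller.abort();
    }
  });
};

// Discard the stringified output.
const devnull = new Writable({
  write(_chunk, _encoding, callback) { callback(); }
});

const main = async () => {
  const controller = new AbortController();
  try {
    await pipeline(
      generate({ length: 1000000 }),
      parse(),
      stopAfter(100, controller), // cancel after 100 records
      stringify(),
      devnull,
      { signal: controller.signal }
    );
  } catch (err) {
    // Aborting surfaces as an AbortError; treat it as the normal way out.
    if (err.name !== "AbortError") throw err;
  }
};

main();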

obones commented 2 months ago

At last, I found the issue! As I suspected, it has nothing to do with your library, which behaves as expected by buffering the content it receives because nothing is consuming it at the other end of the pipe. In my production code, this gives the following graph: [memory usage graph before the fix]

The difference with the test script we exchanged was the use of the stringifier instance as the Body of an S3 transfer managed by an instance of the Upload class from the @aws-sdk/lib-storage package, like this:

outputS3Options.Body = outputStringifyer;
let upload = new Upload(
    {
        client: s3,
        params: outputS3Options
    }
);

Following their example, I was calling await upload.done() at the very end of the loop. But contrary to what the method name led me to believe, the Upload class does not start processing data until done() has been called. So the solution was to call it before the loop, store its result in a variable, and only await that promise after the loop has finished. When doing this, the memory usage is much more in line with expectations: [memory usage graph after the fix]
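
Applied to the pseudocode from the first comment, the fix described above looks roughly like this (a sketch reusing the variable and helper names from that pseudocode):

outputS3Options.Body = outputStringifyer;
let upload = new Upload(
    {
        client: s3,
        params: outputS3Options
    }
);

// Start the upload right away so it consumes the stringifier as data is
// written, and keep the promise instead of awaiting it immediately.
const uploadDone = upload.done();

for await (const record of fileParser)
{
    validateRecord(record);
    const restResults = await queryRestApi(record);
    outputStringifyer.write(processResults(restResults));

    if (mustCancel())
        break;
}

outputStringifyer.end();

// Only now wait for the S3 upload to complete.
await uploadDone;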

In the end, if I had written my code with a "pipe only" approach instead of the async iterator, I would probably not have stumbled onto that behavior of the Upload class, because data production would only have started once done() starts doing its work.

Many thanks for your help and patience.

wdavidw commented 2 months ago

Happy to learn about the root cause, closing.

kelvinwop commented 2 months ago

2 million lines results in 50GB memory usage...

wdavidw commented 2 months ago

This is JavaScript. Memory usage is the heap size, around 10/20 GB; 50 GB is more about the allocated/reserved memory. It includes the Node.js runtime and the buffer, whose size is configurable. The parser in itself consumes no memory at all.

kelvinwop commented 2 months ago

I used

const fs = require("fs");
const { parse } = require("csv-parse");

const parser = fs.createReadStream('file.txt').pipe(parse({
  delimiter: '\t',
  skip_records_with_error: true,
}));

I fixed it by using the readline module to process the file in batches of 10000

  const fs = require("fs");
  const readline = require("readline");

  const fileStream = fs.createReadStream('file.txt');

  const rl = readline.createInterface({
    input: fileStream,
    crlfDelay: Infinity
  });

  let lines = [];
  for await (const line of rl) {
    lines.push(line);
    if (lines.length > 10000) {
      processLines(lines);
      lines = [];
    }
  }
  // process the last, partially filled batch
  if (lines.length > 0) {
    processLines(lines);
  }

now the memory issue is gone

maybe worth adding as a feature if the detected file size is too large?

wdavidw commented 2 months ago

Detecting a large file size is out of scope for the library.