adaltas / node-csv

Full featured CSV parser with simple api and tested against large datasets.
https://csv.js.org

`option.to` breaks the stream and exits the program #333

Open dnafication opened 2 years ago

dnafication commented 2 years ago

Describe the bug

Using the csv-parse package to parse a readable stream, basically this example from your documentation. Introducing the `to` or `to_line` option seems to exit the stream without executing anything after `await finished(parser)` in the code below:

import assert from 'assert'
import fs from 'fs'
import os from 'os'
import { parse } from 'csv-parse'
import { finished } from 'stream/promises'

// Prepare the dataset
await fs.promises.writeFile(
  `${os.tmpdir()}/input.csv`,
  ['a,b,c', '1,2,3', '3,4,5'].join('\n')
)
// Read and process the CSV file
const processFile = async () => {
  const records = []
  const parser = fs.createReadStream(`${os.tmpdir()}/input.csv`).pipe(
    parse({
      to: 2,   // <-- add this option
    })
  )
  parser.on('readable', function () {
    let record
    while ((record = parser.read()) !== null) {
      // Work with each record
      records.push(record)
    }
  })
  await finished(parser)
  // nothing gets executed after this
  console.log(records) // <-- this does not print anything
  return records
}
// Parse the CSV content
const records = await processFile()
console.log(records) // <-- this doesn't get executed

// Validate the records
assert.deepStrictEqual(records, [
  ['a', 'b', 'c'],
  ['1', '2', '3'],
])

To Reproduce

The example is provided in the description above.


foghina commented 1 year ago

I'm having the same issue, here's my slightly shorter repro:

import * as fs from "node:fs";
import * as stream from "node:stream/promises";

import { parse } from "csv-parse";

const parser = fs.createReadStream("data.csv").pipe(
  parse({
    // to_line: 3
  })
);
parser.on("readable", () => {
  let record;
  while ((record = parser.read()) !== null) {
    console.log(record);
  }
});
console.info("waiting for stream to finish...");
await stream.finished(parser);
console.info("this won't get logged if you uncomment the to_line option.");
foghina commented 1 year ago

FWIW, parser.on('end', callback) still gets invoked correctly, so that's one potential way around this issue for now, but it's still quite blocking.
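
For reference, here is a minimal sketch of that workaround: waiting on the parser's 'end' event instead of stream.finished(). The file name and to_line value are taken from the repro above; this is a reader's sketch, not official guidance.

import * as fs from "node:fs";
import { parse } from "csv-parse";

const parser = fs.createReadStream("data.csv").pipe(
  parse({
    to_line: 3,
  })
);
const records = [];
parser.on("readable", () => {
  let record;
  while ((record = parser.read()) !== null) {
    records.push(record);
  }
});
// Resolve on the 'end' event instead of awaiting stream.finished(parser),
// which never settles when to_line is set.
await new Promise((resolve, reject) => {
  parser.on("end", resolve);
  parser.on("error", reject);
});
console.log(records);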

foghina commented 1 year ago

Follow-up from my previous comment, one way around it is adding:

parser.on('end', parser.destroy);

This will correctly resolve the promise and the code after await gets executed.

However, now you might get an error due to the stream being closed prematurely. I fixed it with:

await stream.finished(parser).catch((e) => {
  if (e && e.code === "ERR_STREAM_PREMATURE_CLOSE") {
    // 🔥 this is fine 🔥
  } else {
    console.error("Parser error", e);
  }
});

EDIT: See discussion below. Using parser.on("end", parser.end) instead avoids the premature close error completely.

wdavidw commented 1 year ago

I am looking at it as well, trying to read the official doc and extract some sense.

Using the more robust strategy of try-and-fail, I get to the same conclusion: destroy triggers stream.finished, and it must be called after end to avoid ERR_STREAM_PREMATURE_CLOSE. I am not sure why, nor what the correct way to handle this behavior is.
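
To illustrate that ordering in plain Node.js, here is a sketch using a generic PassThrough rather than csv-parse, so treat it as an assumption about core stream semantics rather than a statement about this library:

import { PassThrough } from "node:stream";
import { finished } from "node:stream/promises";

// destroy() on a stream that never ended: finished() rejects with
// ERR_STREAM_PREMATURE_CLOSE.
const a = new PassThrough();
const pa = finished(a).catch((err) => console.log("destroy before end:", err.code));
a.destroy();
await pa;

// end() first, then destroy() once the readable side has drained:
// finished() resolves normally.
const b = new PassThrough();
const pb = finished(b).then(() => console.log("end then destroy: ok"));
b.resume(); // let the readable side reach 'end'
b.on("end", () => b.destroy());
b.end();
await pb;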

wdavidw commented 1 year ago

Instead of

parser.on('end', parser.destroy);

Try

parser.end()
parser.on('end', parser.destroy);

This seems to prevent the ERR_STREAM_PREMATURE_CLOSE error in my tests.

wdavidw commented 1 year ago

It is also associated with the close event, which is not emitted unless destroy is called or there is an error.

foghina commented 1 year ago

Calling parser.end() before parser.on() seems to end the parser before any records get processed, though? The await stream.finished() has to come after both of those lines.

foghina commented 1 year ago

OK, parser.on("end", parser.end) seems to fix the premature error quite nicely.

foghina commented 1 year ago

Thanks for the fix! Just for posterity before a new release is made: I realized that using for await also seemingly fixes the problem (unsure whether any dangling streams are left around, though). My code is now:

const parser = fs.createReadStream("data.csv").pipe(
  parse({
    to_line: 2,
  })
);
for await (const record of parser) {
  console.log(record);
}
console.info("done.");

Which seems to work correctly without any errors/exceptions.

wdavidw commented 1 year ago

The whole stream API is hard to grasp. I am not even sure whether the close event is expected to be emitted or not.

Anyway, new versions are published:

Yoshiji commented 1 year ago

I am updating my npm dependencies, and this change in https://github.com/adaltas/node-csv/commit/ca3f55b7cf556b45377677428783608a2d9ebbb2 is breaking my client-side CSV parsing in the browser. It seems that parser.destroy is undefined, leading to a TypeError.

Here is a code sandbox with the error: https://codesandbox.io/s/jolly-matan-5y0er4?file=/src/index.js

My actual usage differs from this simple example. I read the first line of a file the user selected for upload and parse the CSV headers to ensure each one is valid, something like this:

const csvParser = parse({ delimiter: ",", fromLine: 1, toLine: 1 }).on("data", onParseData).on("error", onParseOrReadError);
// csvParser.destroy = () => csvParser; // <=== adding this solves the TypeError
const fileSlice = file.slice(0, 1024 * 4); // reading the first 4kb just in case it contains many long headers
fileSlice.text().then((v) => {
  csvParser.write(v);
  csvParser.end();
}).catch(onParseOrReadError);
wdavidw commented 1 year ago

Can you go inside the source code of the parser and add a condition, for example

if (this.destroy !== undefined) {
   this.on('end', this.destroy);
}

I would also need something in ./demo to replicate the issue.

wdavidw commented 9 months ago

I am re-opening this issue. I did some testing after issue #410.

Issue #333 is fixed with `this.end(); this.destroy();`
Issue #410 is fixed with `this.end(); this.on('end', this.destroy);`

  1. It happens that to fix #410, which uses the async iterator, we need to partially break this issue, which uses stream.finished.
  2. By fixing #410, we now have two very similar tests with different outcomes (sketched below). One uses a readable stream implemented with csv-generate, and it works. The other uses a readable stream implemented with stream.Readable, and it breaks.
  3. The code that fixes issue #333 no longer works with the latest Node.js version, 21.4.0.
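
A reader's sketch of those two test variants, using illustrative data and options rather than the actual test files; the stream.Readable variant is the one reported to break:

import { Readable } from "node:stream";
import { finished } from "node:stream/promises";
import { generate } from "csv-generate";
import { parse } from "csv-parse";

// Variant 1: source implemented with csv-generate (reported as working,
// depending on the Node.js version).
const p1 = generate({ length: 10 }).pipe(parse({ to: 2 }));
p1.resume(); // discard records, we only care whether finished() settles
await finished(p1);
console.log("csv-generate variant finished");

// Variant 2: source implemented with stream.Readable (reported as breaking:
// finished() may reject with ERR_STREAM_PREMATURE_CLOSE or never settle).
const p2 = Readable.from(["a,b,c\n", "1,2,3\n", "3,4,5\n"]).pipe(parse({ to: 2 }));
p2.resume();
await finished(p2);
console.log("stream.Readable variant finished");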

Here is the note I left inside the source code. I have the feeling that this could come from Node.js and be out of our grasp. We welcome anyone with the expertise who would stop by to help us.

this.end()
// Fix #333 and break #410
//   ko: api.stream.iterator.coffee
//   ko with v21.4.0, ok with node v20.5.1: api.stream.finished # aborted (with generate())
//   ko: api.stream.finished # aborted (with Readable)
// this.destroy()
// Fix #410 and partially break #333
//   ok: api.stream.iterator.coffee
//   ok: api.stream.finished # aborted (with generate())
//   broken: api.stream.finished # aborted (with Readable)
this.on('end', this.destroy);

I took the decision to fix issue #410, re-open this issue, and release the patch version 5.5.3. The motivation for a patch SemVer bump is that this issue had never been fixed properly and was broken with upcoming Node.js versions. I acknowledge this is a very controversial decision in terms of SemVer.

sneko commented 1 week ago

@wdavidw I faced this too. Maybe enabling the `to` option could log a warning until a solution is found? (I first looked for the issue in my own code, since I am chaining multiple transformers...)

wdavidw commented 1 week ago

I would rather update the documentation. Writing content to stdout or stderr could potentially break usage for people who pipe the output of this library.