leeoniya / uDSV

A faster CSV parser in 5KB (min)

Crashing when adding to transform stream #10

Open 1mike12 opened 7 months ago

1mike12 commented 7 months ago

I'm no stream expert, but given the examples, it seemed like it would be super easy to create a transform stream with your module.

I can verify that it works fine when run like the example in the readme, but when I port it into a custom Transform stream, it ends early, never reaches the Writable, and produces no errors. There's a good chance this isn't a problem with uDSV, but it's strange; given your APIs, this should work.


// Assumed imports for this repro; ProgressBarStream and SimpleUDSVTransform
// are custom classes from the example repo linked further down, and the
// Parser type import is assumed:
import fs from "node:fs";
import { pipeline, Writable } from "node:stream";
import { initParser, inferSchema, Parser } from "udsv";
import { ProgressBarStream } from "./streams/ProgressBarStream";
import { SimpleUDSVTransform } from "./streams/SimpleUDSVTransform";

const filePath = "./data.csv"; // hypothetical path; any CSV seems to repro this

// lines = 16_182_971, 100% finished in 13.1 seconds = 1_235_341 rows per second
// 597.5 MB  13.1s   45.5MB/s
const streamedCSV = async () => {
  const fileSize = await fs.promises.stat(filePath).then((stats) => stats.size)
  const stream = fs.createReadStream(filePath).pipe(new ProgressBarStream(fileSize))

  let parser: Parser;
  for await (const buffer of stream) {
    const strChunk = buffer.toString();
    parser ??= initParser(inferSchema(strChunk));
    parser.chunk<string[]>(strChunk, parser.typedArrs, (batch) => {
      const x = batch // reaches here fine
    });
  }
  parser!.end();
}

const transform = async () => {
  const fileSize = await fs.promises.stat(filePath).then((stats) => stats.size)
  return new Promise<void>((resolve, reject) => {
    pipeline(
      fs.createReadStream(filePath),
      new SimpleUDSVTransform(),
      new Writable({
        write(chunk, encoding, callback) {
          const x = chunk // never reaches here
          callback()
        }
      }),
      (err) => {
        if (err) {
          reject(err) // never reaches here
        } else {
          resolve() // never reaches here
        }
      }
    )
  })
}
(async () => {
  await streamedCSV() //works
  await transform() //breaks
})()
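
For reference, here is a minimal sketch of what SimpleUDSVTransform might look like. This is a hypothetical reconstruction, not the actual class (which lives in the repo linked in the next comment); it mirrors the structure of the snippet quoted further down, including calling callback() inside the batch handler, and assumes readableObjectMode so row arrays can be pushed downstream:

import { Transform, TransformCallback } from "node:stream";
import { initParser, inferSchema, Parser } from "udsv";

class SimpleUDSVTransform extends Transform {
  private parser?: Parser;

  constructor() {
    // assumed: readableObjectMode so parsed row arrays survive push()
    super({ readableObjectMode: true });
  }

  _transform(chunk: Buffer, _enc: BufferEncoding, callback: TransformCallback) {
    const strChunk = chunk.toString();
    this.parser ??= initParser(inferSchema(strChunk));
    // callback() inside the batch handler, matching the repro snippet below
    this.parser.chunk<string[]>(strChunk, this.parser.stringArrs, (parsedData) => {
      this.push(parsedData);
      callback();
    });
  }

  _flush(callback: TransformCallback) {
    // flush any rows still buffered in the parser
    this.parser?.end();
    callback();
  }
}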
leeoniya commented 7 months ago

> I'm no stream expert

heh, me neither :sweat_smile:. uDSV's API is fully synchronous, so there shouldn't be any surprises there.

does this reproduce with something you can attach here and i can debug (that hopefully isn't 16M records)?

1mike12 commented 7 months ago

yeah for sure, I just created an example repo you can use to run everything; just run example.ts. I plopped in some random CSV I found online, but it seems like any CSV will reproduce this issue.

https://github.com/1mike12/usdv-stream-example

But regarding the API, I think the way to get it to work with streams is to use the chunk function, which seems to be async right now. I imagine it could also be synchronous, since there shouldn't be any reason the parser can't hand back what it has already parsed so far in a sync way?

this.parser.chunk<string[]>(strChunk, this.parser.stringArrs, (parsedData) => {
  this.push(parsedData);
  callback();
});

https://github.com/1mike12/usdv-stream-example/blob/d64f3165174248e647b57853dbda4d50ac498719/src/streams/SimpleUDSVTransform.ts#L17

But either way, whether it's sync or async, it really shouldn't matter to a Transform stream, which is why I'm so confused that it just stops and ends so early.
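
Not a definitive diagnosis, but one variation that may be worth testing: invoke the Transform's callback exactly once per _transform call, outside the batch handler. In Node streams, if _transform's callback is never invoked the pipeline stalls, and if it's invoked more than once the stream errors; so if the batch handler can fire zero or multiple times for a given chunk, the pattern above breaks either way. A sketch of the defensive version:

_transform(chunk: Buffer, _enc: BufferEncoding, callback: TransformCallback) {
  const strChunk = chunk.toString();
  this.parser ??= initParser(inferSchema(strChunk));
  this.parser.chunk<string[]>(strChunk, this.parser.stringArrs, (parsedData) => {
    this.push(parsedData);
  });
  // signal completion once per chunk, regardless of how many batches were pushed
  callback();
}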

You should see something like this (the first bar is streamedCSV completing, the second is the transform stalling):

██████████████████████████████ 100% 1.4 / 1.4 MB  0.0s   36.1MB/s
███░░░░░░░░░░░░░░░░░░░░░░░░░░░ 9% 128.0 / 1.4 MB  0.0s   62.5MB/s
Process finished with exit code 0
leeoniya commented 7 months ago

just a heads up, it might be a week or so before i can get to testing this.