Keyang / node-csvtojson

Blazing fast and Comprehensive CSV Parser for Node.JS / Browser / Command Line.
MIT License

Pausing a stream #135

Closed nicholaswmin closed 7 years ago

nicholaswmin commented 7 years ago

Pausing a stream doesn't seem to have any effect - the parser proceeds to read the CSV as usual.

Take for example the following code:

const rs = fs.createReadStream(this.csvPath);
rs.pause();

let count = 0;
csv()
.fromStream(rs)
.on("json", (json) => {
  count++;
  console.log(count);
})
.on("done", () => {
  cb(null, count);
})
.on("error", (err) => {
  cb(err);
})

count is logged 200 times (equal to the number of rows in the CSV) - I was expecting it not to log anything, since the stream is paused before being passed to fromStream()

Keyang commented 7 years ago

Hi, I don't understand why you want to pause the read stream while passing it into the converter. Generally, you should pass it into the converter when you need to convert the source.

In Node.js, when a readable stream is piped into a downstream, pipe() calls readstream.resume() by default, so it overrides readstream.pause() when you pass the stream into csv().fromStream(rs) - see here
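
For illustration, a minimal sketch (hypothetical file path, not from the original thread): piping a paused readable switches it back into flowing mode.

const fs = require("fs");
const { Writable } = require("stream");

const rs = fs.createReadStream("./some.csv"); // hypothetical path
rs.pause();
console.log(rs.isPaused()); // true

// any writable sink will do; pipe() calls resume() on the source internally
const sink = new Writable({ write(chunk, encoding, callback) { callback(); } });
rs.pipe(sink);
console.log(rs.isPaused()); // false - the pipe overrode the earlier pause()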

You will need to change your code to the following:

const rs = fs.createReadStream(this.csvPath);

let count = 0;
csv()
.fromStream(rs)
.on("json", (json) => {
  count++;
  console.log(count);
})
.on("done", () => {
  cb(null, count);
})
.on("error", (err) => {
  cb(err);
})
process.nextTick(()=>{
  rs.pause();
})

Again, if you don't want to parse the read stream at that time, you simply don't need to call the parser.

~Keyang

nicholaswmin commented 7 years ago

I've got a use case where I need to process a huge (>2 million rows) CSV and insert it into a DB.

To do this without running into memory issues, I intend to process the CSV as a stream, pausing the stream every 10000 rows, inserting the rows in my DB and then resuming the stream.

nicholaswmin commented 7 years ago

Apologies, the example I gave above was a bit too stripped down.

So this,

process.nextTick(()=>{
  rs.pause();
})

would pause the stream right before it starts.

What I want to do is something like this, where I pause and resume within the json event listener:

.on("json", (json) => {
  rows.push(json);
  // for every 10,000 rows - pause the stream,
  // asynchronously save to DB, and then resume the stream
  if (rows.length % 10000 === 0) {
    rs.pause();
    this.saveToDb(rows, () => {
      rs.resume();
      rows = [];
    })
  }
})

Keyang commented 7 years ago

My first thought on your case is that you should write your own stream.Writable class so you can pipe the result to it, rather than doing the buffering / throttling yourself, e.g.

rs.pipe(csv()).pipe(yourWritable)

The whole point of Node.js streams is to keep these details transparent to developers, so that you don't need to worry about when to pause and resume the upstream.
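
For illustration, a minimal sketch (not from the original thread; the stream names are hypothetical) of that point: with pipe(), backpressure is automatic - a slow writable makes the readable wait without any manual pause() or resume().

const { Readable, Writable } = require("stream");

const source = Readable.from(["a", "b", "c"]);
const slowSink = new Writable({
  objectMode: true,
  write(row, encoding, callback) {
    // the source will not push more rows until callback() runs
    setTimeout(callback, 100);
  },
});
source.pipe(slowSink).on("finish", () => console.log("done"));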

Anyway, if you want to do it the way you mentioned, that is possible as well. You can pause the readable stream every 10000 rows and resume it once they are processed.

nicholaswmin commented 7 years ago

Anyway, if you want to do the way you mentioned, it is possible as well. You can pause the readable stream every 10000 rows and resume it once they are processed.

Thanks a million - calling rs.pause() from within .on("json") doesn't seem to have an effect - the stream just continues flowing

Keyang commented 7 years ago

OK, I see what you mean. Again, this is not the correct way of using Node.js streams. The reason it is not working is that the downstream will resume the upstream once it is "drained". Thus pause() on the upstream will be voided if the downstream is draining data. Node.js has hidden these underlying details from developers.

You can, however, call rs.unpipe() to stop rs from pushing data, and once processing has finished, pipe it back. So you can do something like below:

const rs = fs.createReadStream(this.csvPath);

let count = 0;
let rows = [];
var csvParser = csv()
.fromStream(rs)
.on("json", (json) => {
  count++;
  rows.push(json);
  // for every 10,000 rows - unpipe the stream, asynchronously save to DB,
  // and then pipe it back
  if (rows.length % 10000 === 0) {
    rs.unpipe();
    this.saveToDb(rows, () => {
      rs.pipe(csvParser);
      rows = [];
    })
  }
})
.on("done", () => {
  cb(null, count);
})
.on("error", (err) => {
  cb(err);
})

~Keyang

nicholaswmin commented 7 years ago

Thanks,

After unpipe is called, json seems to be emitted 4 more times. Does it have an immediate effect?

It looks like when unpipe is called, some json events are still being emitted.

.on("json", (json) => {
  rows.push(json);
  console.log(rows.length);
  if (rows.length % 1000 === 0) {
    console.log("unpiping");
    rs.unpipe();
    this._insertEntries(db, rows, ()=> {
      rs.pipe(csvParser);
      rows = [];
    });
  }
})

And here's some console.log() output:

996
997
998
999
1000
unpiping
1001
1002
1003
1004
1
2
3
4

nicholaswmin commented 7 years ago

However, this works:

.on("json", (json) => {
  rows.push(json);
  if (rows.length % 1000 === 0) {
    rs.unpipe();
    // clear `rows` right after `unpipe`
    const entries = rows;
    rows = [];
    this._insertEntries(db, entries, ()=> {
      rs.pipe(csvParser);
    });
  }
})

Keyang commented 7 years ago

Hi, unpipe will only stop the file reading. A json event is emitted whenever a row of CSV has been parsed. Since multiple lines of CSV are read in on each read, json can still be emitted even when there is no more file reading.
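
For illustration, a rough sketch (hypothetical file path, not from the original thread) of why a few more rows arrive after unpipe: the file read stream hands the parser whole chunks (64 KB by default), and every CSV line already inside a delivered chunk will still be parsed and emitted.

const fs = require("fs");

const rs = fs.createReadStream("./data.csv"); // hypothetical path
rs.once("data", (chunk) => {
  const lines = chunk.toString().split("\n").length;
  console.log(`first chunk: ${chunk.length} bytes, about ${lines} CSV lines`);
  rs.destroy(); // stop reading; one chunk is enough for the illustration
});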

~Keyang

nicholaswmin commented 7 years ago

Got it - thanks.

Are there any plans to make "pausing" a bit easier? I'm guessing that what I'm trying to do here is a common task.

I've got it working with your suggestions, but it feels a tad hackish.

Keyang commented 7 years ago

:D As mentioned before, this is not the correct way of doing things.

Node.js has encapsulated this in its stream implementation and we should not need to do this.

Implementing your own Writable is the correct way of doing this. See https://nodejs.org/api/stream.html#stream_simplified_construction

Here is an example of what you want to achieve:

// Writable comes from Node's core stream module
var Writable = require("stream").Writable;

// Because callback() is only called once a batch has been saved, backpressure
// pauses the file reading automatically while each DB write is in flight.
var tmpArr = [];
rs.pipe(csv({}, {objectMode: true})).pipe(new Writable({
  write: function(json, encoding, callback) {
    tmpArr.push(json);
    if (tmpArr.length === 10000) {
      myDb.save(tmpArr, function() {
        tmpArr = [];
        callback();
      })
    } else {
      callback();
    }
  },
  objectMode: true
}))
.on('finish', function() {
  // flush any rows left over from the last, partial batch
  if (tmpArr.length > 0) {
    myDb.save(tmpArr, function() {
      tmpArr = [];
    })
  }
})

~Keyang

nicholaswmin commented 7 years ago

Nice - that looks much cleaner. Thanks again.