adaltas / node-stream-transform

Object transformations implementing the Node.js `stream.Transform` API
https://csv.js.org/transform/

Finishing stream before entirely reading the whole file content #17

Closed hbakhtiyor closed 7 years ago

hbakhtiyor commented 8 years ago

I'm not able to parse the whole remote file content. The file is about 200 MB, and the stream finishes before parsing is complete: only a portion of the data gets parsed, not all of it.

I think the problem is with pausing/resuming the stream.

e.g.

let transformer = transform(function(record, cb) {
  setTimeout(function() {
    // do some heavy job!
    cb();
  }, 5000);
});

let parser = this.csvParse({
  // ... 
});

// the data size is about 200Mb
request('http://example.com/bar.csv')
  .on('error', errorHandler)
  .pipe(parser)
  .on('error', errorHandler)
  .pipe(transformer)
  .on('finish', () => {
      console.log('finished, but not parsed completely!');
  })
  .on('error', errorHandler)
  .on('data', (data) => {

  });
wdavidw commented 8 years ago

Can you try the end event and let me know?
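
For reference, a minimal sketch of what that could look like on the transformer from the snippet above ('finish' fires when the writable side is done, 'end' when the readable side has been fully consumed):

transformer.on('end', () => {
  // fires only once all transformed output has been consumed
  console.log('end event fired, all output consumed');
});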

hbakhtiyor commented 8 years ago

Tested, the result is the same.

pravincar commented 8 years ago

@hbakhtiyor The way you have chained the .on method has added the finish event onto the request object. What you want is actually

transformer.on('finish',function(){
   console.log('this should work');
});
wdavidw commented 8 years ago

@pravincar The usage of pipe is correct; it returns the destination stream.
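
To illustrate the point (a minimal sketch, not from the thread): each pipe() call returns its destination, so the chained 'finish' handler in the original snippet ends up on the transformer, not on the request object.

// pipe() returns the destination stream, so chained .on() calls
// attach their listeners to that destination
const dest = parser.pipe(transformer); // dest === transformer
dest.on('finish', () => {
  console.log('finish fired on the transformer');
});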

wdavidw commented 8 years ago

Your usage of transform is wrong; the callback usage is:

cb(Error('Oh no!')): 1st argument is an error, stop here
cb(null, data.join(',')+'whatever u want'): 2nd argument is the transformed string or object
cb(): no argument, no generated data

However, it seems that you know that, and that your problem is more about not getting the whole dataset, especially the end of the streamed data, as if _flush wasn't called.

Could you confirm that you still reproduce the error after writing a correct transformer (you can adjust my example below) and after decreasing the timeout to "1"?

I tried to reproduce but it seems fine in my case:

fs = require 'fs'
transform = require 'stream-transform'
parse = require 'csv-parse'

errorHandler = (err) ->
  console.log 'ERROR', err

transformer = transform (record, cb) ->
  setTimeout ->
    cb null, record.join(',')+'\n'
    # cb() # print 0
  , 1

parser = parse()
count = 0

fs.createReadStream("#{__dirname}/data/10000000-lines.csv")
.on 'error', errorHandler
.pipe parser
.on 'error', errorHandler
.pipe transformer
.on 'end', ->
  console.log 'ended, got', count
.on 'finish', ->
  console.log 'finished, got', count
.on 'error', errorHandler
.on 'data', (data) ->
  count++
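
For readers following along in JavaScript, a rough equivalent of the CoffeeScript sketch above (same hypothetical file path; not part of the original comment):

var fs = require('fs');
var transform = require('stream-transform');
var parse = require('csv-parse');

var errorHandler = function(err) {
  console.log('ERROR', err);
};

// async transformer: hands each record back to the stream via the callback
var transformer = transform(function(record, cb) {
  setTimeout(function() {
    cb(null, record.join(',') + '\n');
  }, 1);
});

var parser = parse();
var count = 0;

fs.createReadStream(__dirname + '/data/10000000-lines.csv')
  .on('error', errorHandler)
  .pipe(parser)
  .on('error', errorHandler)
  .pipe(transformer)
  .on('end', function() {
    console.log('ended, got', count);
  })
  .on('finish', function() {
    console.log('finished, got', count);
  })
  .on('error', errorHandler)
  .on('data', function(data) {
    count++;
  });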
hbakhtiyor commented 8 years ago

cb(Error('Oh no!')): 1st argument is an error, stop here
cb(null, data.join(',')+'whatever u want'): 2nd argument is the transformed string or object
cb(): no argument, no generated data

Yeah, I know, I just made a mistake in the example; in the real app I'm passing the right arguments.

and after decreasing the timeout to "1"?

I didn't test that yet. It would probably pass, but in my case the processing takes more time.

thanks

wdavidw commented 8 years ago

I've reproduced something close to your environment, with a file of the same size, running my code above, and it worked perfectly. Unless you give me a script and a file reproducing the issue, I won't be able to look for a fix.

hbakhtiyor commented 8 years ago

Did you use it with the request module?

jonnochoo commented 8 years ago

I have a similar issue...

If I run this, the console only shows the first 16 lines (it should show 100):

var transform = require('stream-transform');

var records = [];
for(var i = 1; i <= 100; i++) {
  records.push({ index: i })
}

transform(records, function(data) {
    console.log(data); 
    return data; 
  })
  .on('finish', function() {
    console.log('done!');
  })

However, if I add a few lines, it works fine and displays all 100 lines in the console:

var transform = require('stream-transform');

var records = [];
for(var i = 1; i <= 100; i++) {
  records.push({ index: i })
}

transform(records, function(data) {
    console.log(data); 
    return data; 
  })
  .on('data', (data) => {
    // Adding this seems to make it work
  })
  .on('finish', function() {
    console.log('done!');
  })

Is that how it is meant to work, or is there an issue?
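
A note on the numbers here: 16 is the default highWaterMark for object-mode streams, so the transformer stalls once its internal read buffer is full and nothing is consuming it; adding the 'data' listener switches the stream into flowing mode. A minimal sketch of another way to drain it, piping into a trivial object-mode writable (the sink below is illustrative, not part of the original comment):

var stream = require('stream');
var transform = require('stream-transform');

var records = [];
for (var i = 1; i <= 100; i++) {
  records.push({ index: i });
}

// a trivial consumer, so the transformer's readable side gets drained
var sink = new stream.Writable({
  objectMode: true,
  write: function(record, encoding, callback) {
    console.log(record);
    callback();
  }
});

transform(records, function(data) {
    return data;
  })
  .pipe(sink)
  .on('finish', function() {
    console.log('done!');
  });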

mz3 commented 8 years ago

It looks to me like there is an issue when using an async transform with streams. Here is the simplest way to replicate the problem:

var path = require("path");
var fs = require("fs");
var CSV = require("csv");

var count = 0;

// for each parsed line in csv
var transformer = CSV.transform(function(record, callback) {

  setTimeout(function() {
    console.log(++count);
    callback();
  }, 500);
});

// when finished
transformer.on("finish", function(err) {
  console.log("transformer finish. count is", count);
});

// start
fs.createReadStream(path.join(__dirname, "sample.csv"))
  .pipe(CSV.parse({columns: true}))
  .pipe(transformer);

Expected output:

1
2
3
4
5
6
7
transformer finish. count is 7

Actual output:

transformer finish. count is 0
1
2
3
4
5
6
7
wdavidw commented 8 years ago

Try with the "consume" option set to "true"; otherwise, transform is meant to be consumed like any readable stream.
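
A minimal sketch of where that option could go, adapting the snippet above (the exact argument order accepted by CSV.transform is an assumption here):

// assumption: an options object can be passed alongside the handler
var transformer = CSV.transform({ consume: true }, function(record, callback) {
  setTimeout(function() {
    console.log(++count);
    callback();
  }, 500);
});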

mz3 commented 8 years ago

Setting consume: true doesn't seem to make a difference. What do you mean when you say "transform is meant to be consumed like any readable stream"?

wdavidw commented 8 years ago

I refer to the Node.js stream API. Transform is a stream.Transform, meaning it is both reading data from a provider and writing data to a consumer. If you don't provide a consumer, then you don't have anyone to activate the stream pump: producer.pipe(transformer).pipe(consumer)
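
A minimal sketch of that pump applied to the snippet above, reusing its fs, path, CSV, and transformer variables; the object-mode consumer here is illustrative, not part of the comment:

var stream = require('stream');

// an explicit consumer keeps the pump running; without it the
// transformer's readable buffer fills up and the flow stalls
var consumer = new stream.Writable({
  objectMode: true,
  write: function(record, encoding, callback) {
    callback();
  }
});

fs.createReadStream(path.join(__dirname, "sample.csv"))
  .pipe(CSV.parse({ columns: true }))
  .pipe(transformer)
  .pipe(consumer);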

mz3 commented 8 years ago

Ah, okay. Thanks for clarifying. I discovered that it will only work correctly if both of these conditions are met: { consume: true } is set, and the "end" event is used instead of "finish". The "finish" event seems to fire prematurely, but the "end" event works correctly, though only if { consume: true } is set.