adaltas / node-stream-transform

Object transformations implementing the Node.js `stream.Transform` API
https://csv.js.org/transform/

Neat way of collecting output for writing as JSON to file? #23

Closed njh closed 5 years ago

njh commented 5 years ago

What is the neatest way of collecting all the rows together and writing them as a JSON object to a file using the Stream API?

I am processing a small dataset (~70 rows and columns), so it all fits in memory fine; I just quite like the streaming semantics.

const fs = require('fs')
const parse = require('csv-parse')
const transform = require('stream-transform')

const output = fs.createWriteStream('output.json')
const input = `a,b,c,d
1,2,3,4
5,6,7,8
9,0,1,2
`

const parser = parse(input, {columns: true})

const filter = transform(function(data) {
  if (data.a == 1) return null
  return data
})

const collect = transform(function(data) {
  // Is there a way of not having this 'null' function?
  return data
}, function(err, data) {
  if (err) throw err
  let result = {
    "foo": "bar",
    "data": data
  }
  return JSON.stringify(result, null, 2)
})

parser.pipe(filter).pipe(collect).pipe(output)

I think the mistake I am making is that the result of the second function passed to transform() isn't what is passed down the pipe, but I am wondering what the neatest alternative is?

wdavidw commented 5 years ago

You could simply write collect as:

const collect = transform(function(data) {
  let result = {
    "foo": "bar",
    "data": data
  }
  return JSON.stringify(result, null, 2)
})
njh commented 5 years ago

That outputs a separate JSON object for each row of the CSV:

{
  "foo": "bar",
  "data": {
    "a": "5",
    "b": "6",
    "c": "7",
    "d": "8"
  }
}{
  "foo": "bar",
  "data": {
    "a": "9",
    "b": "0",
    "c": "1",
    "d": "2"
  }
}

I am trying to create:

{
  "foo": "bar",
  "data": [{
      "a": "5",
      "b": "6",
      "c": "7",
      "d": "8"
    },
    {
      "a": "9",
      "b": "0",
      "c": "1",
      "d": "2"
    }
  ]
}
wdavidw commented 5 years ago

OK, I got it. Transform is meant to handle the records/messages of a stream, and what you really need is a sink. In your sample, the second function is a callback. It has the advantage of being called at the end of processing, but it cannot be used with pipe. You could remove the .pipe(output) and handle the output yourself after stringifying the records.

const collect = transform(function(data) {
  // pass-through ("proxy") transform; the callback below receives all records at the end
  return data
}, function(err, data) {
  if (err) throw err
  let result = {
    "foo": "bar",
    "data": data
  }
  output.write(JSON.stringify(result, null, 2))
  output.end()
})

parser.pipe(filter).pipe(collect)

Not very pretty: you still need this "proxy" transform function and you have to handle the output inside the callback, but it will work.

wdavidw commented 5 years ago

There is, however, a better way to do it: filter inside the transform itself (see the documentation):

const parser = parse(input, {columns: true})

const collect = transform(function(data) {
  // column values parsed from CSV are strings, so compare against '1'
  return data.a === '1' ? null : data
}, function(err, data) {
  if (err) throw err
  let result = {
    "foo": "bar",
    "data": data
  }
  output.write(JSON.stringify(result, null, 2))
  output.end()
})

parser.pipe(collect)

Or simply handle the filtering inside the parser callback:

parse(input, {columns: true}, function(err, records){
  if (err) throw err
  const result = {
    "foo": "bar",
    "data": records.filter( record => record.a !== 1 )
  }
  output.write(JSON.stringify(result, null, 2))
  output.end()
})

Since you need to hold the whole dataset in memory at the end, and since the input dataset is also in memory, this last example works the same as the previous one. The difference only matters if you are reading from a stream and a lot of records are being filtered out, in which case you can use my previous example with stream-transform.
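
For reference, a minimal sketch of that fully streaming variant, assuming the data comes from a hypothetical input.csv file on disk rather than an in-memory string; the filter discards records as they flow by, and the final callback receives only the records that survived:

const fs = require('fs')
const parse = require('csv-parse')
const transform = require('stream-transform')

const output = fs.createWriteStream('output.json')

// input.csv is a hypothetical file with the same a,b,c,d columns as above
fs.createReadStream('input.csv')
  .pipe(parse({columns: true}))
  .pipe(transform(function(data) {
    // discard filtered records as they stream by; column values are strings
    return data.a === '1' ? null : data
  }, function(err, data) {
    if (err) throw err
    output.write(JSON.stringify({"foo": "bar", "data": data}, null, 2))
    output.end()
  }))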

njh commented 5 years ago

Thank you very much for your speedy replies. I have got something like you suggested working. It is good to confirm that I am not missing something.

I guess the final step is not really transforming the stream; I am collating it and writing it to disk.
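
That collating step can also be expressed as a plain Node.js Writable sink rather than a transform. A minimal sketch, assuming a hypothetical jsonSink helper that buffers every record it receives and writes a single JSON document once the upstream pipeline finishes:

const fs = require('fs')
const { Writable } = require('stream')

// Hypothetical helper: an object-mode sink that collects records and
// writes one JSON document to `path` when the stream ends.
function jsonSink(path, extra) {
  const records = []
  return new Writable({
    objectMode: true,
    write(record, _encoding, done) {
      records.push(record)
      done()
    },
    final(done) {
      const result = Object.assign({}, extra, {data: records})
      fs.writeFile(path, JSON.stringify(result, null, 2), done)
    }
  })
}

// usage with the parser and filter from the original example:
// parser.pipe(filter).pipe(jsonSink('output.json', {"foo": "bar"}))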