dominictarr / JSONStream

rawStream.pipe(JSONStream.parse()).pipe(streamOfObjects)
Other
1.91k stars 165 forks source link

append json object to stream #140

Closed matt212 closed 6 years ago

matt212 commented 7 years ago

Hi, i wanted to append below object to every object in stream

 {"index":{"_index":"tvseries","_type":"internindex"}}

my stream looks like this

[
  {"showname":"The X Files","episode":"04","content":"Before what?","season":"1"},
  {"showname":"The X Files","episode":"04","content":"Before what?","season":"1"},
{"showname":"The X Files","episode":"01","content":"What?","season":"1"}
]

what my stream should look like !

-> POST http://localhost:9200/_bulk
  {"index":{"_index":"tvseries","_type":"internindex"}}
  {"showname":"The X Files","episode":"04","content":"Before what?","season":"1"}
  {"index":{"_index":"tvseries","_type":"internindex"}}
  {"showname":"The X Files","episode":"04","content":"Great.","season":"1"}
  {"index":{"_index":"tvseries","_type":"internindex"}}
  {"showname":"The X Files","episode":"01","content":"What?","season":"1"}

how can i achieve this using jsonstream in my existing belowcodebase

var stream = new ElasticsearchWritableStream(client, {
  highWaterMark: 256,
  flushTimeout: 500
});
pg.connect(connectionString,function(err, client, done) {
  if(err) throw err;
  var query = new QueryStream('SELECT * FROM srt limit 2')
  var streams = client.query(query)

  //release the client when the stream is finished
  streams.on('end', done)
  streams.pipe(JSONStream.stringify()).pipe(stream)
})
Poetro commented 7 years ago

You'd need to do something like the following:

var Transform = require('stream').Transform

function transform(chunk, encoding, callback) {
  this.push({"index":{"_index":"tvseries","_type":"internindex"}})
  callback(null, chunk)
}

streams
  .pipe(new Transform({objectMode : true, transform: transform}))
  .pipe(JSONStream.stringify()).pipe(stream)

PS: code is not tested, just written to demonstrate the idea.

matt212 commented 7 years ago

thanks a lot @Poetro , sure will try this approach and provide an feedback accordingly !