brianc / node-pg-query-stream

Query results from node-postgres as a readable (object) stream
MIT License

multiple streams and stream needs to be piped to writable #38

Open eljefedelrodeodeljefe opened 6 years ago

eljefedelrodeodeljefe commented 6 years ago

Hi,

I am struggling to see why the test below does not work. Maybe I am getting something wrong, but right now I assume something very quirky is happening in the stream implementation. Maybe some events aren't being passed along.

In any case, needing to pipe to a writable just to get the end event is somehow weird.

I am running on Node 8.

The use case here is having an array of queries and running them recursively. I am creating new instances of the client and the stream wherever possible.

var helper = require('./helper')
var QueryStream = require('../')
var concat = require('concat-stream')
var pg = require('pg')

var Transform = require('stream').Transform

var mapper = new Transform({ objectMode: true })

mapper._transform = function (obj, enc, cb) {
  console.log('will see data a couple of times')

  this.push(obj)
  cb(null)
}

helper('mapper', function (client) {
  it('works', function (done) {
    exec(client, () => {
      // // commenting that in works
      // return done()
      const client1 = new pg.Client()

      client1.connect(() => {
        exec(client1, () => {
          done()
        })
      })
    })
  })
})

function exec (client, cb) {
  var stream = new QueryStream('SELECT * FROM generate_series(0, 201) num', [], { highWaterMark: 100, batchSize: 50 })
  stream.on('end', function () {
    // console.log('stream end')
    return cb()
  })
  client.query(stream)
  // stream.pipe(mapper).pipe(concat(function (res) {
  //   cb()
  // }))

  // lib actually needs to pipe to a writable stream
  stream.pipe(mapper)
}

Any help appreciated.

jnikles commented 5 years ago

@eljefedelrodeodeljefe to my knowledge a transform stream is only an intermediate stage in the streaming pipeline. The stream must be consumed somewhere for you to receive the end event. That consumer can be either a piped writable or just a simple .on('data', ...) listener. It's not a bug in this library.