brianc / node-pg-query-stream

Query results from node-postgres as a readable (object) stream
MIT License

Piping same sqlstream into multiple writable streams vs creating multiple identical sqlstream #24

Closed radar155 closed 7 years ago

radar155 commented 7 years ago

I need to pipe the same sqlstream into multiple writable streams. I've read that you can do this (with a generic readable stream), but consider this simple code:

var source = fs.createReadStream('source.txt');
var dest1 = fs.createWriteStream('dest1.txt');
var dest2 = fs.createWriteStream('dest2.txt');

source.pipe(dest1);
source.pipe(dest2);

This works. I've also read that if you pipe the same stream into multiple destinations like this, the flow is throttled to the slowest pipeline. However, if I add a delay before piping to "dest2", it no longer works:

source.pipe(dest1);
setTimeout(function(){source.pipe(dest2)},3000)

The second destination ends up as an empty file.

The `setTimeout` here is simply simulating my real flow. In fact I need to:

  1. Perform a query with this awesome lib
  2. Do some async work before piping the source into several different writable streams

pg.connect(function(err, client, done) {
  if(err) throw err;
  var query = new QueryStream('SELECT * FROM hugetable')
  var stream = client.query(query)

  asyncStuffs(function(){
     stream.pipe(somedest)
  })
  asyncStuffs2(function(){
     stream.pipe(somedest2)
  })
  asyncStuffs3(function(){
     stream.pipe(somedest3)
  })
})

So my question is: is there a way to pipe the same sql stream into multiple writable streams without worrying about any inconsistencies caused by the async (non-deterministic timing) flow?

I suspect this is not possible. If it isn't, consider this solution:

pg.connect(function(err, client, done) {
  if(err) throw err;
  var streams = [];
  for (var i = 0; i < 3; i++) {
       var query = new QueryStream('SELECT * FROM hugetable')
       streams.push(client.query(query))
  }

  asyncStuffs(function(){
     streams[0].pipe(somedest)
  })
  asyncStuffs2(function(){
     streams[1].pipe(somedest2)
  })
  asyncStuffs3(function(){
     streams[2].pipe(somedest3)
  })
})

It's not very elegant, but it should work. The question is: could this cause performance issues? It's pretty obvious that running n queries in parallel will slow things down, but can we consider the code inside the for loop safe and efficient? Are `new QueryStream()` and `client.query()` heavy operations?

brianc commented 7 years ago

Hi @radar155!

So my question is: is there a way to pipe the same sql stream into multiple writable streams without worryng about any inconsistencies caused by the async (no time deterministic) flow?

I think so, yeah, but I'm not 100% sure off the top of my head how to do it. I too ran into some snags when trying to "fan out" one readable into multiple writables, but solved it by googling for streams that do that kind of thing. Try searching for things like "pipe into multiple writables node", "node readable fan out", or "node multi writable". People have built a lot of great streams over the years.

Are "new QueryStream()" and "client.query()" heavy operations?

`new QueryStream()` is an extremely lightweight operation; it just allocates a new instance. The performance of `client.query` will depend on your query, but 99% of the work done there is within your database.

You could also try pg-cursor if you need more direct control over when rows are read out. This is what node-pg-query-stream uses under the hood.
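The pg-cursor API is `client.query(new Cursor(text, values))` plus `cursor.read(rowCount, callback)` and `cursor.close(callback)`. Here's a sketch of draining a cursor in fixed-size batches; the `readAll` helper and the stub cursor are mine (the stub just lets the sketch run without a database):

```javascript
// Drain a cursor in fixed-size batches until it reports no more rows.
function readAll(cursor, batchSize, onRow, done) {
  cursor.read(batchSize, (err, rows) => {
    if (err) return done(err);
    if (rows.length === 0) return done(null); // cursor exhausted
    rows.forEach(onRow);
    readAll(cursor, batchSize, onRow, done);
  });
}

// With a real connection you'd write something like:
//   const Cursor = require('pg-cursor');
//   const cursor = client.query(new Cursor('SELECT * FROM hugetable'));
//   readAll(cursor, 100, handleRow, () => cursor.close(() => done()));

// Stub cursor with the same read(n, cb) shape, so the sketch is runnable.
function makeStubCursor(rows) {
  let i = 0;
  return {
    read(n, cb) {
      const batch = rows.slice(i, i + n);
      i += n;
      process.nextTick(() => cb(null, batch));
    },
  };
}

const seen = [];
readAll(makeStubCursor([{ id: 1 }, { id: 2 }, { id: 3 }]), 2,
  (row) => seen.push(row.id),
  (err) => { if (err) throw err; });
```

This gives you explicit control over when each batch of rows is pulled from the database, which is exactly what the stream builds on.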

Hope this helps! :clap:

radar155 commented 7 years ago

`new QueryStream()` is an extremely lightweight operation; it just allocates a new instance. The performance of `client.query` will depend on your query, but 99% of the work done there is within your database.

Maybe my question wasn't clear: will `client.query` only start stressing my db when I pipe the readable stream it returns?

brianc commented 7 years ago

The query will be parsed, planned, and executed on the db as soon as you pass the query-stream to `client.query`, creating a portal and cursor to read from. The cursor will not be iterated and rows won't be returned until you pipe the stream or start consuming it.