ForbesLindesay / atdatabases

TypeScript clients for databases that prevent SQL Injection
https://www.atdatabases.org
MIT License
599 stars 47 forks source link

Pg queryNodeStream ignoring batchSize and highWaterMark #292

Open droganov opened 1 year ago

droganov commented 1 year ago

In:

const query = sql`
        SELECT * FROM procedure(${collection}, ${channel}, ${merged})
`
const stream = db.queryNodeStream(query, { batchSize: 10, highWaterMark: 16 * 1024 })

for await (const chunk of stream)....

I expect chunk to be an array of 10 items, but instead it is just one item as a plain object instead of array

ForbesLindesay commented 1 year ago

Yes, this is the intended behaviour. batchSize and highWaterMark control the behaviour in the underlying db queries, so tuning the batch size and high water mark can improve throughput/performance, but the "chunks" returned in the stream are still individual records. If you need to batch them, you can write a separate utility that pulls a set number of items from the stream and emits that array as a chunk. Something like:

async function* batchStream<T>(source: AsyncIterator<T>, batchSize: number): AsyncIterator<T[]> {
  let batch = []
  for await (const record of source) {
    batch.push(record)
    if (batch.length === batchSize) {
      yield batch
      batch = []
    }
  }
  if (batch.length) yield batch
}

const query = sql`
        SELECT * FROM procedure(${collection}, ${channel}, ${merged})
`
const stream = db.queryNodeStream(query, { batchSize: 10, highWaterMark: 16 * 1024 })

for await (const chunk of batchStream(stream, 10))....

P.S. the batchSize is how many records to fetch at a time, the highWaterMark is how many records to allow the stream to buffer before applying back pressure (if you are consuming items slower than they are being returned form the database). 16 * 1024 would be a very large number of records to have as your high water mark if your batch size is only 10.

droganov commented 1 year ago

Hello @ForbesLindesay Thank you for the update, yes, that's how I solved it