bparruck closed this issue 7 years ago
You can't just create one really big insert statement; that way you'll hit the database's maximum statement length limit.
So streaming doesn't really make sense in this case. You just have to generate batches of data and insert each batch as a separate insert query inside a transaction.
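The batching approach described above can be sketched roughly like this. This is a rough sketch, not knex API: `chunk`, `insertInBatches`, and the `insertBatch` callback are names I've made up; with knex, the callback would be something like `batch => trx('ied_options').insert(batch)` inside `knex.transaction(...)`.

```javascript
// Split an array into fixed-size batches so no single INSERT
// exceeds the database's statement length limit.
function chunk(items, size) {
  const batches = []
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size))
  }
  return batches
}

// Run one insert per batch, sequentially, so every batch can share
// a single transaction (all-or-nothing rollback semantics).
// `insertBatch` is a caller-supplied async function, e.g. one that
// runs trx('ied_options').insert(batch) in a real knex setup.
async function insertInBatches(items, batchSize, insertBatch) {
  for (const batch of chunk(items, batchSize)) {
    await insertBatch(batch)
  }
}

module.exports = { chunk, insertInBatches }
```

Running the batches sequentially (rather than concurrently) is what makes a single shared transaction possible.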
```javascript
function (items, cb) {
  console.log('Now writing to database', items.length, 'items');
  var q = async.queue(function (item, cb) {
    knex('ied_options').insert(item).then(function () {
      cb();
    }).catch(cb);
  }, concurrency);
  async.each(items, function (item, cb) {
    q.push(item, function (err) {
      cb(err);
    });
  }, function (err) {
    cb(err);
  });
}
```
```javascript
const ied_options = db.table('ied_options', {
  fields: ['label', 'alias', 'created_date', 'updated_date']
})
const ws = ied_options.createWriteStream()
ws.on('error', function (err) { console.log('Oh my!', err) })
ws.on('close', function () { console.log('Done!') })
ws.write(item)
```
Ummm... why are you using concurrency to write data to the database? I mean, the only way to achieve concurrency with inserts is to use multiple connections, which effectively doesn't allow you to use transactions, so you can't roll back everything if one connection fails...
Anyways, it seems that you would just like to trigger a new insert on every `data` event of your input stream (as far as I understood, that is what the streamsql write stream does)... So why not add a `data` event handler and do the insert query there, and then, if you would like to have an output stream, you can emit a `meta` event there with the inserted row info?
Sounds pretty easy to implement if you feel like that API is best for your use... but sorry, knex doesn't support that out of the box...
I hope that some day we implement an rxjs-based API instead of streams (or maybe both, since streams could be used as a lower-level implementation for the rxjs API if they are supported better by the db drivers).
Thanks for the response. The script is used for importing data and I don't need to worry about rollback; it is either all or nothing. Yes, ideally I should rewrite the script to write into the DB every time the data is created. The current logic assumes synchronous execution, which would have to be modified.
I thought if there was a way to implement a write stream I could avoid all that. Thanks for your comments.
I wanted to do this, so I spent way too long making a duplex stream that does it. Before deciding to make one, I ran across this thread in search of someone else's and figured I'd share it with you guys. Disclaimer: this is my first time implementing my own stream, so you might wanna look it over (criticism is welcome; I'm not very confident about whether using an `isCorked` flag was the right thing to do). Honestly, it wouldn't be a bad thing to have in the codebase.
```javascript
knex.transaction(trx => {
  getAReadableStream()
    .pipe(new KnexBatchInsertStream({ trx, table: 'target_table', returnCols: ['id'], batchSize: 200 }))
    .on('data', ({ id }) => console.log(id))
})
```
(`returnCols` defaults to `'*'` and `batchSize` defaults to `4000`.)
So it accepts some parameters, sets up a queue, and listens to itself for the `'finish'` event. Upon receiving an object in the `_write` function, it gets pushed onto the queue. The queue gets capped at (roughly) `batchSize` via the `cork` function. When the `_read` function gets called, it splices `size` elements off the front of the queue (which apparently defaults to the `highWaterMark`, i.e. `batchSize`) and sends them into `batchInsert` (but not before checking whether the freshly trimmed queue needs to be revitalized via `uncork`). It then pauses the stream before pushing each result onto the stream (this is to bypass an idiosyncrasy when synchronously pushing a bunch of elements; I'm not sure of the details, I read about it on Stack Overflow and it fixed my problem). It resumes the stream and, if the writable half of the duplex is done receiving writes and the queue is empty, it pushes `null` to end the readable stream.
```javascript
const { Duplex } = require('stream')

class KnexBatchInsertStream extends Duplex {
  constructor({ trx, table, returnCols = '*', batchSize = 4000 }) {
    super({ objectMode: true, highWaterMark: batchSize })
    this.trx = trx
    this.table = table
    this.returnCols = returnCols
    this.batchSize = batchSize
    this.isCorked = false
    this.writeQueue = []
    this.on('finish', () => {
      this.writableFinished = true
    })
  }

  _write(chunk, encoding, callback) {
    this.writeQueue.push(chunk)
    // Cork once the queue reaches batchSize so writes buffer up
    // instead of growing the queue unboundedly.
    if (!this.isCorked && this.writeQueue.length >= this.batchSize) {
      this.cork()
      this.isCorked = true
    }
    callback(null)
  }

  async _read(size) {
    const batch = this.writeQueue.splice(0, size)
    // The queue has been drained below the cap, so accept writes again.
    if (this.isCorked && this.writeQueue.length < this.batchSize) {
      this.uncork()
      this.isCorked = false
    }
    const results = await this.trx
      .batchInsert(this.table, batch)
      .returning(this.returnCols)
      .transacting(this.trx)
    this.pause()
    while (results.length && this.push(results.shift()));
    this.resume()
    if (this.writableFinished && !this.writeQueue.length) {
      this.push(null)
      return
    }
  }
}

module.exports = KnexBatchInsertStream
```
@timhuff I just wanted to thank you for this snippet, it saved me a ton of time. I think it absolutely makes sense to use a stream to manage backpressure in a situation where you're doing a ton of batch inserts. The implementation is fine imo; the flag seems a little messy but I don't see any downsides to it. If not an addition to the library, maybe the API can just be cleaned up slightly and released as its own package / knex utility.
edit: when I did this sort of thing in the past, I used something like `promise-task-queue` to manage the 'max batch insert concurrency', but there was still some custom pipe/pausing to handle the backpressure. This seems cleaner to me, at least until `ReadStream` having `Symbol.asyncIterator` is a thing.
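The `Symbol.asyncIterator` alternative mentioned above can be sketched roughly like this; readable streams have been async-iterable since around Node 10. The function name and the `insertBatch` callback are my own made-up stand-ins, not knex API (with knex, `insertBatch` might wrap `trx.batchInsert(...)`).

```javascript
// Consume a readable stream with for-await and insert in fixed-size
// batches. Awaiting each insert before reading more applies backpressure
// without any cork/uncork bookkeeping.
async function batchInsertFromStream(readable, batchSize, insertBatch) {
  let batch = []
  for await (const row of readable) {
    batch.push(row)
    if (batch.length >= batchSize) {
      await insertBatch(batch)
      batch = []
    }
  }
  // Flush whatever is left over after the stream ends.
  if (batch.length) await insertBatch(batch)
}

module.exports = batchInsertFromStream
```

This reads much closer to the eventual rxjs-style API discussed earlier in the thread, with the stream machinery hidden behind `for await`.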
@jkantr Thanks! I'm glad someone found it useful. I'm not sure about releasing it as a utility - I've got quite a bit on my plate and just don't have the time to maintain it. If you'd like to do so, though, feel free.
(and let me know if you find a cleaner way to handle this than the corking stuff)
@timhuff `this.writableFinished` is read-only, so it cannot be assigned.
@glensc Yea, this was written in the days before I started using typescript. If you're able to fix it, please post the update for others.
I'm trying to write a large number of items into a table. The items are generated over time. It would be great to open a write stream to the table and keep inserting items into the table as they get created. Currently, I'm inserting all the items at the end, and it takes a long time to insert them all.