brianc / node-pg-copy-streams

COPY FROM / COPY TO for node-postgres. Stream from one database to another, and stuff.

Documentation doesn't work with latest `pg` #144

Closed: k-funk closed this issue 1 year ago

k-funk commented 1 year ago

The following documentation example does not work with the latest `pg` version. `client.query` now returns a promise, so it fails with `TypeError: stream.on is not a function`:

var fs = require('fs')
var { Pool } = require('pg')
var copyFrom = require('pg-copy-streams').from

var pool = new Pool()

pool.connect(function (err, client, done) {
  var stream = client.query(copyFrom('COPY my_table FROM STDIN'))
  var fileStream = fs.createReadStream('some_file.tsv')
  fileStream.on('error', done)
  stream.on('error', done)
  stream.on('finish', done)
  fileStream.pipe(stream)
})

An example using await would be great.

jeromew commented 1 year ago

Hello,

Thanks for your message. I must admit that I did not try the code you posted, but this module uses the Submittable interface of pg's `client.query`, which, unless I am mistaken, does not return a promise.

See https://github.com/brianc/node-postgres/blob/master/packages/pg/lib/client.js#L522

and the "client.query with a Submittable" section of https://node-postgres.com/apis/client

So I would like to know how you get `TypeError: stream.on is not a function`, because `stream` should indeed be a stream.
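To make the Submittable point concrete, here is a deliberately simplified model of how `client.query` branches on its first argument. It is not node-postgres source code: `FakeCopyStream` and the standalone `query` function are hypothetical, and the real client also queues the query and only calls `submit(connection)` once it reaches the front of the queue. What it illustrates is that an object exposing a `submit` method comes back unchanged, so a stream passed in is returned as the same stream, while a plain SQL string without a callback yields a Promise.

```javascript
const { Writable } = require('node:stream')

// Hypothetical stand-in for the object returned by copyFrom(...): a Writable
// that also implements submit(), which is what makes it a Submittable.
class FakeCopyStream extends Writable {
  submit(connection) {
    // A real COPY stream would start the COPY on `connection` here.
    this.connection = connection
  }
  _write(chunk, encoding, callback) {
    callback()
  }
}

// Simplified model of the branching in client.query (not the real code).
function query(config, callback) {
  if (typeof config.submit === 'function') {
    config.submit({ name: 'fake connection' })
    return config // the caller gets the very same object back
  }
  const pending = Promise.resolve({ rows: [] })
  if (typeof callback === 'function') {
    pending.then((res) => callback(null, res))
    return undefined // callback style: nothing useful is returned
  }
  return pending // promise style
}

const stream = query(new FakeCopyStream())
console.log(typeof stream.on) // prints: function
console.log(query('SELECT 1') instanceof Promise) // prints: true
```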

k-funk commented 1 year ago

Here's my script

import fs from 'fs'
import { Pool } from 'pg'
import { from as copyFrom } from 'pg-copy-streams'

export function makePool() {
  return new Pool({
    ssl: process.env.NODE_ENV !== 'development' ? { rejectUnauthorized: false } : false,
    connectionString: process.env.DATABASE_URL,
    database: 'codelookup',
  })
}
const pool = makePool()

function seed() {
  try {
    const stream = pool.query(copyFrom('COPY codes FROM STDIN'))
    const fileStream = fs.createReadStream('database_setup/result.txt')
    stream.on('error', () => {})
    stream.on('finish', () => {})
    fileStream.on('error', () => {})
    fileStream.pipe(stream)
  } catch (err) {
    console.error(err)
  }
}

seed()

CLI output

$ npm run seed2
> codelookup@0.2.0 seed2
> NODE_ENV=development DATABASE_URL=postgres://localhost:5432/codelookup ts-node src/db/seed.ts

TypeError: stream.on is not a function
    at seed (/abs_path/codelookup/src/db/seed.ts:13:12)
    at Object.<anonymous> (/abs_path/codelookup/src/db/seed.ts:22:1)
    at Module._compile (node:internal/modules/cjs/loader:1246:14)
    at Module.m._compile (/abs_path/codelookup/node_modules/ts-node/src/index.ts:1597:23)
    at Module._extensions..js (node:internal/modules/cjs/loader:1300:10)
    at Object.require.extensions.<computed> [as .ts] (/abs_path/codelookup/node_modules/ts-node/src/index.ts:1600:12)
    at Module.load (node:internal/modules/cjs/loader:1103:32)
    at Function.Module._load (node:internal/modules/cjs/loader:942:12)
    at Function.executeUserEntryPoint [as runMain] (node:internal/modules/run_main:83:12)
    at phase4 (/abs_path/codelookup/node_modules/ts-node/src/bin.ts:579:12)
$ npm list
├── @types/pg-copy-streams@1.2.2
├── @types/pg@8.6.5
├── pg-copy-streams@6.0.4
├── pg@8.7.3
├── ts-node@10.8.0

As you can see from the node-postgres.com/features/queries docs, client/pool.query is awaitable, which is why I presume that the return value is a promise.


Side note: it would be great to have some documentation with `await` examples to help people get started.

jeromew commented 1 year ago

There is a big difference between your code and the documentation's example code:

var fs = require('fs')
var { Pool } = require('pg')
var copyFrom = require('pg-copy-streams').from

var pool = new Pool()

pool.connect(function (err, client, done) {
  var stream = client.query(copyFrom('COPY my_table FROM STDIN'))
  var fileStream = fs.createReadStream('some_file.tsv')
  fileStream.on('error', done)
  stream.on('error', done)
  stream.on('finish', done)
  fileStream.pipe(stream)
})

You are using `pool.query` (which does indeed return a Promise), whereas the example code uses `client.query`. The problem is that `pool.query` does not handle the Submittable interface, so it cannot be used with pg-copy-streams. You need to call `pool.connect` to get a client, and then call `client.query` on it.
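The TypeError from the seed script can be reproduced without a database. The sketch below assumes a pool that, loosely like pg-pool, checks out a client inside `query()`, runs the query there and hands the caller a Promise. `fakePool`, `fakeClient` and `FakeCopyStream` are hypothetical stand-ins, and what the Promise resolves to is beside the point here; the real pool also handles errors and timeouts.

```javascript
const { Writable } = require('node:stream')

// Hypothetical Submittable standing in for copyFrom('COPY codes FROM STDIN').
class FakeCopyStream extends Writable {
  submit() {} // a real COPY stream would start the COPY on the connection here
  _write(chunk, encoding, callback) {
    callback()
  }
}

// Hypothetical client: a Submittable is handed back unchanged.
const fakeClient = {
  query(config) {
    return typeof config.submit === 'function' ? config : Promise.resolve({ rows: [] })
  },
  release() {},
}

// Hypothetical pool: query() checks out a client, runs the query there and
// returns a Promise, so the stream created inside never reaches the caller.
const fakePool = {
  async connect() {
    return fakeClient
  },
  query(config) {
    return this.connect().then((client) => {
      client.query(config)
      client.release()
    })
  },
}

// What the seed script did: the result is a Promise, which has no .on method.
const viaPool = fakePool.query(new FakeCopyStream())
console.log(typeof viaPool.on) // prints: undefined

// What the documentation does: check out a client, then call client.query.
fakePool.connect().then((client) => {
  const stream = client.query(new FakeCopyStream())
  console.log(typeof stream.on) // prints: function
  client.release()
})
```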

Now, regarding the use of `await`, I think you should be able to do something along these lines:

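const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const { Pool } = require('pg')
const { from: copyFrom } = require('pg-copy-streams')

async function main() {
  const pool = new Pool()
  const client = await pool.connect()
  try {
    const ingestStream = client.query(copyFrom('COPY my_table FROM STDIN'))
    const sourceStream = fs.createReadStream('some_file.tsv')
    // pipeline() resolves once both streams are done and rejects on any stream error
    await pipeline(sourceStream, ingestStream)
  } finally {
    client.release()
    await pool.end()
  }
}

main().catch(console.error)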

Could you refine that example and send a PR with the documentation you would have liked to find when you hit this roadblock? Thanks!
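The `await` example can also be wrapped in a reusable function that always returns the client to the pool. This is a minimal sketch, not part of pg-copy-streams: `copyInto` is a hypothetical name, and `makeCopyStream` is a parameter (pg-copy-streams' `from` in real code) only so the function can be exercised with fakes. Passing the error to `client.release()` relies on pg-pool's behaviour of discarding, rather than reusing, a client released with a truthy argument; whether you want that after a failed COPY is a judgment call.

```javascript
const { pipeline } = require('node:stream/promises')

// Hypothetical helper. In real code `makeCopyStream` would be pg-copy-streams'
// `from`; it is injected so the function can be tested without a database.
async function copyInto(pool, makeCopyStream, sql, source) {
  const client = await pool.connect()
  let failure
  try {
    const ingestStream = client.query(makeCopyStream(sql))
    // Resolves once the pipeline has finished, rejects if any stream errors.
    await pipeline(source, ingestStream)
  } catch (err) {
    failure = err
    throw err
  } finally {
    // With pg-pool, a truthy argument makes release() discard the client
    // instead of returning it to the pool: a cautious choice after a failure.
    client.release(failure)
  }
}

// Possible usage against a real database (assumes pg and pg-copy-streams):
// const { Pool } = require('pg')
// const { from: copyFrom } = require('pg-copy-streams')
// copyInto(new Pool(), copyFrom, 'COPY my_table FROM STDIN',
//   require('node:fs').createReadStream('some_file.tsv'))
```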

k-funk commented 1 year ago

Excellent. I had misunderstood the difference between pool and client, so I learned something new today. Thank you!

I made a PR with the docs I would have liked to see: https://github.com/brianc/node-pg-copy-streams/pull/145

jeromew commented 1 year ago

The pg Submittable interface is a corner case used by a few modules such as pg-copy-streams and pg-cursor, but it is not well documented and has its quirks.

Thanks for the PR I will look into it.

jeromew commented 1 year ago

OK, I made some minor modifications to the examples and published version 6.0.5, which has better ESM/CJS named-export compatibility (so that the examples can be run as .mjs files).