brianc / node-postgres

PostgreSQL client for node.js.
https://node-postgres.com
MIT License

pg-query-stream error event not emitted in 3.x when postgres server killed. Works in 2.x #2187

Open markddrake opened 4 years ago

markddrake commented 4 years ago

Apologies in advance if this is an expected change of behavior or a total misunderstanding of how things are meant to work...

Given the following code

const {Client,Pool} = require('pg')
const QueryStream = require('pg-query-stream')
const Transform = require('stream').Transform;

class MyTransform extends Transform {

  constructor() {
    super({objectMode: true });  
    this.counter = 0;
  }

  async _transform (data,encoding,done) {
    this.counter++
    if ((this.counter % 10000) === 0 ){
      this.push('.')
    }
    done();
  }
}

async function main() {

 const pool = new Pool({
   user: 'postgres',
   host: 'yadamu-db2',
   database: 'yadamu',
   password: 'oracle',
   port: 5432
 })

 const connection = await pool.connect()
 connection.on('error',
      function(err, p) {
        // yadamuLogger.info([`${databaseVendor}`,`Connection.onError()`],err.message);
        // Do not throw errors here.. Node will terminate immediately
        // const pgErr = new PostgresError(err,self.postgresStack,self.postgressOperation)  
        // throw pgErr
      }
    )

 console.log('Connected')
 const queryStream = new QueryStream(`select jsonb_build_array("PROD_ID","CUST_ID","TIME_ID"::timestamptz,"CHANNEL_ID","PROMO_ID","QUANTITY_SOLD","AMOUNT_SOLD") "json" from "SH"."SALES"`)
 const resultStream = await connection.query(queryStream)
 const t = new MyTransform()

 console.log('Creating Promise')
 const copy = new Promise(function(resolve,reject) {
   resultStream.on('error',function(e) { console.log(`Error emitted after ${t.counter} rows.`); reject(e) })
   resultStream.on('end',function() { resolve('DONE')})
   try {
     resultStream.pipe(t).pipe(process.stdout);
   } catch(e) {
     reject(e);
   }
 })

 console.log('Waiting for Pipe')
 await copy
 console.log('Pipe Complete');

}
main().then(function(r) { console.log(r)}).catch(function(e) {console.log(e)});

If I run the code with Node 14.1 and

+-- pg@8.0.3
+-- pg-copy-streams@2.2.2
+-- pg-query-stream@2.1.2

and kill the backend postgres process while the query is running, I get the following output:

C:\Development\YADAMU-2020-04-15\app>node t1.js
Connected
Creating Promise
Waiting for Pipe
..............................................................Error emitted after 623500 rows.
Error: Connection terminated unexpectedly
    at Connection.<anonymous> (C:\Development\YADAMU-2020-04-15\app\node_modules\pg\lib\client.js:275:71)
    at Object.onceWrapper (events.js:421:28)
    at Connection.emit (events.js:315:20)
    at Socket.<anonymous> (C:\Development\YADAMU-2020-04-15\app\node_modules\pg\lib\connection.js:126:10)
    at Socket.emit (events.js:327:22)
    at endReadableNT (_stream_readable.js:1223:12)
    at processTicksAndRejections (internal/process/task_queues.js:84:21)

C:\Development\YADAMU-2020-04-15\app>

As can be seen, the error event is emitted and my error handler is invoked. However, if I upgrade to pg-query-stream 3.x and run the same code I get the following:

C:\Development\YADAMU-2020-04-15\app>npm install pg-query-stream@3.0.7
+ pg-query-stream@3.0.7
updated 1 package and audited 830 packages in 2.294s

8 packages are looking for funding
  run `npm fund` for details

found 3 low severity vulnerabilities
  run `npm audit fix` to fix them, or `npm audit` for details

C:\Development\YADAMU-2020-04-15\app>node t1.js
Connected
Creating Promise
Waiting for Pipe
..................................................
C:\Development\YADAMU-2020-04-15\app>

As can be seen, the 'error' event does not appear to be emitted when I kill the backend process.

Was the 2.x behavior incorrect? If so, how should I set up the onError handler in 3.x?

markddrake commented 4 years ago

It appears that with 2.x two connection errors are emitted: the first before the stream's error event, and the second after it. With 3.x only the second of the connection errors is emitted.
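
A quick way to see the ordering (a sketch, reusing connection and resultStream from the repro above) is to number both error events as they arrive:

let seq = 0
// Hypothetical instrumentation: record the order in which the client
// connection and the query stream observe the failure.
connection.on('error', (err) => console.log(`#${++seq} connection error: ${err.message}`))
resultStream.on('error', (err) => console.log(`#${++seq} stream error: ${err.message}`))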

brianc commented 4 years ago

Hey @markddrake sorry you're running into that issue. Thanks for the report w/ the code example. Do you happen to have a piece of code that can reproduce the entire thing w/o having to manually kill a backend process? It's not really possible to turn these things into unit tests (and subsequently fix them) if the only way to trigger the issue is a manual step in the middle.

markddrake commented 4 years ago

Brian

I don't, though I wish I did, since I have a similar issue testing YADAMU (except x8, since I support 8 different databases). I am more of an Oracle, and to a lesser extent MsSQL, guy.

I've been thinking along the lines of having a second pool and connection and trying to kill the process from that connection. Not sure if that approach would work with Postgres; in general, issuing a graceful shutdown from a second connection results in a cleaner disconnection than killing a process. I tried with a docker stop, but that seems to get translated into a graceful shutdown. The other thought is to have some kind of kill server running that you could send the kill-process request to.

markddrake commented 4 years ago

OK, try this...

const {Client,Pool} = require('pg')
const QueryStream = require('pg-query-stream')
const Transform = require('stream').Transform;

class MyTransform extends Transform {

  constructor() {
    super({objectMode: true });  
    this.counter = 0;
  }

  async _transform (data,encoding,done) {
    this.counter++
    if ((this.counter % 10000) === 0 ){
      this.push('.')
    }
    done();
  }
}

async function main() {

 const pool = new Pool({
   user: 'postgres',
   host: 'yadamu-db2',
   database: 'yadamu',
   password: 'oracle',
   port: 5432
 })

 const connection = await pool.connect()
 connection.on('error',
      function(err, p) {
        // yadamuLogger.info([`${databaseVendor}`,`Connection.onError()`],err.message);
        // Do not throw errors here.. Node will terminate immediately
        // const pgErr = new PostgresError(err,self.postgresStack,self.postgressOperation)  
        // throw pgErr
      }
    )

 const res = await connection.query(`select pg_backend_pid()`)
 const pid = res.rows[0].pg_backend_pid;
 console.log(pid)
 console.log('Connected')

 const conn2 = await pool.connect();
 // Terminate the first connection's backend from a second connection after 2 seconds.
 setTimeout(
    async function() {
     await conn2.query(`select pg_terminate_backend(${pid})`);
     await conn2.release()
    },2000);

 const queryStream = new QueryStream(`select jsonb_build_array("PROD_ID","CUST_ID","TIME_ID"::timestamptz,"CHANNEL_ID","PROMO_ID","QUANTITY_SOLD","AMOUNT_SOLD") "json" from "SH"."SALES"`)
 const resultStream = await connection.query(queryStream)
 const t = new MyTransform()

 console.log('Creating Promise')
 const copy = new Promise(function(resolve,reject) {
   resultStream.on('error',function(e) { console.log(`Error emitted after ${t.counter} rows.`); reject(e) })
   resultStream.on('end',function() { resolve('DONE')})
   try {
     resultStream.pipe(t).pipe(process.stdout);
   } catch(e) {
     reject(e);
   }
 })

 console.log('Waiting for Pipe')
 await copy
 console.log('Pipe Complete');
 await connection.release()
 await pool.end()

}
main().then(function(r) { console.log(r)}).catch(function(e) {console.log(e)});

markddrake commented 4 years ago

Running with 3.0.7

C:\Development\YADAMU>node scratch\postgres\killBackend.js
11666
Connected
Creating Promise
Waiting for Pipe
..............
C:\Development\YADAMU>

Running with 2.1.2

C:\Development\YADAMU>node scratch\postgres\killBackend.js
11674
Connected
Creating Promise
Waiting for Pipe
.............Error emitted after 133100 rows.
error: terminating connection due to administrator command
    at Connection.parseE (C:\Development\YADAMU\node_modules\pg\lib\connection.js:581:48)
    at Connection.parseMessage (C:\Development\YADAMU\node_modules\pg\lib\connection.js:380:19)
    at Socket.<anonymous> (C:\Development\YADAMU\node_modules\pg\lib\connection.js:116:22)
    at Socket.emit (events.js:315:20)
    at addChunk (_stream_readable.js:302:12)
    at readableAddChunk (_stream_readable.js:278:9)
    at Socket.Readable.push (_stream_readable.js:217:10)
    at TCP.onStreamRead (internal/stream_base_commons.js:186:23) {
  length: 116,
  severity: 'FATAL',
  code: '57P01',
  detail: undefined,
  hint: undefined,
  position: undefined,
  internalPosition: undefined,
  internalQuery: undefined,
  where: undefined,
  schema: undefined,
  table: undefined,
  column: undefined,
  dataType: undefined,
  constraint: undefined,
  file: 'postgres.c',
  line: '3011',
  routine: 'ProcessInterrupts'
}

So I guess I'm now spending the rest of my day automating kill tests.. :)

Envek commented 3 years ago

Faced a very similar issue today while testing "what if something breaks while streaming results" (terminating the postgres backend is the simplest way to emulate such a failure).

With code like this (somewhere in Express.js route handlers):

const { Client } = require('pg');
const QueryStream = require('pg-query-stream');

// Using demo database from https://postgrespro.com/education/demodb
const client = new Client({ connectionString: 'postgresql:///demo' });
await client.connect();
const qs = new QueryStream('SELECT * FROM bookings;');
const stream = client.query(qs);
stream.on('data', console.log);

And terminating postgres backend somewhere in-between:

SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE query = 'SELECT * FROM bookings;';

My node process terminates with the following error:

events.js:292
      throw er; // Unhandled 'error' event
      ^

Error: Connection terminated unexpectedly
    at Connection.<anonymous> (/home/envek/…/node_modules/pg/lib/client.js:272:71)

Workaround: manually forward error to stream

client.once('error', (...args) => stream.emit('error', ...args));

Now it doesn't crash and I can subscribe to the error event on the stream.

Not sure whether this is expected behavior or not.
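
For completeness, a slightly fuller sketch of the same workaround (same client and stream names as above); the extra lines detach the forwarder once the stream is done, so a long-lived client doesn't accumulate stale listeners:

const forwardError = (err) => stream.emit('error', err)
client.once('error', forwardError)
// Clean up so a reused client doesn't keep forwarding to a finished stream.
stream.on('end', () => client.removeListener('error', forwardError))
stream.on('error', () => client.removeListener('error', forwardError))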

Versions: node 12.18.3, pg 8.3.0, pg-query-stream 3.2.0

brianc commented 3 years ago

Hey nice - are you able to build a test case for this in the code? I think the stream should throw an error if the client has an error...this is a bug.
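
A rough sketch of what such a test might look like, borrowing the pg_terminate_backend trick from the comments above (pool settings, timings, and the bare-bones assertions are placeholders, not the project's actual test harness):

// Rough test sketch; connection settings come from the environment.
const { Pool } = require('pg')
const QueryStream = require('pg-query-stream')

async function streamShouldErrorWhenBackendDies() {
  const pool = new Pool()
  const client = await pool.connect()
  client.on('error', () => {})  // the client itself will also error; swallow that here
  const { rows } = await client.query('select pg_backend_pid()')
  const pid = rows[0].pg_backend_pid
  const stream = client.query(new QueryStream('select * from generate_series(0, 1000000)'))
  // Kill the streaming backend from a second connection shortly after the query starts.
  const killer = await pool.connect()
  setTimeout(() => {
    killer.query(`select pg_terminate_backend(${pid})`).then(() => killer.release())
  }, 100)
  await new Promise((resolve, reject) => {
    stream.on('error', resolve)  // expected outcome: the stream emits 'error'
    stream.on('end', () => reject(new Error('stream ended without emitting an error')))
    stream.resume()              // a real test would also add a timeout here
  })
  client.release(true)  // destroy the dead client rather than returning it to the pool
  await pool.end()
}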

brianc commented 3 years ago

actually I think I can fix this right now...gimme a min

brianc commented 3 years ago

okay, I ran out of time for the day 😦 It's tricky because the client is dead, so you want to handle the death of the client on the client, but the query stream should reject as well.
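
Roughly the shape a fix might take (hypothetical, not the actual pg internals): when the client dies, destroy whatever query stream is still active with the same error.

// Hypothetical sketch; activeQueryStream is a made-up name for whatever
// query object the client is currently executing.
client.on('error', (err) => {
  if (activeQueryStream && !activeQueryStream.destroyed) {
    activeQueryStream.destroy(err)  // destroy(err) makes a Readable emit 'error'
  }
})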

Envek commented 3 years ago

client is dead, so you want to handle the death of the client on the client

🤯

markddrake commented 3 years ago

Uh, yes, couldn't agree more. However, the point is that the code I have that "handles the death of the client on the client" works with 2.1.2 but does not work with 3.1.7, because the stream does not get the error...

lastmjs commented 3 years ago

I'm having trouble catching errors in streams as well

markddrake commented 3 years ago

Just retried with 4.x and the issue still exists..

markddrake commented 3 years ago

I just had to upgrade to avoid other issues that had started to surface with @2.1.2. I am now using the following code to work around the fact that pg-query-stream@4.0.0 does not raise an error when the connection is lost.

// Forward connection errors to the input stream; without this the stream never sees them.
const handleConnectionError = (err) => { inputStream.destroy(err); inputStream.emit('error', err) }
this.connection.on('error', handleConnectionError)
// Detach the handler once the stream finishes or fails, so it does not leak.
inputStream
  .on('end', () => { this.connection.removeListener('error', handleConnectionError) })
  .on('error', () => { this.connection.removeListener('error', handleConnectionError) })

Two issues: first, I still don't think I should need to do this.

Second, I don't think I should have to force the input stream to emit the error; calling destroy with the error should cause 'error' to be emitted on its own.
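
For reference, a plain Node.js Readable already behaves that way: destroy(err) alone surfaces the error, with no manual emit needed.

const { Readable } = require('stream')

// Minimal demonstration that destroy(err) emits 'error' on its own.
const r = new Readable({ read() {} })
r.on('error', (err) => console.log(`error event: ${err.message}`))
r.destroy(new Error('boom'))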