brianc / node-postgres

PostgreSQL client for node.js.
https://node-postgres.com
MIT License
12.23k stars 1.22k forks source link

async_hooks issues #2533

Open Badestrand opened 3 years ago

Badestrand commented 3 years ago

Hi!

Thanks for this great library!

Unfortunately I have the same problem as mentioned here: #2404

I.e. the query-finish callback is called in a different execution context. Or something. I find the async hooks confusing but I need them to track which sql queries were initiated by which request in my api.

I have something reproducible!

const pg = require('pg')
const {AsyncLocalStorage} = require('async_hooks')

const asyncLocalStorage = new AsyncLocalStorage()

/**
 * Looks up the store of the AsyncLocalStorage context that is active right now.
 * @returns {*} The value handed to `asyncLocalStorage.run()` for the current context.
 */
function getExecId() {
    const activeStore = asyncLocalStorage.getStore()
    return activeStore
}

/**
 * Pauses the calling async flow for roughly `ms` milliseconds.
 * @param {number} ms - Delay forwarded to `setTimeout`.
 * @returns {Promise<void>} Fulfils with `undefined` once the timer fires; never rejects.
 */
async function sleep(ms) {
    await new Promise((done) => {
        // `setTimeout` invokes `done` without arguments, so the promise fulfils with undefined.
        setTimeout(done, ms)
    })
}

const pool = new pg.Pool({
    host: 'localhost',
    port: 5432,
    user: '...',
    password: '...',
    database: '...',
})

// Repro driver: runs one "request" inside its own AsyncLocalStorage context whose
// store is `id`, and logs the store before the query, inside the query callback,
// and right after the query has been dispatched.
function doThings(id) {
    asyncLocalStorage.run(id, async () => {
        // everything in here should always see the same value from `getExecId()`
        console.log('query starts in', getExecId())
        // NOTE(review): only one callback parameter is declared. The pool.query
        // implementation quoted further down this thread calls back as (err, res),
        // so `r` presumably receives the error slot, not the result — confirm.
        pool.query('SELECT 1', r => {
            console.log('query finished in', getExecId())
        })
        // Runs synchronously after dispatch, i.e. before the query callback fires
        // (see the "finish" line preceding "query finished" in the output below).
        console.log('finish', getExecId())
    })
}

;(async () => {
    doThings(1)
    await sleep(1000)
    console.log('')
    doThings(2)
})()

Output is

query starts in 1
finish 1
query finished in 1

query starts in 2
finish 2
query finished in 1     <--- should be 2 as well

This shows that the callback for pool.query is in the wrong context.

I think this can be solved in the library by having some class inherit AsyncResource and then calling runInAsyncScope for that callback, see here for an example.

Badestrand commented 3 years ago

Ok not sure if this is 100% proper but seems to work:

const {AsyncLocalStorage, AsyncResource} = require('async_hooks')

...

/**
 * One-shot async resource that captures the async context active when it is
 * constructed and replays a query callback inside that context.
 *
 * Create a fresh instance per query: `emitDestroy()` is called once the
 * callback has run, so an instance must not be reused.
 */
class DBQuery extends AsyncResource {
    constructor() {
        super('DBQuery')
    }

    /**
     * Runs `db.query(sql, values, cb)` and forwards the outcome to `callback`
     * inside the async context captured by this resource.
     *
     * @param {{query: Function}} db - Object exposing a callback-style `query(sql, values, cb)`.
     * @param {string} sql - Query text, passed through unchanged.
     * @param {Array} values - Query parameters, passed through unchanged.
     * @param {Function} callback - Invoked as `callback(err, r)` with `this` set to `null`.
     */
    query(db, sql, values, callback) {
        db.query(sql, values, (err, r) => {
            try {
                this.runInAsyncScope(callback, null, err, r)
            } finally {
                // Signal destruction even when `callback` throws; otherwise the
                // destroy hooks would never fire for this resource.
                this.emitDestroy()
            }
        })
    }
}

// Same repro driver, but the query is issued through a DBQuery instance created
// inside this run() call, so the callback is replayed via runInAsyncScope.
function doThings(id) {
    asyncLocalStorage.run(id, async () => {
        console.log('query starts in', getExecId())
        // A new DBQuery per query: the instance calls emitDestroy() after its callback.
        new DBQuery().query(pool, 'SELECT 1', [], (err, r) => {
            console.log('query finished in', getExecId())
        })
        // Runs synchronously after dispatch, before the query callback fires.
        console.log('finish', getExecId())
    })
}

It would be great if this could be integrated into node-postgres so that it "just works" as a user of the library!

Badestrand commented 3 years ago

Okay, I have it down to this now:

// helper things
const {AsyncLocalStorage, AsyncResource} = require('async_hooks')
const asyncLocalStorage = new AsyncLocalStorage()

/**
 * One-shot async resource that captures the async context active when it is
 * constructed and replays a node-style callback inside that context.
 *
 * Create a fresh instance per call: `emitDestroy()` is invoked once the
 * callback has run, so an instance must not be reused.
 */
class AsRes extends AsyncResource {
    constructor() {
        super('AsRes')
    }

    /**
     * Calls `fn(...args, cb)` and forwards whatever `cb` receives to `callback`
     * inside the async context captured by this resource.
     *
     * @param {Function} fn - Callback-last function to invoke.
     * @param {Array} args - Arguments placed before the callback.
     * @param {Function} callback - Receives the same arguments `fn` passes to its callback.
     */
    exec(fn, args, callback) {
        fn(...args, (...result) => {
            try {
                this.runInAsyncScope(callback, null, ...result)
            } finally {
                // Signal destruction even when `callback` throws; otherwise the
                // destroy hooks would never fire for this resource.
                this.emitDestroy()
            }
        })
    }
}

/**
 * Invokes a callback-last function so that its callback runs in the async
 * context of the caller of this helper.
 *
 * @param {Function} fn - Callback-last function to invoke.
 * @param {...*} args - Arguments for `fn`; the final one is taken as the callback.
 */
function execInThisThreadStorage(fn, ...args) {
    const lastIndex = args.length - 1
    const callback = args[lastIndex]
    const leadingArgs = args.slice(0, -1)
    const resource = new AsRes()
    resource.exec(fn, leadingArgs, callback)
}

// the database things
const conn = new pg.Pool(...)

function query(...args) {
    execInThisThreadStorage(conn.query.bind(conn), ...args)
}

query('SELECT 1', [], (err, r) => {
    // now we are in the correct thread storage here
})
spencerwilson commented 3 years ago

I can't help debug, but in case it's helpful to the maintainer: Could you give which version of Node.js and Postgres you're using when reproducing?

Badestrand commented 3 years ago

Node v14.15 and pg 8.6.0

alxndrsn commented 3 years ago

@Badestrand I'm seeing this behaviour as well, but it only seems to happen when query() is used with callbacks - I've not been able to recreate using await query(...)...

I think this is demonstrable by modifying your original code like so:

        // everything in here should always see the same value from `getExecId()`
        console.log('query starts in', getExecId())
        const execId = await new Promise((resolve, reject) => {
            pool.query('SELECT 1', (err, res) => {
                resolve(getExecId())
            })
        });
        console.log('query finished in', execId)
        console.log('finish', getExecId())

this gives output:

query starts in 1
query finished in 1
finish 1

query starts in 2
query finished in 1   <--- it's unexpected, but we no longer care about this
finish 2

I do not understand how the context is lost inside the callback.

However, it does mean:

  1. if there's a way to avoid callbacks in your code then you can work around this bug/limitation, and
  2. that a fix is possible if node-pg is happy to rely on async/await. This may be more palatable than depending on AsyncResource, which only became stable in node 16.4 (https://nodejs.org/api/async_context.html#async_context_class_asyncresource), whereas (I think) async/await has been available since node 8.

Example async/await fix for query() in pg-pool/index.js:

original

  /**
   * Checks out a client via `this.connect`, runs a single query on it, releases
   * the client, and reports the outcome through the callback obtained from the
   * `promisify` helper.
   *
   * @param text - Forwarded to `client.query`; passing a function here is rejected.
   * @param values - Forwarded to `client.query`, or treated as the callback when it is a function.
   * @param cb - Optional completion callback, invoked as `cb(err)` or `cb(undefined, res)`.
   * @returns `response.result` from the `promisify` helper (presumably a promise
   *   when no callback was supplied — helper not shown here, confirm).
   */
  query(text, values, cb) {
    // guard clause against passing a function as the first parameter
    if (typeof text === 'function') {
      const response = promisify(this.Promise, text)
      // The error is delivered from a setImmediate callback, not synchronously.
      setImmediate(function () {
        return response.callback(new Error('Passing a function as the first parameter to pool.query is not supported'))
      })
      return response.result
    }

    // allow plain text query without values
    if (typeof values === 'function') {
      cb = values
      values = undefined
    }
    const response = promisify(this.Promise, cb)
    // From here on `cb` is the helper-provided callback, whether or not the caller passed one.
    cb = response.callback

    this.connect((err, client) => {
      if (err) {
        return cb(err)
      }

      // Shared between the 'error' listener and the query callback so the client
      // is released, and `cb` invoked, by whichever of the two runs first only.
      let clientReleased = false
      const onError = (err) => {
        if (clientReleased) {
          return
        }
        clientReleased = true
        client.release(err)
        cb(err)
      }

      client.once('error', onError)
      this.log('dispatching query')
      client.query(text, values, (err, res) => {
        this.log('query dispatched')
        // The query has called back, so the one-shot error listener is no longer needed.
        client.removeListener('error', onError)
        if (clientReleased) {
          return
        }
        clientReleased = true
        // `err` is handed to release() as well as to the callback below.
        client.release(err)
        if (err) {
          return cb(err)
        } else {
          return cb(undefined, res)
        }
      })
    })
    return response.result
  }

fixed

  async query(text, values, cb) {
    // guard clause against passing a function as the first parameter
    if (typeof text === 'function') {
      const response = promisify(this.Promise, text)
      setImmediate(function () {
        return response.callback(new Error('Passing a function as the first parameter to pool.query is not supported'))
      })
      return response.result
    }

    // allow plain text query without values
    if (typeof values === 'function') {
      cb = values
      values = undefined
    }
    const response = promisify(this.Promise, cb)
    cb = response.callback

    try {
      const result = await new Promise((resolve, reject) => {
        this.connect((err, client) => {
          if (err) {
            return reject(err)
          }

          let clientReleased = false
          const onError = (err) => {
            if (clientReleased) {
              return
            }
            clientReleased = true
            client.release(err)
            reject(err)
          }

          client.once('error', onError)
          this.log('dispatching query')
          client.query(text, values, (err, res) => {
            this.log('query dispatched')
            client.removeListener('error', onError)
            if (clientReleased) {
              return
            }
            clientReleased = true
            client.release(err)
            if (err) {
              return reject(err)
            } else {
              return resolve(res)
            }
          })
        })
      })
      cb(undefined, result)
    } catch(err) {
      cb(err)
    }
    return response.result
  }

I have no doubt there's a more elegant fix than this :slightly_smiling_face:

alxndrsn commented 3 years ago

@Badestrand here's a fix which should make sure all your callbacks execute in the correct context.

For me, this makes my integration tests marginally slower.

// Make sure that DB query callbacks are executed in the correct async context
// see: https://github.com/brianc/node-postgres/issues/2533
const { connect } = pool;

wrapQueryFn(pool);

// Replacement for pool.connect that awaits the original connect() so that a
// supplied callback runs in the caller's async context, and that wraps the
// checked-out client's query() the same way.
// Without a callback it simply returns the original connect() promise.
pool.connect = async cb => {
  if(!cb) {
    return connect.call(pool);
  }

  let client;
  try {
    client = await connect.call(pool);
    wrapQueryFn(client);
  } catch(err) {
    return cb(err);
  }

  // Invoke the success callback outside the try block: an exception thrown by
  // the callback itself must not be reported back to it as a connect error
  // (which would call it a second time).
  return cb(undefined, client, client.release);
};

/**
 * Replaces `db.query` with an async wrapper so that a supplied callback runs
 * after an `await`, i.e. in the async context of the code that called
 * `db.query`. Wrapping is idempotent: an already-wrapped `query` is left alone.
 *
 * The wrapper accepts `(text, values, cb)` or `(text, cb)`. With a callback it
 * awaits the promise form of the original `query` and calls the callback
 * exactly once, as `cb(err)` or `cb(undefined, result)`. Without one it returns
 * the original promise.
 *
 * @param {{query: Function}} db - Pool or client whose `query` method is patched in place.
 */
function wrapQueryFn(db) {
  const { query } = db;

  if(query._async_context_protection) {
    return;
  }

  db.query = async (text, values, cb) => {
    if (typeof values === 'function') {
      cb = values;
      values = undefined;
    }

    if(!cb) {
      return query.call(db, text, values);
    }

    let result;
    try {
      result = await query.call(db, text, values);
    } catch(err) {
      return cb(err);
    }

    // Call the success callback outside the try block: an exception thrown by
    // the callback itself must not be handed back to it as a query error
    // (which would invoke it a second time).
    return cb(undefined, result);
  };

  // Marker checked above to avoid stacking wrappers on repeated calls.
  db.query._async_context_protection = true;
}
knilink commented 2 years ago

one workaround is

import { promisify, callbackify } from 'node:util';
callbackify(promisify(pool.query)).call(pool, ...);

as mentioned in the docs' "Context loss" section:

If your code is callback-based, it is enough to promisify it with util.promisify() so it starts working with native promises.

so promisify seems to be able to recover the context.

Here is a pure JS example to demonstrate the issue.

import { promisify, callbackify } from 'node:util';
import { AsyncLocalStorage } from 'node:async_hooks';

const asyncLocalStorage = new AsyncLocalStorage();

/**
 * Minimal callback queue used to demonstrate async-context loss: callbacks are
 * drained one per timer tick, and each follow-up timer is created from inside
 * the previous timer callback, not from the `exec()` call that queued the work.
 */
class MyQueue {
  _queue = [];
  // True while a drain timer is pending, so at most one timer exists at a time.
  _timerPending = false;

  /** Schedules a timer that runs the next queued callback; no-op if one is already pending. */
  next() {
    if (this._timerPending) {
      return;
    }
    this._timerPending = true;
    setTimeout(() => {
      this._timerPending = false;
      const cb = this._queue.shift();
      try {
        if (cb) {
          cb();
        }
      } finally {
        // Keep draining even if `cb` threw, so later entries are not stranded.
        if (this._queue.length) {
          this.next();
        }
      }
    });
  }

  /**
   * Queues `cb` and makes sure a drain timer is scheduled.
   * Safe to call from inside a queued callback.
   * @param {Function} cb - Called without arguments from a timer callback.
   */
  exec(cb) {
    this._queue.push(cb);
    this.next();
  }

  // Round-trips exec through a promise; the supplied callback is then invoked
  // by callbackify once that promise settles rather than directly by the queue.
  patchedExec = callbackify(promisify(this.exec));
}

const myQueue = new MyQueue();

/**
 * Queues five callbacks through `exec` and then five through `patchedExec`,
 * each from inside its own AsyncLocalStorage context (store = loop index), and
 * logs which store every callback actually observes when it runs.
 */
function main() {
  for (const method of ['exec', 'patchedExec']) {
    for (let i = 0; i < 5; i++) {
      asyncLocalStorage.run(i, () => {
        myQueue[method](() => {
          console.log([method, i, asyncLocalStorage.getStore()]);
        });
      });
    }
  }
}

main();

For pg, the callbacks were bound to the async context (call chain) that triggered client.connect().