brianc / node-postgres

PostgreSQL client for node.js.
https://node-postgres.com
MIT License
12.21k stars 1.22k forks

Using async/await with pg-cursor? #1839

Open zachsa opened 5 years ago

zachsa commented 5 years ago

Hi - thanks for a great library! I have a question regarding the pg-cursor API.

In the example on using a cursor from start to finish, this is shown in the docs HERE:

const assert = require('assert')
const { Pool } = require('pg')
const Cursor = require('pg-cursor')

const pool = new Pool()
const client = await pool.connect()
const cursor = client.query(new Cursor('select * from generate_series(0, 5)'))
cursor.read(100, (err, rows) => {
  if (err) {
    throw err
  }
  assert(rows.length === 6)
  cursor.read(100, (err, rows) => {
    // the second read returns no rows: the cursor is exhausted
    assert(rows.length === 0)
  })
})

It seems like I would need to wrap this in a promise if I wanted to be able to cycle through rows returned via the cursor?

Is it possible to use async/await here? I.e. something along the lines of the following:

const { Pool } = require('pg')
const Cursor = require('pg-cursor')

const pool = new Pool()
const client = await pool.connect()
const cursor = client.query(new Cursor('select * from generate_series(0, 5)'))
let rows = await cursor.read(100)
while (rows.length) {
  // do something with rows
  rows = await cursor.read(100)
}

Apologies if I've missed something in the documentation!

charmander commented 5 years ago

You should be able to promisify pg-cursor manually:

const { promisify } = require('util')
const Cursor = require('pg-cursor')

Cursor.prototype.readAsync = promisify(Cursor.prototype.read)

⋮

let rows = await cursor.readAsync(100)
while (rows.length) {
  // do something with rows
  rows = await cursor.readAsync(100)
}

But that seems like it would make a good built-in pg-cursor feature.

zachsa commented 5 years ago

Hi @charmander. I agree that this would be nice to include in the pg library. I'd quite like to contribute to this - would that be okay?

I've used the Mongo Node.js client before and if I remember correctly they support either callback or promise style dynamically. I was pointed to code HERE that achieves this (the executeOperation function in case the line numbers change). Would this be a suitable approach for this library?

Do you have any pointers/preferences for a good way to incorporate promise support into the cursor.read function? If not, and if this is something that you wouldn't mind me working on, I could look through the code and come up with suggestions.

charmander commented 5 years ago

@zachsa See https://github.com/brianc/node-pg-cursor/blob/1cdad4d8d20000dc6fb9a783394249b727106b84/index.js#L170-L185 and https://github.com/brianc/node-postgres/blob/2c7be86104e6f4e3ad5f2992b80a54e11a8edff3/lib/client.js#L265-L280.

zachsa commented 5 years ago

Thank you. That actually looks more straightforward than I was expecting.

zachsa commented 5 years ago

Hi @charmander - I had a quick look at this over the weekend and will hopefully have a bit of time this month to look more into this (I would really like to contribute to this if I am able).

Previously I have used this library without paying too much attention to how it works. Looking at pg-cursor's package.json file I see that it is dependent on "pg": "6.x". However pg is at 7.x.

In a Node app, my package.json looks like this:

"pg": "^7.4.3",
"pg-cursor": "^2.0.0",

Does this mean that at app start I'm loading two separate copies of the pg module (of different versions)? I.e. I'm requiring pg 7.4.3, and pg-cursor is requiring its own copy of pg as it is loaded?

If so, why?

I am using it like this:

...
const client = await pool.connect()
const cursor = client.query(new Cursor(sql))

So to me it seems that the cursor wouldn't need to include a dependency on the pg library itself?

charmander commented 5 years ago

@zachsa pg-cursor has "pg": "6.x" in its devDependencies, which it uses for testing. It should be tested on the latest pg, but it doesn’t cause pg 6.x to be installed for you.
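To illustrate the distinction (a sketch, with the version numbers taken from this thread rather than the package's exact manifest): npm installs a package's `devDependencies` only when you work on that package itself, e.g. to run its tests, never when you install it as a dependency of your app.

```json
{
  "name": "pg-cursor",
  "devDependencies": {
    "pg": "6.x",
    "mocha": "^3.5.0"
  }
}
```

So an app depending on pg `^7.4.3` and pg-cursor ends up with a single copy of pg, the one the app declared itself.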

zachsa commented 5 years ago

Ah thanks. I should have picked that up

zachsa commented 5 years ago

Hi @charmander, would it be welcome if I were to update the package.json dependency versions? For example, using the npm-check-updates package, these packages are out of date:

 eslint                   ^4.4.0  →   ^6.3.0
 eslint-config-standard  ^10.2.1  →  ^14.1.0
 eslint-plugin-import     ^2.7.0  →  ^2.18.2
 eslint-plugin-node       ^5.1.1  →  ^10.0.0
 eslint-plugin-promise    ^3.5.0  →   ^4.2.1
 eslint-plugin-standard   ^3.0.1  →   ^4.0.1
 mocha                    ^3.5.0  →   ^6.2.0
 pg                          6.x  →      7.x

Updating mocha didn't break anything. Updating eslint requires some simple renaming of functions (for example, assert.equal => assert.strictEqual). But one test is now failing (https://github.com/brianc/node-pg-cursor/blob/master/test/query-config.js#L20-L34):

  it('passes types to result', (done) => {
    const client = new pg.Client()
    client.connect()
    const text = 'SELECT generate_series as num FROM generate_series(0, 2)'
    const types = {
      getTypeParser: () => () => 'foo'
    }
    const cursor = client.query(new Cursor(text, null, { types }))
    cursor.read(10, (err, rows) => {
      assert(!err)
      assert.deepStrictEqual(rows, [{ num: 'foo' }, { num: 'foo' }, { num: 'foo' }]) // changed from assert.deepEqual
      client.end()
      done()
    })
  })

The test output is this:

  1) query config passed to result
       passes types to result:

      Uncaught AssertionError [ERR_ASSERTION]: Expected values to be strictly deep-equal:
+ actual - expected

  [
+   anonymous {
-   {
      num: 'foo'
    },
+   anonymous {
-   {
      num: 'foo'
    },
+   anonymous {
-   {
      num: 'foo'
    }
  ]

I'm not sure what the expected behaviour should be here. I can see that the test suite isn't expecting the constructor name (`anonymous`) on each object in the rows.

zachsa commented 5 years ago

What is actually happening here?

const types = {
  getTypeParser: () => () => 'foo'
}
const cursor = client.query(new Cursor(text, null, { types }))

I see from the documentation that a cursor is an instance of Submittable, but I'm not sure what that means in the context of passing the types object to the cursor constructor.
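As a rough sketch of what's happening (the `oid` and `format` arguments here are assumptions based on pg's documented custom-types contract, not taken from this thread): pg asks the `types` object for a parser per result column, and the test's stub returns a parser that maps every raw value to `'foo'`.

```javascript
// Simplified sketch of pg's type-parsing contract: for each column in a
// result, pg calls types.getTypeParser(oid, format) and runs the returned
// function on the raw value that came over the wire.
const types = {
  // The test's stub: no matter the column type, parse every value to 'foo'.
  getTypeParser: () => () => 'foo',
};

// Simulate what happens to one int4 column whose raw text value is '2':
const parseColumn = types.getTypeParser(23 /* int4 oid */, 'text');
console.log(parseColumn('2')); // 'foo'
```

That is why every `num` in the expected rows is the string `'foo'` rather than a number.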

zachsa commented 5 years ago

Also - is this the same issue (as in, promisifying the cursor) as discussed here? https://github.com/brianc/node-pg-cursor/issues/36

zachsa commented 5 years ago

Another question: The Cursor.prototype.read function - https://github.com/brianc/node-pg-cursor/blob/master/index.js#L170.

I assume this function is what is called in this scenario?

const client = await pool.connect()
const clientQueryResult = client.query(new Cursor(sql))
clientQueryResult.read(...)

felixfbecker commented 5 years ago

It would be amazing if Cursor could implement Symbol.asyncIterator so that it can be iterated with for await (const row of cursor) {}. This should be backwards compatible, because old versions would never call it.
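A hedged sketch of what such a built-in could look like (this is not pg-cursor's code; it only assumes the `read(n, callback)` API shown earlier in the thread, and `FakeCursor` is a stand-in so the sketch runs without a database):

```javascript
// Drive the callback-style read(n, cb) API from an async generator and
// expose it as Symbol.asyncIterator on a cursor prototype.
function addAsyncIteration(cursorProto, batchSize = 100) {
  cursorProto[Symbol.asyncIterator] = async function* () {
    for (;;) {
      const rows = await new Promise((resolve, reject) =>
        this.read(batchSize, (err, r) => (err ? reject(err) : resolve(r)))
      );
      if (rows.length === 0) return; // an empty read means the cursor is exhausted
      yield* rows;
    }
  };
}

// Stand-in cursor that hands out rows in batches, like pg-cursor does.
class FakeCursor {
  constructor(rows) { this.rows = rows; }
  read(n, cb) { cb(null, this.rows.splice(0, n)); }
}

addAsyncIteration(FakeCursor.prototype, 2);

(async () => {
  const out = [];
  for await (const row of new FakeCursor([1, 2, 3, 4, 5])) out.push(row);
  console.log(out); // [ 1, 2, 3, 4, 5 ]
})();
```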

tienzochi commented 3 years ago

@charmander Great solution! I tried it, but it doesn't seem to work when I make another query on the same client inside the loop. For example, I have 10 users in user_table and I tried reading two at a time to check that the cursor fetches the next batch; it stops working once I add another client call inside the loop. Do you know the explanation for it? Here's a sample of what I am trying to achieve:

const { Pool } = require("pg");
const Cursor = require("pg-cursor");
const { promisify } = require("util");

Cursor.prototype.readAsync = promisify(Cursor.prototype.read);

(async function () {
  const pool = new Pool({
    user: POSTGRES_USER,
    host: POSTGRES_HOST,
    port: POSTGRES_PORT,
    database: POSTGRES_DATABASE,
    password: POSTGRES_PASSWORD,
  });
  const client = await pool.connect();
  const cursor = client.query(
    new Cursor("select user_id, username from user_table")
  );

  let rows = await cursor.readAsync(2);

  while (rows.length) {
    for (let row of rows) {
      const rowrow = await client.query(
        `SELECT * FROM sales 
            WHERE user_id = '${row.user_id}'
            LIMIT 1`
      );

      // code below is unreachable
      console.log(rowrow);
    }

    rows = await cursor.readAsync(2);
  }

  cursor.close(() => {
    client.end();
  });
})();

charmander commented 3 years ago

@tienzochi You can’t make new queries on a client while one is already in progress (the one with the cursor). pool.query instead should work. (That example code should be written without queries in a loop at all, though – something like this, for example:

SELECT DISTINCT ON (user_id) user_id, username, sales.[…]
  FROM user_table LEFT JOIN sales USING (user_id)
  -- ORDER BY probably; if not, maybe EXISTS or equivalent join would be more appropriate
brianc commented 3 years ago

@felixfbecker

It would be amazing if Cursor could implement Symbol.asyncIterator so that it can be iterated with for await (const row of cursor) {}.

For this you should probably use pg-query-stream, which already supports it (pg-cursor is what pg-query-stream uses internally). I'm definitely cool with promisifying the cursor, but the cursor is a bit lower level than pg-query-stream: it is useful primarily when you want to read exactly a certain number of rows at a time, in chunks, to carefully control how you consume them. pg-query-stream is probably more generally useful for "pipe this query to this other thing", but due to high water marks, back pressure, and internal buffering in node streams you don't have as fine-grained control over the exact number of rows read. In practice this is usually fine, and kinda preferred, as you usually don't need to think about things at that level...so I would recommend pg-query-stream for most use cases, including async iteration.

disfated commented 3 years ago

I'm using this guy to sugarize cursors

const pg = require('pg');
const Cursor = require('pg-cursor');

const client = new pg.Client(/* ... */);

client.cursor = async function* cursor(query, values, chunkSize = 100) {
    const cursor = this.query(new Cursor(query, values));
    try {
        while (true) {
            const rows = await new Promise((resolve, reject) => {
                cursor.read(chunkSize, (error, rows) => error ? reject(error) : resolve(rows));
            });
            if (rows.length === 0) break;
            for (const row of rows) {
                yield row;
            }
        }
    } finally {
        cursor.close();
    }
};

// usage

try {
    const rows = client.cursor(`SELECT ...`, [], 20);
    for await (const row of rows) {
        use(row);
    }
} catch (error) {
    handle(error); // should perfectly catch all errors here
}
sw360cab commented 3 years ago

@brianc It's not clear to me whether using an async iterator with pg-query-stream on a huge table will give me basically the same result (memory savings) as promisifying pg-cursor?

mariusa commented 2 years ago

I'm definitely cool with promisifying the cursor

@brianc could this be included, please? It would enable the example given by @charmander , out of the box, and close this issue.

Cursor.prototype.readAsync = promisify(Cursor.prototype.read) // should be built-in

let rows = await cursor.readAsync(100)
while (rows.length) {
  // do something with rows
  rows = await cursor.readAsync(100)
}

eMuonTau commented 2 years ago

cursor.read() already returns a Promise if no callback is supplied. Am I missing something? https://github.com/brianc/node-postgres/blob/master/packages/pg-cursor/index.js#L231

pg: 8.7.3 pg-cursor: 2.7.3
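The pattern eMuonTau points at, sketched as a toy re-implementation (hypothetical; the real code lives in pg-cursor's index.js): when no callback is supplied, the same function wraps its own callback path in a Promise and returns it.

```javascript
// Toy dual callback/promise API in the style of Cursor.prototype.read.
function read(count, cb) {
  if (!cb) {
    // No callback: return a Promise that resolves via the callback path.
    return new Promise((resolve, reject) =>
      read(count, (err, rows) => (err ? reject(err) : resolve(rows)))
    );
  }
  // Stand-in for the real row fetch: produce `count` fake rows.
  setImmediate(() =>
    cb(null, Array.from({ length: count }, (_, i) => ({ num: i })))
  );
}

// Both call styles work against the same function:
read(2, (err, rows) => console.log('callback:', rows));
read(2).then((rows) => console.log('promise:', rows));
```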

mariusa commented 2 years ago

Looks so. I'll be able to test in 2 weeks. Thanks

mariusa commented 2 years ago

cursor.read() already returns Promise if no callback supplied.

This is correct, works fine. This issue can be closed.