timgit / pg-boss

Queueing jobs in Postgres from Node.js like a boss
MIT License

Using `executeSql` not working in 4.0 #154

Closed green3g closed 4 years ago

green3g commented 4 years ago

Hi - I'm running into an issue with upgrading to 4.0 and using the executeSql parameter in PgBoss.

First, the reason I'm doing this is to avoid creating multiple DB connections by reusing an existing knex connection:

const dbAdaptor = {
    executeSql: (sql, parameters = []) => {
        try {
            // This is needed to replace pg-boss' $1, $2 arguments
            // into knex's :val, :val2 style.
            const replacedSql = sql.replace(/\$(\d+)\b/g, (_, number) => `:param_${number}`);

            const parametersObject = {};
            parameters.forEach(
                (value, index) => (parametersObject[`param_${index + 1}`] = value),
            );

            return this.db.Model.raw(replacedSql, parametersObject);
        } catch (err) {
            return logger('Error querying database:', err.message);
        }
    },
};
// initialize a boss
this.boss = new PgBoss({
    db: dbAdaptor,
});
this.bossPromise = this.boss.start(); // never resolves due to error
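
The placeholder rewrite in the adapter above can be exercised on its own, without a database. This is a minimal sketch: `toNamedBindings` is a hypothetical helper name (not part of pg-boss or knex), and the sample SQL is made up for illustration.

```javascript
// Hypothetical helper: converts pg-style $1, $2 placeholders into
// :param_1, :param_2 named bindings, mirroring the adapter above.
function toNamedBindings(sql, parameters = []) {
  const replacedSql = sql.replace(/\$(\d+)\b/g, (_, number) => `:param_${number}`);
  const bindings = {};
  parameters.forEach((value, index) => {
    bindings[`param_${index + 1}`] = value;
  });
  return { sql: replacedSql, bindings };
}

// Made-up sample statement that reuses $1, as pg-boss's SQL does.
const converted = toNamedBindings(
  "SELECT CAST($1 as interval), $2::text, right($1, 1)",
  ['120', 'job-name']
);
console.log(converted.sql);
console.log(converted.bindings);
```

Note that a reused `$1` maps to the same `:param_1` name at every occurrence, so the rewrite itself does not change which value goes where.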

The errors I'm seeing now:

On initialization:

error: Unhandled Rejection at: Promise  {"name":"error","length":127,"severity":"ERROR","code":"22007","file":"datetime.c","line":"3795","routine":"DateTimeParseError"}
error: error: invalid input syntax for type timestamp with time zone: "120"
    at Connection.parseE (/mnt/c/Users/groemhildt/projects/wsb-services/node_modules/pg/lib/connection.js:606:11)
    at Connection.parseMessage (/mnt/c/Users/groemhildt/projects/wsb-services/node_modules/pg/lib/connection.js:403:19)
    at Socket.<anonymous> (/mnt/c/Users/groemhildt/projects/wsb-services/node_modules/pg/lib/connection.js:123:22)
    at Socket.emit (events.js:315:20)
    at Socket.EventEmitter.emit (domain.js:485:12)
    at addChunk (_stream_readable.js:297:12)
    at readableAddChunk (_stream_readable.js:273:9)
    at Socket.Readable.push (_stream_readable.js:214:10)
    at TCP.onStreamRead (internal/stream_base_commons.js:186:23)
error: error: invalid input syntax for type timestamp with time zone: "120"
    at Connection.parseE (/mnt/c/Users/groemhildt/projects/wsb-services/node_modules/pg/lib/connection.js:606:11)
    at Connection.parseMessage (/mnt/c/Users/groemhildt/projects/wsb-services/node_modules/pg/lib/connection.js:403:19)
    at Socket.<anonymous> (/mnt/c/Users/groemhildt/projects/wsb-services/node_modules/pg/lib/connection.js:123:22)
    at Socket.emit (events.js:315:20)
    at Socket.EventEmitter.emit (domain.js:485:12)
    at addChunk (_stream_readable.js:297:12)
    at readableAddChunk (_stream_readable.js:273:9)
    at Socket.Readable.push (_stream_readable.js:214:10)
    at TCP.onStreamRead (internal/stream_base_commons.js:186:23)
(node:11841) PromiseRejectionHandledWarning: Promise rejection was handled asynchronously (rejection id: 1)

And probably more relevant:

    WITH j AS (SELECT
      $1::uuid as id,
      $2::text as name,
      $3::int as priority,
      'created'::pgboss.job_state as state,
      $4::int as retryLimit,
      CASE
        WHEN right($5, 1) = 'Z' THEN CAST($6 as timestamp with time zone)
        ELSE now() + CAST(COALESCE($7,'0') as interval)
        END as startAfter,
      CAST($8 as interval) as expireIn,
      $9::jsonb as data,
      $10::text as singletonKey,
      CASE
        WHEN $11::integer IS NOT NULL THEN 'epoch'::timestamp + '1 second'::interval * ($12 * floor((date_part('epoch', now()) + $13) / $14))
        ELSE NULL
        END as singletonOn,
      $15::int as retryDelay,
      $16::bool as retryBackoff
    )
    INSERT INTO pgboss.job (
      id,
      name,
      priority,
      state,
      retryLimit,
      startAfter,
      expireIn,
      data,
      singletonKey,
      singletonOn,
      retryDelay,
      retryBackoff,
      keepUntil
      )
    SELECT
      id,
      name,
      priority,
      state,
      retryLimit,
      startAfter,
      expireIn,
      data,
      singletonKey,
      singletonOn,
      retryDelay,
      retryBackoff,
      CASE
        WHEN right($17, 1) = 'Z' THEN CAST($18 as timestamp with time zone)
        ELSE startAfter + CAST(COALESCE($19,'0') as interval)
        END as keepUntil
    FROM j
    ON CONFLICT DO NOTHING
    RETURNING id
   - invalid input syntax for type timestamp with time zone: "120"

when inserting a new job.

green3g commented 4 years ago

Update: It looks like the 120 from above is coming from maintenanceIntervalSeconds.

timgit commented 4 years ago

Can you post your init options and/or publish() calls?

timgit commented 4 years ago

Looks like you might have a bug in your parameter enumeration, where you loop and increment the placeholder value. For example, your statement includes values all the way up to 19, but the original statement only has 12. This is probably because I reuse these values multiple times in the SQL. Here's the original statement: https://github.com/timgit/pg-boss/blob/master/src/plans.js#L322-L380
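
The higher placeholder count is consistent with each occurrence of a reused placeholder being given its own number. The sketch below only simulates that renumbering on a fragment of the statement. `renumberPerOccurrence` is a hypothetical name, and this is an illustration, not knex's actual code.

```javascript
// Illustration only: expand reused $n placeholders so that each
// occurrence gets a fresh number, duplicating its value alongside it.
function renumberPerOccurrence(sql, values) {
  const expandedValues = [];
  const expandedSql = sql.replace(/\$(\d+)\b/g, (_, number) => {
    expandedValues.push(values[Number(number) - 1]);
    return `$${expandedValues.length}`;
  });
  return { sql: expandedSql, values: expandedValues };
}

// Fragment modelled on the startAfter expression, with $1 used three times.
const fragment =
  "CASE WHEN right($1, 1) = 'Z' THEN CAST($1 as timestamp with time zone) " +
  "ELSE now() + CAST(COALESCE($1,'0') as interval) END";

const expanded = renumberPerOccurrence(fragment, ['120']);
console.log(expanded.sql);
console.log(expanded.values);
```

After renumbering, `$2` appears only inside a cast to `timestamp with time zone`, so PostgreSQL may infer that type for the parameter and reject `'120'` when the values are bound, regardless of which CASE branch would run.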

green3g commented 4 years ago

Hmm - not sure if that's it. I've been looking at it, and it appears that the values do get reused. I think it's trying to put the value of 120 (maintenanceIntervalSeconds) in as the timestamp with time zone value.

Here's the SQL that is replaced:

WITH j AS (SELECT
      :param_1::uuid as id,
      :param_2::text as name,
      :param_3::int as priority,
      'created'::pgboss.job_state as state,
      :param_4::int as retryLimit,
      CASE
        WHEN right(:param_5, 1) = 'Z' THEN CAST(:param_5 as timestamp with time zone)
        ELSE now() + CAST(COALESCE(:param_5,'0') as interval)
        END as startAfter,
      CAST(:param_6 as interval) as expireIn,
      :param_7::jsonb as data,
      :param_8::text as singletonKey,
      CASE
        WHEN :param_9::integer IS NOT NULL THEN 'epoch'::timestamp + '1 second'::interval * (:param_9 * floor((date_part('epoch', now()) + :param_10) / :param_9))
        ELSE NULL
        END as singletonOn,
      :param_11::int as retryDelay,
      :param_12::bool as retryBackoff
    )
    INSERT INTO pgboss.job (
      id,
      name,
      priority,
      state,
      retryLimit,
      startAfter,
      expireIn,
      data,
      singletonKey,
      singletonOn,
      retryDelay,
      retryBackoff,
      keepUntil
      )
    SELECT
      id,
      name,
      priority,
      state,
      retryLimit,
      startAfter,
      expireIn,
      data,
      singletonKey,
      singletonOn,
      retryDelay,
      retryBackoff,
      CASE
        WHEN right(:param_13, 1) = 'Z' THEN CAST(:param_13 as timestamp with time zone)
        ELSE startAfter + CAST(COALESCE(:param_13,'0') as interval)
        END as keepUntil
    FROM j
    ON CONFLICT DO NOTHING
    RETURNING id

Here's the parametersObject and the input array getting passed:

image

In response to your other question - I'm not doing anything special with the start options. Publish isn't getting called, since this.boss.start() fails with the error: invalid input syntax for type timestamp with time zone: "120". (I updated my code above to show the start call.)

green3g commented 4 years ago

FYI I'm using PostgreSQL 11.7 (Debian 11.7-2.pgdg90+1)

green3g commented 4 years ago

I must be missing something obvious. I tried testing the code you have in the plans sql and I don't understand why it isn't working.

image

SELECT (
    CASE WHEN right('120', 1)='Z' THEN CAST('120' as timestamp with time zone) 
    ELSE now() + CAST(COALESCE('120','0') as interval)
    END
) as startAfter

right('120', 1) evaluates to '0', which is <> 'Z', but somehow it's still throwing that error...

green3g commented 4 years ago

Okay - so I think I might have found the root cause. Knex doesn't appear to use prepared statements at all, so everything is being evaluated by the compiler as "static", for lack of a better word.

https://github.com/knex/knex/issues/718

So I think that's why those values are throwing errors: the compiler knows 120 isn't a timestamp and catches it early.
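
If the goal is only to share knex's connection pool, one option is to skip `raw()` and hand pg-boss's SQL and values to the underlying `pg` connection unchanged. This is a sketch under assumptions: it relies on `knex.client.acquireConnection()` and `knex.client.releaseConnection()`, which should be verified against your knex version, and `createPassThroughAdapter` is a made-up name.

```javascript
// Hypothetical factory: builds a pg-boss `db` adapter that borrows a raw
// connection from knex's pool and runs the query with its original
// $1..$n placeholders and values array, untouched.
function createPassThroughAdapter(knex) {
  return {
    async executeSql(sql, parameters = []) {
      // Assumption: for the pg dialect this resolves to a pg client.
      const connection = await knex.client.acquireConnection();
      try {
        return await connection.query(sql, parameters);
      } finally {
        // Always hand the connection back to the pool.
        await knex.client.releaseConnection(connection);
      }
    },
  };
}

// Usage sketch (assumes an existing knex instance named `knex`):
// const boss = new PgBoss({ db: createPassThroughAdapter(knex) });
```

Errors are left to propagate here rather than being logged and swallowed, so a failing query rejects the promise that pg-boss is waiting on.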

timgit commented 4 years ago

What does toString() on your knex.raw call output?

For example, this should show the actual values being substituted, like the following statement.

knex.raw('select 1 = ?', [1]).toString()

result:

select 1 = 1

green3g commented 4 years ago

Here's the raw toString() value:

WITH j AS (SELECT
      '1d340e50-6937-11ea-b8e9-6f358183b88c'::uuid as id,
      '__pgboss__maintenance'::text as name,
      0::int as priority,
      'created'::pgboss.job_state as state,
      0::int as retryLimit,
      CASE
        WHEN right('120', 1) = 'Z' THEN CAST('120' as timestamp with time zone)
        ELSE now() + CAST(COALESCE('120','0') as interval)
        END as startAfter,
      CAST('15 minutes' as interval) as expireIn,
      NULL::jsonb as data,
      NULL::text as singletonKey,
      CASE
        WHEN NULL::integer IS NOT NULL THEN 'epoch'::timestamp + '1 second'::interval * (NULL * floor((date_part('epoch', now()) + 0) / NULL))
        ELSE NULL
        END as singletonOn,
      0::int as retryDelay,
      false::bool as retryBackoff
    )
    INSERT INTO pgboss.job (
      id,
      name,
      priority,
      state,
      retryLimit,
      startAfter,
      expireIn,
      data,
      singletonKey,
      singletonOn,
      retryDelay,
      retryBackoff,
      keepUntil
      )
    SELECT
      id,
      name,
      priority,
      state,
      retryLimit,
      startAfter,
      expireIn,
      data,
      singletonKey,
      singletonOn,
      retryDelay,
      retryBackoff,
      CASE
        WHEN right('30 days', 1) = 'Z' THEN CAST('30 days' as timestamp with time zone)
        ELSE startAfter + CAST(COALESCE('30 days','0') as interval)
        END as keepUntil
    FROM j
    ON CONFLICT DO NOTHING
    RETURNING id

timgit commented 4 years ago

I rewrote the SQL to hopefully bypass knex's arg pre-parsing and substitutions. I can't say this experience with knex was a positive one, lol. Give this version a try please.

npm i pg-boss@4.0.1-beta1

green3g commented 4 years ago

Yep - looks like that fixed it!

Thanks! I'd agree - I was not aware that knex wasn't capable of using prepared statements.

KazimirPodolski commented 7 months ago

Didn't work well for knex... https://nvd.nist.gov/vuln/detail/CVE-2016-20018