timgit / pg-boss

Queueing jobs in Postgres from Node.js like a boss
MIT License
2.15k stars 160 forks

feature request: worker's job filter #409

Open Eomm opened 1 year ago

Eomm commented 1 year ago

To build a QoS system, it would be great if the work() method supported a filter option.

This new parameter would let the user write to a single queue/table while two different workers process the same queue at different paces:

❗️ It is up to the user to write filters that together cover all the jobs; otherwise, a job with taste: lemon that no filter matches will be archived automatically.
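To make the coverage requirement concrete, here is a minimal in-memory sketch. It assumes each worker's filter can be modeled as a JavaScript predicate over job.data (the proposal itself uses SQL predicates), and the worker names fast-lane and slow-lane and the helpers eligibleWorkers() and unmatchedJobs() are hypothetical. With filters for vanilla and chocolate only, the lemon job is left without a worker:

```javascript
// Hypothetical model: each worker's filter is a predicate over job.data.
// Under the proposal, a job that no worker's filter accepts is never fetched.
const workers = [
  { name: 'fast-lane', accepts: (data) => data.taste === 'vanilla' },
  { name: 'slow-lane', accepts: (data) => data.taste === 'chocolate' },
];

// Names of the workers whose filter matches this payload.
function eligibleWorkers (workers, data) {
  return workers.filter((worker) => worker.accepts(data)).map((worker) => worker.name);
}

// Jobs that no worker would ever pick up.
function unmatchedJobs (workers, jobs) {
  return jobs.filter((job) => eligibleWorkers(workers, job.data).length === 0);
}

const jobs = [
  { id: 1, data: { taste: 'vanilla' } },
  { id: 2, data: { taste: 'chocolate' } },
  { id: 3, data: { taste: 'lemon' } },
];

console.log(eligibleWorkers(workers, jobs[0].data)); // [ 'fast-lane' ]
console.log(unmatchedJobs(workers, jobs).map((job) => job.id)); // [ 3 ]
```

One way to close the gap is to make one worker a catch-all, for example `accepts: (data) => data.taste !== 'vanilla'`, so that the filters partition the queue instead of leaving holes.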

Example:

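const PgBoss = require('pg-boss');

// Placeholder queue name and job handler so the example runs as-is.
const queueName = 'icecream';

async function executeJob (job) {
  console.log('Processing job', job.id, job.data);
}

(async function () {
  try {
    await buildConsumer();
  } catch (error) {
    console.log({ globalErr: error.message });
  }
})();

async function buildConsumer () {
  const boss = new PgBoss({
    user: 'postgres',
    password: 'postgres',
    noScheduling: true,
  });

  boss.on('error', (error) => console.error(error));
  await boss.start(); // the instance must be started before calling work()

  await boss.work(
    queueName,
    {
      teamSize: 2,
      newJobCheckInterval: 1000,
      filter: { // 🚀 proposed new option, not yet supported by pg-boss
        jobFilter: `data ->> 'body' = $1`, // SQL predicate on the job payload
        jobParams: ['body'],               // values bound to the predicate's $1..$n
      }
    },
    executeJob
  );

  console.log('Waiting for jobs');
}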

In the fetch function, this would generate a query like the following:

https://github.com/timgit/pg-boss/blob/523c36ba7bf285456c2358b710f13d8d77d34b3a/src/plans.js#L353

    WITH nextJob as (
      SELECT id
      FROM pgboss.job j
      WHERE state < 'active'
        AND name LIKE $1
        AND startAfter < now()
+        AND data ->> 'body' = $3
      ORDER BY priority desc, createdOn, id
      LIMIT $2
      FOR UPDATE SKIP LOCKED
    )
    UPDATE pgboss.job j SET
      state = 'active',
      startedOn = now(),
      retryCount = CASE WHEN state = 'retry' THEN retryCount + 1 ELSE retryCount END
    FROM nextJob
    WHERE j.id = nextJob.id
    RETURNING j.id, name, data, EXTRACT(epoch FROM expireIn) as expire_in_seconds
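The diff above implies a renumbering step: the user's $1 becomes $3, because the base query already binds $1 (name pattern) and $2 (batch size). A minimal sketch of that step follows; buildFetchWhere() is a hypothetical helper, not part of pg-boss, and it only builds the WHERE clause of the quoted query.

```javascript
// Hypothetical sketch: append a user-supplied filter to the fetch query's
// WHERE clause. The base query already binds $1 (name) and $2 (batch size),
// so the user's placeholders $1..$n are renumbered to $3..$(n + 2).
function buildFetchWhere (name, batchSize, filter) {
  const baseValues = [name, batchSize];
  const conditions = [
    "state < 'active'",
    'name LIKE $1',
    'startAfter < now()',
  ];

  if (!filter) {
    return { where: conditions.join('\n  AND '), values: baseValues };
  }

  const offset = baseValues.length;
  // Shift every $k in the predicate; parenthesize it so an OR inside the
  // user's predicate cannot escape the surrounding AND chain.
  const shifted = filter.jobFilter.replace(/\$(\d+)/g, (_, k) => `$${Number(k) + offset}`);
  conditions.push(`(${shifted})`);

  return {
    where: conditions.join('\n  AND '),
    values: [...baseValues, ...filter.jobParams],
  };
}

const { where, values } = buildFetchWhere('icecream', 2, {
  jobFilter: `data ->> 'body' = $1`,
  jobParams: ['body'],
});
console.log(where);
// state < 'active'
//   AND name LIKE $1
//   AND startAfter < now()
//   AND (data ->> 'body' = $3)
console.log(values); // [ 'icecream', 2, 'body' ]
```

The regex rewrite is deliberately naive: it would also shift a `$k` that appears inside a string literal in the user's predicate. Since jobFilter is raw SQL, it should come from trusted application code, with any untrusted values passed through jobParams.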

Note:

What do you think?

timgit commented 1 year ago

You can already do this using wildcards. Set all workers to a wildcard pattern by default, such as icecream.*, and then replace the wildcard with a specific flavor for any worker that should only handle that flavor.

Eomm commented 1 year ago

Sorry, I don't get it. In that case, does the producer that sends the message need to know the consumer's queue?

My goal is to have a simple producer that doesn't need to know how many consumers the backend has.

timgit commented 1 year ago

Queue patterns use the * character to match 0 or more characters. For example, a job from queue status-report-12345 would be fetched with the pattern status-report-* or even stat*5.

For example, a producer would use the flavor as part of the queue name, such as icecream.vanilla and icecream.chocolate. A consumer using work('icecream.*') would get both flavors, but another consumer using work('icecream.vanilla') would not get chocolate.
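A minimal sketch of these matching semantics, assuming * is the only special character. The fetch query quoted earlier in this thread filters with `name LIKE $1`, so presumably the library translated * into SQL's % wildcard; this sketch uses a JavaScript regular expression instead, and patternToRegExp() and matches() are hypothetical helpers, not pg-boss APIs.

```javascript
// Hypothetical sketch: '*' matches zero or more characters; every other
// character (including '.') must match literally.
function patternToRegExp (pattern) {
  const body = pattern
    .split('*')
    // Escape regex metacharacters in each literal segment.
    .map((part) => part.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'))
    .join('.*');
  return new RegExp(`^${body}$`);
}

function matches (pattern, queueName) {
  return patternToRegExp(pattern).test(queueName);
}

console.log(matches('status-report-*', 'status-report-12345')); // true
console.log(matches('stat*5', 'status-report-12345'));          // true
console.log(matches('icecream.*', 'icecream.chocolate'));       // true
console.log(matches('icecream.vanilla', 'icecream.chocolate')); // false
```

Escaping keeps the '.' in icecream.* literal. A raw SQL LIKE pattern would additionally treat _ as a single-character wildcard, which this sketch does not model.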

Eomm commented 1 year ago

The proposed solution assumes that I control the producer, which is not the case 😞

Eomm commented 1 year ago

Would you be open to accepting a PR for such a feature?

timgit commented 1 year ago

Yes, sounds good

nickreese commented 1 year ago

This would be useful. I didn't see a test case for queue layouts like work('icecream.*.toppings.none').

timgit commented 3 months ago

Since wildcards have been dropped in v10, my original response is no longer valid. The most feasible option seems to be an additional opt-in filter like the one originally proposed. The main problem with filtering on the data payload is fetch performance, since the data column is not indexed. I don't think indexing data globally is a good idea, since the payload could be large, and doing so would hurt write performance as well.

However, this isn't necessarily a deal-breaker. Now that queues are isolated via partitioning, different queues can use different indexing strategies, since the indexes on queue A wouldn't have to match those on queue B. One way to pull this off would be to add a new option to createQueue().
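As a sketch of what such an opt-in might generate, the snippet below builds DDL for an expression index on a single JSON key of one queue's table. Everything here is an assumption: payloadIndexSql() is hypothetical, no createQueue() option name is implied, and the schema and table names are placeholders rather than pg-boss's actual partition naming. The index is partial (`state < 'active'`) on the assumption that only jobs not yet fetched need to be searched.

```javascript
// Hypothetical sketch: DDL for an opt-in expression index on one JSON key of
// the payload in a single queue's table. Not an existing pg-boss API.
function quoteIdent (ident) {
  // Double-quote an identifier, escaping embedded double quotes.
  return `"${ident.replace(/"/g, '""')}"`;
}

function quoteLiteral (value) {
  // Single-quote a string literal, escaping embedded single quotes.
  return `'${value.replace(/'/g, "''")}'`;
}

function payloadIndexSql (schema, table, jsonKey) {
  const indexName = `${table}_data_${jsonKey}_idx`;
  return [
    `CREATE INDEX IF NOT EXISTS ${quoteIdent(indexName)}`,
    `  ON ${quoteIdent(schema)}.${quoteIdent(table)} ((data ->> ${quoteLiteral(jsonKey)}))`,
    // Partial index: only jobs that have not been fetched yet are indexed.
    "  WHERE state < 'active'",
  ].join('\n');
}

console.log(payloadIndexSql('pgboss', 'icecream_jobs', 'taste'));
```

An expression index stores only the extracted value rather than the whole payload, which addresses the concern about large payloads, although every insert into an indexed queue still pays some extra write cost.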