timgit / pg-boss

Queueing jobs in Postgres from Node.js like a boss
MIT License

Stately queues with different `singletonSeconds` argument #507

aslushnikov closed this issue 1 month ago

aslushnikov commented 1 month ago

Hi Tim,

Consider we have a queue that counts objects in an S3 bucket:

const pgBoss = new PgBoss({ /* connect to pg */});
await pgBoss.start();

await pgBoss.createQueue('count-objects', {
  name: 'count-objects',
  policy: 'stately',
});

await pgBoss.work('count-objects', async ([job]) => {
  /* count objects and write data to the db */
});

Now, once someone signals to me that a bunch of new objects have been added to the bucket, I can schedule a new job, using singletonMinutes: 1 to defend against too-many-signals:

await pgBoss.send('count-objects', { bucket: 's3-bucket' }, {
  singletonMinutes: 1, // make sure we don't count objects too often
  singletonKey: 's3-bucket-1',
  singletonNextSlot: true,
});

So far so good!

Now, however, I'd also like to occasionally recount objects whenever the count is read: this makes sure that if someone forgot to signal me, the data will still be "eventually consistent".

I tried doing so by scheduling another job with singletonMinutes: 10, to avoid stressing the system too much under frequent reads:

await pgBoss.send('count-objects', { bucket: 's3-bucket' }, {
  singletonMinutes: 10, // make sure we don't count objects too often
  singletonKey: 's3-bucket-1',
  singletonNextSlot: true,
});

This doesn't seem to work: sometimes some of the jobs are not scheduled. Is this supposed to work? I read the documentation multiple times, but I couldn't find this kind of behavior defined anywhere.

Thanks, Andrey
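
The two send calls above differ only in the throttling interval and share the same singletonKey. A minimal sketch of why they might not be independent, assuming that throttling buckets time into fixed slots by flooring the current time to a multiple of the interval. That bucketing rule is an assumption about the internals and is not stated in this thread, and slotStart is a hypothetical helper, not part of the pg-boss API:

```javascript
// Hypothetical helper (not a pg-boss function): start of the time slot that
// contains `epochSeconds`, for a throttling interval of `singletonSeconds`.
// ASSUMPTION: slots are computed by flooring to a multiple of the interval.
function slotStart(epochSeconds, singletonSeconds) {
  return Math.floor(epochSeconds / singletonSeconds) * singletonSeconds;
}

// 15 seconds past a 10-minute boundary (1200 s is a multiple of 600 and of 60).
const sendTime = 1215;
const oneMinuteSlot = slotStart(sendTime, 60);   // 1200
const tenMinuteSlot = slotStart(sendTime, 600);  // 1200, same slot start

// One minute later the 1-minute slot has moved on, the 10-minute slot has not.
const laterOneMinuteSlot = slotStart(1275, 60);  // 1260
const laterTenMinuteSlot = slotStart(1275, 600); // 1200

console.log(oneMinuteSlot === tenMinuteSlot);           // true
console.log(laterOneMinuteSlot === laterTenMinuteSlot); // false
```

Under that assumption, a 1-minute send and a 10-minute send issued shortly after a 10-minute boundary would land on the same slot start for the same key, so one of them could be treated as a duplicate of the other.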

timgit commented 1 month ago

The goal of the stately policy is to restrict concurrency as much as possible. As you add conditions to jobs, such as singleton key or throttling, you are by definition making them less restrictive. This is the opposite of how time-based throttling works, where the default behavior is to allow as many jobs to exist as possible, and adding singleton keys and debouncing makes them more restrictive. I went back and forth on the design decisions around these, which resulted in queue policies.

Here are the relevant unique constraints to make this clearer.

CREATE UNIQUE INDEX job_i3 ON ${schema}.job (name, state, COALESCE(singleton_key, '')) WHERE state <= '${JOB_STATES.active}' AND policy = '${QUEUE_POLICIES.stately}'

CREATE UNIQUE INDEX job_i4 ON ${schema}.job (name, singleton_on, COALESCE(singleton_key, '')) WHERE state <> '${JOB_STATES.cancelled}' AND singleton_on IS NOT NULL

As you can see, these constraints overlap: stately will make sure you can't have more than 1 job in the created state, no matter what you set with singletonMinutes. A standard queue would allow you to keep creating one every 10 minutes, even if the jobs aren't being processed, resulting in a backlog.
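
To make the overlap concrete, here is a rough in-memory model of the two indexes. It is only a sketch and not pg-boss code: the function names, the job object shape, and the 'standard' policy label are hypothetical, and it assumes that the states at or before active are created, retry and active.

```javascript
// ASSUMPTION: these are the states covered by `state <= active` in job_i3.
const UP_TO_ACTIVE = new Set(['created', 'retry', 'active']);

// Mirrors COALESCE(singleton_key, ''): a missing key behaves like an empty key.
const keyOf = (job) => job.singletonKey ?? '';

// Models job_i3: (name, state, key) is unique among stately jobs up to 'active'.
function violatesStately(jobs, job) {
  if (job.policy !== 'stately' || !UP_TO_ACTIVE.has(job.state)) return false;
  return jobs.some((j) =>
    j.policy === 'stately' && UP_TO_ACTIVE.has(j.state) &&
    j.name === job.name && j.state === job.state && keyOf(j) === keyOf(job));
}

// Models job_i4: (name, singleton_on, key) is unique among non-cancelled jobs
// that have a singleton_on value, regardless of queue policy.
function violatesThrottle(jobs, job) {
  if (job.singletonOn == null || job.state === 'cancelled') return false;
  return jobs.some((j) =>
    j.singletonOn != null && j.state !== 'cancelled' &&
    j.name === job.name && j.singletonOn === job.singletonOn &&
    keyOf(j) === keyOf(job));
}

// Inserts the job only if neither modeled index would be violated.
function tryInsert(jobs, job) {
  if (violatesStately(jobs, job) || violatesThrottle(jobs, job)) return false;
  jobs.push(job);
  return true;
}

// Stately queue: a second 'created' job is rejected even in a different slot.
const stately = [];
const base = { name: 'count-objects', policy: 'stately', singletonKey: 's3-bucket-1' };
const a = tryInsert(stately, { ...base, state: 'created', singletonOn: 1200 });
const b = tryInsert(stately, { ...base, state: 'created', singletonOn: 1800 });
stately[0].state = 'active'; // a worker picks up the first job
const c = tryInsert(stately, { ...base, state: 'created', singletonOn: 1800 });
console.log(a, b, c); // true false true

// Non-stately queue: only the time slot limits creation, so a backlog can grow.
const standard = [];
const stdBase = { ...base, policy: 'standard', state: 'created' };
const d = tryInsert(standard, { ...stdBase, singletonOn: 1200 });
const e = tryInsert(standard, { ...stdBase, singletonOn: 1800 });
const f = tryInsert(standard, { ...stdBase, singletonOn: 1800 });
console.log(d, e, f); // true true false
```

In this model the stately check never looks at singletonOn, which is why changing singletonMinutes does not let a second created job through.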

aslushnikov commented 1 month ago

> The goal of the stately policy is to restrict concurrency as much as possible. As you add conditions to jobs, such as singleton key or throttling, you are by definition making them less restrictive. This is the opposite of how time-based throttling works, where the default behavior is to allow as many jobs to exist as possible, and adding singleton keys and debouncing makes them more restrictive. I went back and forth on the design decisions around these, which resulted in queue policies.

Ok, this was actually eye-opening to me. I took some time to experiment to make sure I understand now how things work. To recap, here are my take-aways:


First and foremost: the semantics of the send method options depend heavily on the policy of the queue we send to.

Queues with "regular" policy

  1. All the singleton* options affect only the submission of jobs; workers pull jobs with a maximum concurrency determined by batchSize and the number of workers.
  2. singletonKey is ignored when used without any of singletonSeconds, singletonMinutes or singletonHours.

Queues with "stately" policy

  1. The singletonMinutes, singletonSeconds, singletonHours and singletonNextSlot options make no sense and are basically ignored.
  2. Omitting singletonKey in the "send" call is the same as providing an empty key.
  3. For each singletonKey, there can be at most 2 jobs in the queue: 1 in the active state and 1 in the queued state.

Thank you for the explanation!