chilts / mongodb-queue

Message queues which use MongoDB.
209 stars 92 forks

Option to reset tries in `ping()` #40

Open alecgibson opened 3 years ago

alecgibson commented 3 years ago

The ping() method can be useful for setting up recurring jobs if we deliberately avoid acking the job. For example:

  1. Submit job
  2. Pull job from queue
  3. Process job
  4. ping()
  5. Go to Step 2

A real-world example of this might be notifying for recurring appointments, or setting up long-running, cross-process, periodic jobs.
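The loop above can be sketched with a toy in-memory queue. This is only an illustration: `ToyQueue`, its method signatures, and the explicit `now` clock are assumptions made for the example and are not the mongodb-queue API.

```javascript
// Toy in-memory stand-in for a visibility-timeout queue.
// Hypothetical: NOT the mongodb-queue API.
class ToyQueue {
  constructor() {
    this.messages = [];
    this.nextId = 1;
  }

  add(payload) {
    const msg = { id: this.nextId++, payload, visibleAt: 0, tries: 0 };
    this.messages.push(msg);
    return msg.id;
  }

  // Return the first visible message and hide it for `visibility` time units.
  get(now, visibility) {
    const msg = this.messages.find((m) => m.visibleAt <= now);
    if (!msg) return null;
    msg.tries += 1;
    msg.visibleAt = now + visibility;
    return msg;
  }

  // Push the message's visibility forward without removing it.
  ping(id, now, delay) {
    const msg = this.messages.find((m) => m.id === id);
    msg.visibleAt = now + delay;
  }
}

const queue = new ToyQueue();
const jobId = queue.add({ task: 'send-reminder' }); // Step 1: submit job
const runs = [];
let now = 0;

for (let i = 0; i < 3; i++) {
  const msg = queue.get(now, 30);       // Step 2: pull job from queue
  runs.push({ id: msg.id, at: now });   // Step 3: process job
  queue.ping(msg.id, now, 100);         // Step 4: ping() instead of ack()
  now += 100;                           // Step 5: wait, then go to Step 2
}
```

In this sketch the same message is delivered on every cycle and the queue never holds more than one copy of it.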

The main advantage this has over using ack() and add() is that it effectively requeues a job in a single, atomic commit. If we tried the above with ack() and add():

  1. Submit job
  2. Pull job from queue
  3. Process job
  4. ack()
  5. add()
  6. Go to Step 2

In this version, the process could crash or quit between Steps 4 & 5, and our recurring job would be lost.

We could also try inverting Steps 4 & 5, but then we get the opposite issue: if the process crashes or quits, then we might accidentally duplicate our recurring job. It also prevents us from setting up any unique indexes on our payload.
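Both failure modes can be shown with a small simulation. The queue here is just an array and the crash is a flag, so `runOnce` and its parameters are hypothetical helpers for illustration, not library calls.

```javascript
// Toy queue: a plain array of payloads. Hypothetical: NOT the mongodb-queue API.
// Runs one pull/process/requeue cycle, optionally "crashing" between the
// two requeue steps.
function runOnce(jobs, order, crashBetween) {
  const job = jobs[0]; // Steps 2-3: pull and process the job
  const ack = () => jobs.splice(jobs.indexOf(job), 1);
  const add = () => jobs.push({ ...job });
  const steps = order === 'ack-first' ? [ack, add] : [add, ack];

  steps[0]();
  if (crashBetween) return jobs; // simulated crash between the two steps
  steps[1]();
  return jobs;
}

// ack() then add(), crash in between: the recurring job is lost.
const lost = runOnce([{ name: 'reminder' }], 'ack-first', true);

// add() then ack(), crash in between: the recurring job is duplicated.
const duplicated = runOnce([{ name: 'reminder' }], 'add-first', true);

// No crash: exactly one job remains, as intended.
const healthy = runOnce([{ name: 'reminder' }], 'ack-first', false);
```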

Using ping() perfectly solves this problem: there's only ever one version of the job, and it's never dropped (because it's never acked). If the process crashes before we ping(), we'll retry it, as with any other normal job.

The one issue with this approach is that tries will steadily increase, and - if you have maxRetries set up - the job will eventually be moved to the dead queue, which isn't what we want.
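A rough model of the problem, assuming that every pull increments `tries` and that a job is moved to the dead queue once `tries` exceeds `maxRetries`. Both the rule and the limit of 5 are assumptions for this sketch, not taken from the library source.

```javascript
// Toy model of the tries counter. Hypothetical: NOT the mongodb-queue source.
const maxRetries = 5; // assumed limit, for illustration only
const job = { payload: 'reminder', tries: 0 };
const deadQueue = [];
let successfulRuns = 0;

for (let cycle = 0; cycle < 10; cycle++) {
  job.tries += 1; // every pull counts as a try, even after a healthy ping()
  if (job.tries > maxRetries) { // assumed dead-letter rule
    deadQueue.push(job);
    break;
  }
  successfulRuns += 1; // job processed fine, then ping() to schedule the next run
}
```

Even though every run succeeds, the job stops recurring after `maxRetries` cycles because nothing ever resets the counter.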

This change adds an option to the ping() method: resetTries, which will reset tries to zero, so that the job is treated like a "new" job when it's pinged, and is only moved to the dead queue if it's genuinely retried.
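A minimal sketch of the intended behaviour. The option name `resetTries` comes from the proposal above; the `pull` and `ping` functions, their signatures, and the dead-letter rule are assumptions for illustration and not the actual implementation.

```javascript
// Hypothetical model of ping() with a resetTries option.
// NOT the mongodb-queue implementation.
const maxRetries = 2; // assumed limit, for illustration only

// Pull a job: count the try and dead-letter it if the limit is exceeded.
function pull(job, deadQueue) {
  job.tries += 1;
  if (job.tries > maxRetries) {
    deadQueue.push(job);
    return false; // moved to the dead queue
  }
  return true;
}

// Ping a job: optionally reset the tries counter to zero.
function ping(job, opts = {}) {
  if (opts.resetTries) job.tries = 0;
}

const dead = [];

// Healthy recurring job: pinged with resetTries after every run.
const recurring = { tries: 0 };
let runs = 0;
for (let i = 0; i < 10; i++) {
  if (!pull(recurring, dead)) break;
  runs += 1;
  ping(recurring, { resetTries: true });
}

// Failing job: the worker crashes before ping(), so tries accumulate.
const failing = { tries: 0 };
let attempts = 0;
while (pull(failing, dead)) attempts += 1;
```

Under these assumptions the healthy job keeps recurring indefinitely, while a job that genuinely fails still reaches the dead queue after `maxRetries` attempts.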

nataliataylor commented 2 years ago

Why not just have a new method, resetTries, with an option {delayed: x}?