grantcarthew / node-rethinkdb-job-queue

A persistent job or task queue backed by RethinkDB.
https://github.com/grantcarthew/node-rethinkdb-job-queue/wiki
MIT License

Any way to make delayed jobs instant? #71

Closed FrancosLab closed 7 years ago

FrancosLab commented 7 years ago

I'm reading through the documentation and may head down the path of using this library, mainly because I really want to use RethinkDB as my database.

I was wondering, however: is there a way to make delayed jobs instant?

My use case is this.

3 different queues of repetitive actions that will span 24 hours (each with different data, at different intervals)

  - Job 1 - Delay 0 (instant - start time)
  - Job 2 - Delay 30s (relative to start time)
  - Job 3 - Delay 60s (relative to start time)
  - Job 4 - Delay 90s (relative to start time)
  - Job 5 - ...

These jobs have to be sequential but delayed. From my understanding of the Master Queue, it triggers every 5 minutes to execute delayed jobs. Wouldn't that mean the jobs queue up within those 5 minutes and then all execute one after another? That's not what I'm looking for; I want each job processed as soon as its delay time is up.

Any insight would be awesome! Thank you.

grantcarthew commented 7 years ago

Hi @FrancosLab,

This queue is not really designed to be a high performance queue.

There are three triggers that will cause a worker node to start processing:

  1. The worker finishes processing its maximum concurrency jobs.
  2. A job is added to the queue.
  3. A master review process is completed.

The only option above that would support your sub-second timing is when the worker finishes working on a job.

Even when a job is added to the queue there is a random delay inserted into the restart processing call with this function:

function restartProcessing (q) {
  logger('restartProcessing')
  setTimeout(function randomRestart () {
    queueProcess.restart(q)
  }, Math.floor(Math.random() * 1000))
}

If you don't need the job processing to be distributed, why are you using more than one job? If you do need it distributed, accurate sub-second timing will be hard to get.

FrancosLab commented 7 years ago

@grantcarthew, thank you for all of that. I accidentally messed up the time units: I'm not looking to do milliseconds, but seconds, so 0 seconds, 30 seconds, 60 seconds, and so on. I'm not sure whether that changes anything you would recommend, though.

Regarding the restart processing call, is that the random delay before the first job gets fired?

Anyway, thank you again for the info! I will continue to read through the documentation.

grantcarthew commented 7 years ago

Ah, well, that does make a big difference, @FrancosLab.

The next question is: do you need distributed processing?

If not, then I would suggest using an in-memory queue (there are plenty on npm) and persisting the results to RethinkDB yourself. It's not hard to do.
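To give a feel for the in-memory approach, here is a minimal sketch built on plain timers. Nothing in it comes from rethinkdb-job-queue: `scheduleJobs`, `handler`, `saveResult` and the job shape are hypothetical names, and `saveResult` stands in for whatever code you write to store the outcome in RethinkDB.

```javascript
// Sketch of an in-memory delayed job runner; not part of rethinkdb-job-queue.
// `handler` does the work. `saveResult` is a hypothetical callback that would
// write the outcome to RethinkDB. `setTimer` is injectable only for testing.
function scheduleJobs (jobs, handler, saveResult, setTimer = setTimeout) {
  return jobs.map((job) => setTimer(() => {
    // Run the handler, then persist either the result or the error.
    Promise.resolve()
      .then(() => handler(job))
      .then(
        (result) => saveResult({ id: job.id, ok: true, result }),
        (err) => saveResult({ id: job.id, ok: false, error: String(err) })
      )
      .catch((err) => console.error('persisting failed', err))
  }, job.delayMs))
}

// Delays are relative to the moment scheduleJobs is called.
const exampleJobs = [0, 30, 60, 90].map((seconds, i) => ({
  id: `job-${i + 1}`,
  delayMs: seconds * 1000
}))
```

Calling `scheduleJobs(exampleJobs, doWork, persist)` with your own `doWork` and `persist` functions would fire each job when its delay elapses. The pending timers live only in this process, so they are lost on a restart.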

If you are after distributed processing and want it done in a time-sensitive fashion, you are going to have to poll the queue often. You could set Queue.masterInterval to a low number, but this will cause a lot of extra work for your database. Please note that this is only needed if the queue holds just a few jobs. If the queue has hundreds of jobs being processed all the time, then jobs will be picked up as soon as they are ready, because each worker looks for more work whenever it finishes a job.
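The trade-off can be put in numbers with a small piece of arithmetic. This is only a sketch: `pickupTimeMs` is a hypothetical helper, and it assumes the periodic review is the only trigger and fires at exact multiples of the interval, which real timers and the random restart delay will not guarantee.

```javascript
// When would a delayed job be noticed if a periodic review were the only trigger?
// Assumption: reviews fire at exact multiples of intervalMs after the start time.
function pickupTimeMs (delayMs, intervalMs) {
  return Math.ceil(delayMs / intervalMs) * intervalMs
}

const delays = [0, 30000, 60000, 90000]

// With a 5 minute interval, every delayed job waits for the same review.
const withFiveMinutes = delays.map((d) => pickupTimeMs(d, 5 * 60 * 1000))
// → [0, 300000, 300000, 300000]

// With a 5 second interval, each job is picked up at its own due time.
const withFiveSeconds = delays.map((d) => pickupTimeMs(d, 5000))
// → [0, 30000, 60000, 90000]
```

Under those assumptions, the worst-case lateness of a job is just under one interval, which is what you pay for in extra database queries when you shrink it.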

By the way, the random delay is to prevent many worker queues from hitting the database at the same time.
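As a rough illustration of the effect, the sketch below computes one delay per worker using the same `Math.floor(Math.random() * 1000)` expression as the restartProcessing function quoted earlier. `restartDelays` is a hypothetical helper, not library code, and the `random` parameter exists only so the result can be made deterministic in a test.

```javascript
// Each worker picks its own random delay below maxMs before restarting,
// so a single event does not make every worker query the database at once.
function restartDelays (workerCount, maxMs = 1000, random = Math.random) {
  return Array.from({ length: workerCount }, () => Math.floor(random() * maxMs))
}

// Five workers reacting to the same event end up spread across one second.
const spread = restartDelays(5)
```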

grantcarthew commented 7 years ago

Oh, you could manually call Queue.review using your own timeout routine.
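Something along these lines might work as the timeout routine. It is only a sketch: `startReviewLoop` is a hypothetical helper, `q` is assumed to be your Queue instance, and the only library call used is `q.review()`. Whether `review()` returns a Promise is an assumption here, so the call is wrapped defensively.

```javascript
// Calls q.review() every `everyMs` milliseconds until the returned stop() is called.
// `setTimer` and `clearTimer` are injectable only so the loop can be tested.
function startReviewLoop (q, everyMs, setTimer = setTimeout, clearTimer = clearTimeout) {
  let handle = null
  let stopped = false
  const tick = () => {
    // The Promise constructor also captures a synchronous throw from review().
    new Promise((resolve) => resolve(q.review()))
      .catch((err) => console.error('review failed', err))
      .then(() => {
        // Schedule the next run only after this one settles, so reviews never overlap.
        if (!stopped) handle = setTimer(tick, everyMs)
      })
  }
  handle = setTimer(tick, everyMs)
  return function stop () {
    stopped = true
    clearTimer(handle)
  }
}
```

Usage would be `const stop = startReviewLoop(q, 5000)`, then `stop()` when shutting down. Every call is a full review, so a short interval carries the same database cost as a low master interval.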

grantcarthew commented 7 years ago

I just checked the code because you got me thinking about manually initiating a worker. You can do it with Queue.resume(false), which runs the module below. This would be better than using Queue.review because it only initiates processing rather than performing the other review tasks.


module.exports.resume = function interruptionResume (q, source) {
  logger(`resume`, source)
  q._paused = false
  const makeGlobal = is.true(source)
  const eventGlobal = makeGlobal || source === enums.state.global
  return q.ready().then(() => {
    if (makeGlobal) {
      return queueState(q, enums.status.active)
    }
  }).then(() => {
    queueProcess.restart(q)
    logger(`Event: resumed`, q.id, eventGlobal)
    q.emit(enums.status.resumed, q.id, eventGlobal)
    return true
  })
}

The false parameter will only cause the local worker node to start processing jobs.
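For known delays, one way to use this is to nudge the local worker just after each job becomes due. The sketch below assumes `q` is your Queue instance and uses only the `q.resume(false)` call described above. `nudgeAtDelays` and `slackMs` are hypothetical, and the small slack is a guess meant to avoid restarting processing a moment before the job is eligible.

```javascript
// Arms one timer per delay; each timer asks the local worker to start processing.
// `setTimer` is injectable only so the helper can be tested without waiting.
function nudgeAtDelays (q, delaysMs, slackMs = 50, setTimer = setTimeout) {
  return delaysMs.map((delayMs) => setTimer(() => {
    // resume(false) affects only the local worker node, per the comment above.
    new Promise((resolve) => resolve(q.resume(false)))
      .catch((err) => console.error('resume failed', err))
  }, delayMs + slackMs))
}
```

For the schedule in the original question this would be called as `nudgeAtDelays(q, [0, 30000, 60000, 90000])` right after the jobs are added. The timers are local to the process, so each worker node that should react needs to set its own.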

grantcarthew commented 7 years ago

@FrancosLab Any progress?

grantcarthew commented 7 years ago

Closing this now. Feel free to reopen it if you have further questions.