Automattic / kue

Kue is a priority job queue backed by redis, built for node.js.
http://automattic.github.io/kue
MIT License

Kue job concurrency not being honored #1011

Status: Open. TheRobBrennan opened this issue 7 years ago.

TheRobBrennan commented 7 years ago

Hi there. I'm running into a weird issue where my worker processes are not honoring concurrency limits. My understanding is that Kue's default behavior, when no concurrency value is supplied, is to run a given job from start to completion before handling the next job of that type in the queue.

```js
// worker.js
...
const queue = require('./lib/queue')  // Reference to my library file for Kue functionality
...
```

```js
// ./lib/queue.js
...
function _createJobWithNameAndOpts (name, opts, cb) {
  if (!name) { return cb() }
  if (!opts) { opts = {} }

  console.log('Creating new job \'' + name + '\' with opts ' + JSON.stringify(opts))

  var job = jobs.create(name, opts)
  var execute = require('./' + name).process

  job
    .on('enqueue', function () {
      var message = 'QUEUED: Job ' + job.id + ' added to queue (' + dateFormatter.getFormattedTimestampForConsoleOutput() + ')'
      if (VERBOSE) { console.log(message) }
    })
    .on('start', function () {
      var message = 'START: Job ' + job.id + ' has started (' + dateFormatter.getFormattedTimestampForConsoleOutput() + ')'
      if (VERBOSE) { console.log(message) }
    })
    .on('complete', function () {
      var message = 'SUCCESS: Job ' + job.id + ' completed (' + dateFormatter.getFormattedTimestampForConsoleOutput() + ')'
      if (VERBOSE) { console.log(message) }
    })
    .on('failed', function () {
      var message = 'FAILURE: Job ' + job.id + ' did not complete (' + dateFormatter.getFormattedTimestampForConsoleOutput() + ')'
      if (VERBOSE) { console.log(message) }
    })

  jobs.process(name, 1, execute)    // The same as jobs.process(name, execute)

  job.save()
  return cb()
}

module.exports.createJobWithNameAndOpts = _createJobWithNameAndOpts
```

In one example file, where I pass in invoice as the name, the process method exported by ./invoice.js simply simulates a task that takes five seconds to complete:

```js
...
module.exports.process = function (job, done) {
  var message = 'PROCESSING: ' + TASK_NAME + ' ' + job.id + ' is ready to be processed (' + dateFormatter.getFormattedTimestampForConsoleOutput() + ')'
  console.log(message)
  setTimeout(function () {
    return done()
  }, 5000)
}
...
```

From what I understand, I should see logging output where one job starts and completes successfully before the next one kicks off. However, here is what happens when I queue up five (5) jobs:

```
13:03:14 web.1    | QUEUED: Job 2 added to queue (2017-01-04 13:03:14:896)
13:03:14 web.1    | QUEUED: Job 3 added to queue (2017-01-04 13:03:14:896)
13:03:14 web.1    | QUEUED: Job 4 added to queue (2017-01-04 13:03:14:896)
13:03:14 web.1    | QUEUED: Job 5 added to queue (2017-01-04 13:03:14:896)
13:03:14 web.1    | QUEUED: Job 6 added to queue (2017-01-04 13:03:14:896)
13:03:14 web.1    | PROCESSING: invoice 2 is ready to be processed (2017-01-04 13:03:14:958)
13:03:14 web.1    | PROCESSING: invoice 3 is ready to be processed (2017-01-04 13:03:14:958)
13:03:14 web.1    | PROCESSING: invoice 4 is ready to be processed (2017-01-04 13:03:14:959)
13:03:14 web.1    | PROCESSING: invoice 5 is ready to be processed (2017-01-04 13:03:14:959)
13:03:14 web.1    | PROCESSING: invoice 6 is ready to be processed (2017-01-04 13:03:14:959)
13:03:15 web.1    | START: Job 2 has started (2017-01-04 13:03:15:018)
13:03:15 web.1    | START: Job 3 has started (2017-01-04 13:03:15:018)
13:03:15 web.1    | START: Job 4 has started (2017-01-04 13:03:15:019)
13:03:15 web.1    | START: Job 5 has started (2017-01-04 13:03:15:019)
13:03:15 web.1    | START: Job 6 has started (2017-01-04 13:03:15:019)
13:03:20 web.1    | SUCCESS: Job 2 completed (2017-01-04 13:03:20:142)
13:03:20 web.1    | SUCCESS: Job 3 completed (2017-01-04 13:03:20:142)
13:03:20 web.1    | SUCCESS: Job 4 completed (2017-01-04 13:03:20:142)
13:03:20 web.1    | SUCCESS: Job 5 completed (2017-01-04 13:03:20:142)
13:03:20 web.1    | SUCCESS: Job 6 completed (2017-01-04 13:03:20:142)
```

What could be the issue here? Shouldn't I see a SUCCESS: Job 2 ... message before job 3 is processed/started? Each invoice job takes five seconds to run, so if Kue is only supposed to run them one at a time, why is it starting them all at once? I confirmed this by queuing 50 jobs, and sure enough all 50 start at the same time. I don't want 50 connections being opened to the database cluster at once; the goal is to have the jobs run sequentially =)

behrad commented 7 years ago

Are you sure _createJobWithNameAndOpts is called once? You may be calling .process multiple times...
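To see how calling .process repeatedly could produce the log in the report, here is a toy scheduling model. It is not Kue's code; it rests on the assumption, which matches that log, that every queue.process(type, n, fn) call adds n more workers for the type instead of replacing an earlier registration. Since _createJobWithNameAndOpts calls jobs.process(name, 1, execute) once per created job, five jobs would mean five workers. The function name simulateStartTimes is hypothetical.

```javascript
// Toy model, not Kue's implementation. Assumption: each
// queue.process(type, n, fn) call adds n workers for `type`,
// and each worker runs one job at a time.
function simulateStartTimes(processCalls, jobCount, jobMs) {
  // Total workers = sum of the concurrency passed to each process() call.
  const workerCount = processCalls.reduce((sum, n) => sum + n, 0);
  if (workerCount === 0) return []; // no workers registered: nothing starts
  const freeAt = new Array(workerCount).fill(0); // time each worker becomes idle
  const starts = [];
  for (let i = 0; i < jobCount; i++) {
    // Jobs are taken in FIFO order by whichever worker frees up first.
    let w = 0;
    for (let j = 1; j < freeAt.length; j++) {
      if (freeAt[j] < freeAt[w]) w = j;
    }
    starts.push(freeAt[w]);
    freeAt[w] += jobMs;
  }
  return starts;
}

// process() called once per created job: five calls with concurrency 1.
console.log(simulateStartTimes([1, 1, 1, 1, 1], 5, 5000)); // [ 0, 0, 0, 0, 0 ]
// process() registered once: the five jobs run back to back.
console.log(simulateStartTimes([1], 5, 5000)); // [ 0, 5000, 10000, 15000, 20000 ]
```

In the first case all five five-second jobs start together and finish together, which is the pattern in the START and SUCCESS lines of the log.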

TheRobBrennan commented 7 years ago

I wonder if the design/implementation just isn't quite what I hoped it would be. Basically, I have a main queue library that creates the Kue queue instance and exports it to be shared by two separate Node processes, process A and process B.

From what I could see in testing, if I created the job within the queue library and called queue.process(job, max_concurrency, function) there, the concurrency limit wasn't respected by the exported queue that process A and process B were referring to. However, if I waited and set the concurrency limit in process A or process B, it worked fine.

I then realized I had missed a remark in the documentation that each process holds a singleton reference to a queue, but it doesn't appear that queue concurrency limits are respected unless each process defines them explicitly.

Is there a better way to handle a queue that is shared among multiple processes?
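One way to share a queue module between process A and process B is to keep job creation and worker registration apart: the shared module only enqueues, and each process that should consume a job type registers its processor once at startup. This is a minimal sketch under these assumptions: queue is the object returned by kue.createQueue() and is passed in so the module can be tested without Redis, and registerProcessor and createJob are hypothetical names. It also assumes, as the thread suggests, that concurrency limits are counted per Node process, so if both A and B register invoice with a concurrency of 1, up to two invoice jobs could still run at the same time.

```javascript
// Hypothetical rewrite of ./lib/queue.js, not the original author's code.
// Assumption: `queue` is the object returned by kue.createQueue(); it is
// passed in rather than created here so the module can be tested without Redis.
const registered = new Set(); // job types that already have workers in this process

// Call once at startup in each process (A or B) that should consume `name` jobs.
// A second queue.process() call for the same type would add more workers,
// so repeated registrations are ignored.
function registerProcessor(queue, name, concurrency, handler) {
  if (registered.has(name)) return false;
  registered.add(name);
  queue.process(name, concurrency, handler);
  return true;
}

// Safe to call for every new job: it only enqueues and never adds workers.
function createJob(queue, name, data) {
  const job = queue.create(name, data || {});
  job.save();
  return job;
}

module.exports = { registerProcessor, createJob };
```

For example, worker.js would call registerProcessor(queue, 'invoice', 1, require('./lib/invoice').process) once at boot, while any code path that needs work done calls createJob(queue, 'invoice', data).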

tobalsgithub commented 7 years ago

Hey @TheRobBrennan, not sure if this helps you or not. We needed to ensure FIFO with multiple processes processing the same queue. We created https://github.com/Automattic/kue/pull/896 to handle that, and have been using it in production for over a year with no problems.

ghost commented 7 years ago

@behrad, where should the .process() call go if I want to fire a job on a user's request? I tried putting it inside the user.post() route with no luck, and you mentioned that it shouldn't be called more than once, so how should it be done?

I'm having the same issue of job-event spaghetti...
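Given the advice above that .process should not be called more than once, it would belong in startup code rather than in the route, and the route would only create and save jobs. The sketch below assumes an Express-style (req, res) handler with a body already parsed into req.body and a made-up job type 'welcome-email'; startWorkers and makeSignupRoute are hypothetical names. It also attaches Kue's queue-level 'job complete' and 'job failed' events once at startup, which may help with the per-job event spaghetti.

```javascript
// Hypothetical app wiring. Assumptions: `queue` comes from kue.createQueue(),
// routes are Express-style (req, res) handlers with a parsed req.body, and
// 'welcome-email' is a made-up job type.

// Runs once when the app boots, never inside a request handler.
function startWorkers(queue, handlers) {
  for (const type of Object.keys(handlers)) {
    queue.process(type, 1, handlers[type]); // concurrency 1 per type in this process
  }
  // Queue-level events are attached once here instead of on every job.
  queue.on('job complete', function (id) {
    console.log('SUCCESS: Job ' + id + ' completed');
  });
  queue.on('job failed', function (id) {
    console.log('FAILURE: Job ' + id + ' did not complete');
  });
}

// Builds the signup route handler; it runs on every request but
// only creates and saves a job.
function makeSignupRoute(queue) {
  return function signup(req, res) {
    const job = queue.create('welcome-email', { email: req.body.email });
    job.save(function (err) {
      if (err) return res.status(500).json({ error: err.message });
      res.status(202).json({ jobId: job.id });
    });
  };
}
```

Wiring would then happen once, e.g. startWorkers(queue, { 'welcome-email': sendWelcomeEmail }) followed by app.post('/users', makeSignupRoute(queue)), where sendWelcomeEmail stands for your own (job, done) function.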

marcuspoehls commented 7 years ago

@TheRobBrennan Hi Rob, did you solve your issue of processing concurrency not being respected for jobs?

I'm having the same issue. A queue is created when the Node.js server starts, and I then register a processor for every job type on that queue with Queue.process(name, 1, fn). Within my application I use the reference to the queue created at startup to create new jobs, which then get processed by the function defined in Queue.process.

TheRobBrennan commented 7 years ago

@marcuspoehls Ehhhh...kinda. I'm not overly pleased with it, but here is the 30,000 ft view. Process A and process B both use the same Kue queue Q. While both processes could be running separately, each process needed to be aware of any special handling that was required.

The solution I put into place is essentially a shared library that both processes refer to; if there is a job of type j1 that requires special handling, both process A and process B need logic to know how to handle it.
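A shared library along these lines might keep every job type's concurrency and handler in one table, so the special handling for j1 is written once, and let each process choose which types it consumes. This is a sketch under stated assumptions: the types j1 and j2, their limits, and the placeholder handlers are illustrative, registerJobs is a hypothetical helper, and limits are assumed to be counted per process. Under that assumption, if only one instance of one process consumes j1 with a concurrency of 1, j1 jobs would run strictly one at a time.

```javascript
// Hypothetical shared module (e.g. ./lib/jobDefinitions.js) imported by both
// process A and process B. Type names, limits and handlers are placeholders.
const JOB_DEFINITIONS = {
  // j1 needs special handling: one at a time.
  j1: { concurrency: 1, handler: (job, done) => done() },
  j2: { concurrency: 5, handler: (job, done) => done() },
};

// Call once per process at startup with the job types that process consumes.
function registerJobs(queue, types) {
  for (const type of types) {
    const def = JOB_DEFINITIONS[type];
    if (!def) throw new Error('Unknown job type: ' + type);
    queue.process(type, def.concurrency, def.handler);
  }
}

module.exports = { JOB_DEFINITIONS, registerJobs };
```

Process A might then call registerJobs(queue, ['j1', 'j2']) and process B registerJobs(queue, ['j2']), so both share one definition of each type while only A consumes j1.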

Not ideal, but it seemed to get the intended job done.