grantcarthew / node-rethinkdb-job-queue

A persistent job or task queue backed by RethinkDB.
https://github.com/grantcarthew/node-rethinkdb-job-queue/wiki
MIT License
157 stars 16 forks source link

[Enhancement] Broadcast job queues #73

Closed disarticulate closed 6 years ago

disarticulate commented 6 years ago

I've searched through the docs, and it doesn't appear that I could broadcast a job to multiple processors. It seems like that should be a thing, as most queues can be one-to-one or one-many.

TomKaltz commented 6 years ago

@disarticulate A job is broadcast to all job processors but only one will actually process the job.

This module is meant to be job queue and not necessarily a message queue. I know nsq has the notion of "channels" to be able to distribute the same message to multiple consumers but a job queue wouldn't really benefit from this sort of setup.

If you really wanted the same job to be processed by multiple processors you would have to create a queue for each processor and submit the same job separately to each of those queues.

Am I understanding your issue correctly? If not, please provide more detail as to the workflow you're trying to achieve.

grantcarthew commented 6 years ago

Thanks @TomKaltz I have used this package before @disarticulate however I had trouble with it: https://github.com/yamalight/rethinkdb-pubsub

You could also try this: https://github.com/whyhankee/dbwrkr

The other option is to just register a change feed against a table and build it yourself.

I did think about adding a generic message bus to the job queue. It's not off the cards yet however I don't have time to commit to new features.

disarticulate commented 6 years ago

Yes. I'd like a broadcast flag to have all processors process the same job. I've hotfixed it by just creating a watch feed that then submits a completed change to the local handler. But in general, it's useful to have a one->many relationship for some queues.

const processChangefeed = require('rethinkdb-changefeed-reconnect')
const queueChangeFeedWatcher = (name, q, queueHandler, vm) => {
  q.on('ready', (queueId) => {
    var r = q.r
    var newTasksFeed = () => {
      return r.table(name)
        .changes()
    }

    var handleNewTask = ({new_val, old_val}) => {
      if ('data' in new_val
        && new_val.status === 'completed'
        && new_val.data.broadcast
        && new_val.data.broadcasters.length > 0) {
        var [host,proc,cmd,count,uuid] = new_val.queueId.split(':')

        if (new_val.data.broadcasters.indexOf(hostID()) === -1
            && queueHandler[cmd]) {
          vm.log(new_val.data.broadcasters, host, proc, cmd)
          queueHandler[cmd](
            new_val,
            (err,result) => {},
            (job,cb) => {}
          )
        }
      }
    }

    var handleError = (err) => {
      vm.log(err.stack)
    }
    processChangefeed(newTasksFeed, handleNewTask, handleError, {
      changefeedName: name,
      attemptDelay: 30000,
      maxAttempts: 30,
      silent: false,
      logger: console
    })
  })
}
grantcarthew commented 6 years ago

OK, @disarticulate. If this was going to be a new feature how would you implement it? I don't fully understand so if you could expand on the implementation details please.

disarticulate commented 6 years ago

based on what I hotfixed:

//create a queue
const Queue = require('rethinkdb-job-queue')

const cxnOptions = {
  host: 'proddb.domain.com',
  port: 10000,
  db: 'JobQueue'
}

const qOptions = {
  name: 'VideoProcess',
  masterInterval: 1000000,
  changeFeed: true,
  concurrency: 50,
  removeFinishedJobs: false,
  broadcast: true,
  broadcasters: [] //it might be more intuitive to track who is responsible for the boardcast, or the inverse
}

const q = new Queue(cxnOptions, qOptions)

every process would generate a log where completed:

{ 
      date: 2016-08-17T03:44:30.634Z,
      message: 'Job processed by queue worker',
      queueId: 'WebDev:rjqJobQueueTests:rjqJobQueueTestJobs:7615:baad3b28-80c9-48ba-a800-73a2ae3a89a2',
      retryCount: 0,
      status: 'processed',
      type: 'information' 
}

Every processed log has an Event attached to what I have is a processorId (${hostname}:${uuidv4})

q.on('processed', (queueId, jobId, processorId) => {
  console.log('Job was processed by: ' + processorId)
})

Master would be in charge of marking status as completed based on processor count at time ofthe number of processes available at the time of job creation.

What I did in the hotfix is simply do this in my process handler:

job.broadcasters.push(processorId)
job.update()

Which allows me to use a standard changeFeed for the broadcaster flag, and only process if the processorId is not in the broadcasters array, and update broadcasters once processed.

grantcarthew commented 6 years ago

OK, I understand what you are trying to do now @disarticulate thanks.

One of the things that goes through my mind whilst reading this is the job statuses. A job will only be retrieved from the queue if it has a status of waiting or failed. Once a job is retrieved its status gets set to active. Once it is active it will not appear to other workers due to the index used in the filter. How would you handle this?

Also, how many processing nodes will you have on a specific job? How can the Queue Master be in charge of assigning the job a completed status if it does not know how many processing nodes there are? This would need to be assigned to the job in some way. What happens if a processing node fails (power or some other issue)? If you state the job has 15 nodes processing it and 14 are functioning properly the job will be stuck in what state?

What about job processing failures and retries?

I still feel like this is trying to retrofit a standard FIFO job queue into something it is not designed to do. The queue-process.js file is quite complex and purposely designed for one task. Managing the job state outside of the queue based on parallel processing will be bypassing this functionality.

I'm not killing the idea and not trying to dissuade you, just don't see it fitting in well with the current project. If you can convince me otherwise....

Perhaps this is a good idea for your own project? A new rethinkdb-job-cluster package? Possibly a new package using rethinkdb-job-queue as a dependency?

Please note that your input is very much appreciated.

disarticulate commented 6 years ago

I understand the difficulties if you've streamlined the queue to do single worker.

As I indicated, I hotfixed my requirements by simply listening for the first job to complete and any listener on that queue will run based on a data.broadcast flag and a broadcasters array.

Maybe you could consider a plugin architecture that would make the above hotfix easier.

You can close this issue as needed, my requirements don't need to worry about many of the things this library does. I'll probably continue to just add hotfixes unless there's way to add a middleware or plugin.

Great work, regardless.

grantcarthew commented 6 years ago

If you want to send a PR @disarticulate we can work through the details.

I'm planning on adding my goals for this project into the README however I haven't done that yet. One of my goals is to keep it as simple as possible. If these feature can be added without too much complexity being added then I would be happy to merge.

grantcarthew commented 6 years ago

Ping @disarticulate

disarticulate commented 6 years ago

unfortunately i am in the process of developing and wont be able to contribute at the moment. the code i posted above runs the functionality i need.

On Jul 31, 2017 19:41, "Grant Carthew" notifications@github.com wrote:

Ping @disarticulate https://github.com/disarticulate

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/grantcarthew/node-rethinkdb-job-queue/issues/73#issuecomment-319235487, or mute the thread https://github.com/notifications/unsubscribe-auth/AQp7PV5GTgMiK4GwH37MyQkioaRDNmCzks5sTnRGgaJpZM4OgQL1 .

grantcarthew commented 6 years ago

OK. I'm going to close this @disarticulate although if you want to put some time and effort into a PR I would be happy to work with you. Thanks.