nathanschwarz / meteor-cluster

Worker pool for Meteor using the native Node.js `cluster` module
MIT License

meteor-cluster

A Meteor package that lets you create a worker pool on the server to handle heavy jobs.
It can run synchronous and asynchronous tasks from a persistent or in-memory queue.
It can also run recurring and scheduled tasks.

TaskQueue

TaskQueue is a job queue that can be backed either by MongoDB (persistent) or by memory.
It lets you add, update and remove jobs consistently across processes.

You can attach event listeners to handle task results and errors.

prototype

TaskQueue.addTask({ taskType: String, data: Object, priority: Integer, _id: String, dueDate: Date, inMemory: Boolean })
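The types in the addTask prototype can be spelled out as a plain check. This is a hypothetical helper, not part of nschwarz:cluster; it assumes only taskType is required and treats every other field as optional, which matches how the examples call addTask but may not match the package's own validation.

```javascript
// Hypothetical helper (not part of nschwarz:cluster): checks an addTask options
// object against the documented types. Assumes only taskType is required.
function validateTaskOptions(options) {
  const { taskType, data, priority, _id, dueDate, inMemory } = options
  const errors = []
  if (typeof taskType !== 'string') {
    errors.push('taskType must be a string')
  }
  if (data !== undefined && (typeof data !== 'object' || data === null)) {
    errors.push('data must be an object')
  }
  if (priority !== undefined && !Number.isInteger(priority)) {
    errors.push('priority must be an integer')
  }
  if (_id !== undefined && typeof _id !== 'string') {
    errors.push('_id must be a string')
  }
  // a Date whose time is NaN (e.g. new Date('oops')) is rejected too
  if (dueDate !== undefined && !(dueDate instanceof Date && !Number.isNaN(dueDate.getTime()))) {
    errors.push('dueDate must be a valid Date')
  }
  if (inMemory !== undefined && typeof inMemory !== 'boolean') {
    errors.push('inMemory must be a boolean')
  }
  return errors
}

// usage: check the options before passing them to TaskQueue.addTask
console.log(validateTaskOptions({ taskType: 'ASYNC', data: { timeout: 5000 }, priority: 6 })) // []
console.log(validateTaskOptions({ taskType: 'TEST', priority: 1.5 })) // [ 'priority must be an integer' ]
```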

Event listeners (Master only) :

TaskQueue.addEventListener(eventType: String, callback: function)
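A listener receives every result of its event type, so one way to keep per-task-type handling tidy is to dispatch on `task.taskType` inside a single callback. `routeByTaskType` is a hypothetical helper, and the `{ value, task }` payload shape is taken from onJobsDone / onJobsError in the basic usage example.

```javascript
// Hypothetical helper: builds a single listener that dispatches on task.taskType.
// The { value, task } payload mirrors onJobsDone / onJobsError in the basic usage example.
function routeByTaskType(handlers, fallback = () => undefined) {
  return ({ value, task }) => {
    // only use handlers defined on the object itself, not inherited properties
    const handler = Object.prototype.hasOwnProperty.call(handlers, task.taskType)
      ? handlers[task.taskType]
      : fallback
    return handler(value, task)
  }
}

// usage on the Master: TaskQueue.addEventListener('done', onDone)
const onDone = routeByTaskType({
  ASYNC: (value, task) => `async task ${task._id} finished`,
})

console.log(onDone({ value: undefined, task: { _id: 'a1', taskType: 'ASYNC' } })) // prints "async task a1 finished"
```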

In-Memory Queue (Master only) :

TaskQueue.inMemory.findById(_id: String)

TaskQueue.inMemory.removeById(_id: String)

TaskQueue.inMemory.tasks() : returns all in-memory tasks

TaskQueue.inMemory.availableTasks() : returns available in-memory tasks

note on the in-memory / persistent task queue

Both in-memory and persistent tasks are available at the same time and can be used together, but :

Cluster

Cluster is an isomorphic class that handles both the Worker and the Master.

on the Master it :

prototype

constructor(taskMap: Object, { port: Integer, maxAvailableWorkers: Integer, refreshRate: Integer, inMemoryOnly: Boolean, messageBroker: function, logs: String, keepAlive: String | Integer, autoInitialize: Boolean })

IPC (advanced usage)

Since version 2.0.0, you can communicate between the child processes and the Master. To do so, you must provide the Master Cluster instance with a messageBroker function. This function handles (on the Master) all custom messages sent by the child processes.

The function should be prototyped as follows :
messageBroker(respond: function, msg: { status: Int > 1, data: Any })
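A messageBroker can grow beyond a single `ping` check by looking replies up in a table. This is a sketch: the `time` message is a hypothetical example, and the status check only enforces the documented `Int > 1` constraint.

```javascript
// Master-side messageBroker sketch. 'ping' comes from the IPC examples;
// 'time' is a hypothetical extra message kind.
const replies = new Map([
  ['ping', () => 'pong'],
  ['time', () => new Date().toISOString()],
])

function messageBroker(respond, msg) {
  // the prototype expects an integer status greater than 1
  if (!Number.isInteger(msg.status) || msg.status <= 1) {
    return
  }
  const makeReply = replies.get(msg.data)
  if (makeReply) {
    respond(makeReply())
  }
}

// local check with a stand-in respond function
messageBroker((reply) => console.log(reply), { status: 4, data: 'ping' }) // prints "pong"
```

A Map is used instead of a plain object so that data values such as `'toString'` cannot accidentally match an inherited property.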

All communication between the Master and a child must be initiated by the child. To do so, use toggleIPC, the second parameter passed to every function in the taskMap. It is prototyped as follows :

toggleIPC(messageBroker: function, initialize: function): Promise

Because toggleIPC returns a promise, you must return it (recursively, when calls are nested); otherwise the job will be considered done and the worker idle.
Not returning it will result in unwanted, unexpected behavior.
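Because a task only has to hand back a promise, an `async` task that `await`s toggleIPC and then returns is equivalent to `return toggleIPC(...)`. The sketch contrasts it with a task that forgets to return. `fakeToggleIPC` is a hypothetical stand-in so the snippet runs outside a cluster; it assumes, as in the IPC examples, that the first argument handles the Master's reply and the second receives a function that sends a message to the Master.

```javascript
// Hypothetical stand-in for toggleIPC so the snippet runs on its own: it answers
// every message with 'pong' after a short delay, like a Master would.
function fakeToggleIPC(onMessage, initialize) {
  return new Promise((resolve) => {
    const sendToMaster = () => setTimeout(() => resolve(onMessage('pong')), 10)
    initialize(sendToMaster)
  })
}

// Good: the promise returned by the async task settles only after the reply
async function pingTask(job, toggleIPC) {
  const result = await toggleIPC(
    (reply) => `master replied ${reply}`,
    (smtm) => smtm({ status: 4, data: 'ping' })
  )
  return result
}

// Bad: nothing is returned, so the job would be considered done immediately
function brokenPingTask(job, toggleIPC) {
  toggleIPC((reply) => reply, (smtm) => smtm({ status: 4, data: 'ping' }))
}

pingTask({}, fakeToggleIPC).then((result) => console.log(result)) // prints "master replied pong"
console.log(brokenPingTask({}, fakeToggleIPC)) // prints undefined
```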

CPU allocation

You should not rely on the default maxAvailableWorkers value (the number of CPUs allocated to workers). The default is your system's CPU count, but it is only a reference value: it's up to you to understand your needs and allocate CPUs accordingly.

how can I calculate the maximum number of CPUs I can allocate ?

for example, if you're running on an 8 core machine :

so you should have maxAvailableWorkers = Cluster.maxWorkers() - 4 === 4
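The arithmetic above can be wrapped in a small helper. `computeMaxAvailableWorkers` is hypothetical (not part of the package); it assumes you already know how many CPUs to reserve for whatever else runs on the machine, and it clamps the result so you never ask for more workers than CPUs, or for zero workers.

```javascript
// Hypothetical helper: CPUs left for workers once some are reserved.
// totalCpus would typically come from Cluster.maxWorkers() (or os.cpus().length).
function computeMaxAvailableWorkers(totalCpus, reservedCpus) {
  const available = totalCpus - reservedCpus
  // keep at least one worker, and never more workers than CPUs
  return Math.min(totalCpus, Math.max(1, available))
}

console.log(computeMaxAvailableWorkers(8, 4)) // prints 4, as in the 8 core example
```

The result is what you would pass as `maxAvailableWorkers` in the Cluster options.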

what if I allocate too many CPUs ?

You can't allocate more than your system's CPU count.
You can still exceed the theoretical maximum number of processes :

in such a case your overall system will slow down, because the execution of some processes will be deferred. This drastically reduces the multi-core performance gain.
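A quick way to spot this situation is to add up every CPU-hungry process you plan to run and compare the total with the core count. `isOversubscribed` is a hypothetical helper that assumes each busy process needs a full core; the process labels below are only examples.

```javascript
// Hypothetical check: more busy processes planned than the machine has cores?
// processes maps a label to a process count; labels are free-form examples.
function isOversubscribed(processes, totalCpus) {
  const total = Object.values(processes).reduce((sum, count) => sum + count, 0)
  return total > totalCpus
}

const plan = { master: 1, mongodb: 1, workers: 8 }
console.log(isOversubscribed(plan, 8)) // prints true: 10 processes for 8 cores
```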

examples

basic usage

  import { Meteor } from 'meteor/meteor'
  import { Cluster, TaskQueue } from 'meteor/nschwarz:cluster'

  const taskMap = {
    'TEST': job => console.log(`testing ${job._id} at position ${job.data.position}`),
    'SYNC': (job) => console.log("this is a synchronous task"),
    'ASYNC': (job) => new Promise((resolve, reject) => Meteor.setTimeout(() => {
      console.log("this is an asynchronous task")
      resolve()
    }, job.data.timeout))
  }

  function onJobsDone({ value, task }) {
    console.log('do something with the result')
  }

  function onJobsError({ value, task }) {
    console.log('do something with the error')
  }

  function syncTask() {
    return TaskQueue.addTask({ taskType: 'SYNC', data: {}})
  }

  function asyncTask() {
    return TaskQueue.addTask({ taskType: 'ASYNC', data: { timeout: 5000 }, priority: 6 })
  }

  function inMemoryTask(priority, position) {
    return TaskQueue.addTask({ taskType: 'TEST', priority, data: { position }, inMemory: true })
  }

  function persistentTask(priority, position) {
    return TaskQueue.addTask({ taskType: 'TEST', priority, data: { position }, inMemory: false })
  }

  const cluster = new Cluster(taskMap)
  Meteor.startup(() => {
    if (Cluster.isMaster()) {
      TaskQueue.addEventListener('done', onJobsDone)
      TaskQueue.addEventListener('error', onJobsError)

      syncTask()
      asyncTask()
      inMemoryTask(8, 1)
      inMemoryTask(1, 2)

      persistentTask(8, 1)
      persistentTask(1, 2)
    }
  })

scheduled task example : run a task in ten minutes

  import { TaskQueue } from 'meteor/nschwarz:cluster'
  import { add } from 'date-fns' // external library to handle date objects

  const dueDate = add(new Date(), { minutes: 10 })
  TaskQueue.addTask({ taskType: 'sometype', priority: 1, data: {}, dueDate })

scheduled task example : run a recurring task every ten minutes

  import { TaskQueue } from 'meteor/nschwarz:cluster'
  import { add } from 'date-fns' // external library to handle date objects

  function recurringTask(job) {
    // do something
    const dueDate = add(new Date(), { minutes: 10 })
    TaskQueue.addTask({ taskType: 'recurringTask', priority: 1, data: {}, dueDate })
  }

  const taskMap = {
    recurringTask
  }
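If you'd rather not add date-fns just to compute dueDate, plain Date arithmetic works for minute offsets, since a minute is a fixed number of milliseconds. `minutesFrom` is a hypothetical helper name.

```javascript
// Dependency-free alternative to add(new Date(), { minutes }):
// returns a new Date `minutes` after `from` (defaults to now).
function minutesFrom(minutes, from = new Date()) {
  return new Date(from.getTime() + minutes * 60 * 1000)
}

// e.g. TaskQueue.addTask({ taskType: 'recurringTask', priority: 1, data: {}, dueDate })
const dueDate = minutesFrom(10)
console.log(dueDate.toISOString())
```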

simple IPC example (advanced usage)

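  import { Cluster } from 'meteor/nschwarz:cluster'

  // runs on a worker: sends 'ping' to the Master and handles its reply
  function ipcPingTest(job, toggleIPC) {
    return toggleIPC(
      (msg) => {
        console.log(msg) // the Master's reply ('pong')
        return 'result you eventually want to pass to the master'
      },
      (smtm) => smtm({ status: 4, data: 'ping' }) // smtm sends a message to the Master
    )
  }

  const taskMap = {
    ipcPingTest
  }

  // runs on the Master: answers custom messages from the child processes
  function messageBroker(respond, msg) {
    if (msg.data === 'ping') {
      respond('pong')
    }
  }

  const cluster = new Cluster(taskMap, { messageBroker })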
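To follow the ping/pong round trip without starting a cluster, the sketch wires the example's ipcPingTest and messageBroker to `simulateToggleIPC`, a hypothetical in-process stand-in. It only mimics the documented call shapes; the real package exchanges these messages between processes.

```javascript
// Worker-side task and Master-side broker, as in the example above
function ipcPingTest(job, toggleIPC) {
  return toggleIPC(
    (msg) => {
      console.log(msg)
      return 'result you eventually want to pass to the master'
    },
    (smtm) => smtm({ status: 4, data: 'ping' })
  )
}

function messageBroker(respond, msg) {
  if (msg.data === 'ping') {
    respond('pong')
  }
}

// Hypothetical stand-in for toggleIPC: hands each message straight to the broker
// in the same process instead of sending it to another process.
function simulateToggleIPC(broker) {
  return (onMessage, initialize) =>
    new Promise((resolve, reject) => {
      const smtm = (msg) =>
        broker((reply) => {
          try {
            // if onMessage returns another toggleIPC promise, resolve() adopts it
            resolve(onMessage(reply))
          } catch (err) {
            reject(err)
          }
        }, msg)
      initialize(smtm)
    })
}

ipcPingTest({}, simulateToggleIPC(messageBroker)).then((result) => console.log(result))
// prints "pong", then "result you eventually want to pass to the master"
```

Because `resolve()` adopts a returned promise, the same stand-in also handles a reply handler that returns a nested toggleIPC call.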

multiple IPC example (advanced usage)

  import { Cluster } from 'meteor/nschwarz:cluster'

  function ipcPingTest(job, toggleIPC) {
    return toggleIPC(
      (msg) => {
        console.log(msg) // first reply from the Master
        // return the nested call so the job stays busy until the second reply
        return toggleIPC(
          (reply) => console.log(reply),
          (smtm) => smtm({ status: 4, data: 'ping' })
        )
      },
      (smtm) => smtm({ status: 4, data: 'ping' })
    )
  }

  const taskMap = {
    ipcPingTest
  }

  function messageBroker(respond, msg) {
    if (msg.data === 'ping') {
      respond('pong')
    }
  }

  const cluster = new Cluster(taskMap, { messageBroker })
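With `async`/`await`, a sequence of exchanges can be written as a loop instead of nested callbacks. This assumes toggleIPC can be called again once the previous call has resolved, which is what the nested example already does; `multiPingTask`, `job.data.count` and `fakeToggleIPC` are hypothetical names.

```javascript
// Sends job.data.count pings one after another and collects the replies.
async function multiPingTask(job, toggleIPC) {
  const replies = []
  for (let i = 0; i < job.data.count; i++) {
    const reply = await toggleIPC(
      (msg) => msg, // hand the Master's reply back to this loop
      (smtm) => smtm({ status: 4, data: 'ping' })
    )
    replies.push(reply)
  }
  return replies // the returned promise settles after the last exchange
}

// Hypothetical stand-in for toggleIPC so the sketch runs outside a cluster:
// every message gets a 'pong' reply.
function fakeToggleIPC(onMessage, initialize) {
  return new Promise((resolve) => {
    initialize(() => resolve(onMessage('pong')))
  })
}

multiPingTask({ data: { count: 3 } }, fakeToggleIPC).then((replies) => console.log(replies))
// prints [ 'pong', 'pong', 'pong' ]
```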

common mistakes and good practices

secure your imports

Because workers only run tasks, you should keep unnecessary imports out of them to avoid extra resource consumption and longer startup times.
As a good practice, put all your Master import logic in a single file and import it only on the Master.
What I mean by "Master import logic" is :

It can be summarized as follows :

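  // in your entry file
  import { Cluster } from 'meteor/nschwarz:cluster'

  if (Cluster.isMaster()) {
    // nested import: Meteor allows import statements inside blocks
    import './MasterImports.js'
  }
  // ...rest of your cluster logic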

recurring tasks

Because recurring tasks are created "recursively", there is always one in the queue.
When the server restarts, the recurring task runs again because it is still in the queue.
Be sure to remove all recurring tasks on the Master before starting new ones, or guard the insert.
Otherwise you will end up with multiple identical recurring tasks running at the same time.

You can either do :

  Meteor.startup(() => {
    if (Cluster.isMaster()) {
      // drop recurring tasks left over from a previous run before scheduling new ones
      TaskQueue.remove({ taskType: 'recurringTask' })
    }
  })

or at task initialization :

  const dueDate = add(new Date(), { minutes: 10 }) // add comes from date-fns, as above
  const recurringTaskExists = TaskQueue.findOne({ taskType: 'recurringTask' }) !== undefined
  if (!recurringTaskExists) {
    TaskQueue.addTask({ taskType: 'recurringTask', priority: 1, data: {}, dueDate })
  }

task uniqueness

To make sure your tasks are unique, set a unique _id when calling TaskQueue.addTask.
A good model could be : ${taskType}${associated_Model_ID}
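The id model above can live in one helper so every caller builds it the same way. `uniqueTaskId` and the `sendInvoice` task type are hypothetical names.

```javascript
// Builds a deterministic task _id following the ${taskType}${associated_Model_ID} model.
function uniqueTaskId(taskType, modelId) {
  return `${taskType}${modelId}`
}

const _id = uniqueTaskId('sendInvoice', 'a1B2c3')
console.log(_id) // prints "sendInvoicea1B2c3"
// then: TaskQueue.addTask({ taskType: 'sendInvoice', _id, data: { invoiceId: 'a1B2c3' } })
```

Plain concatenation can collide when task types share a prefix (for example `ab` + `c` and `a` + `bc`); inserting a separator such as `_` between the two parts is a possible variation on the suggested model.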

multiple apps

There's currently no way to know which app started a task (this may change later) :
you should run the Cluster in only one of your apps, so that other apps don't try to run a task that isn't in their taskMap.
You can of course still use the TaskQueue in all your apps.
If your apps have different domain names or configurations (for the mailer, for example), pass these through the data field.

For example, if a task uses Meteor.absoluteUrl or similar, it gets the value associated with the app running the Cluster, not with the app that added the task.
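A sketch of passing app-specific values through `data`: the enqueuing app resolves its own root URL and the task only reads `job.data`. `sendWelcomeMail`, `rootUrl` and `userId` are hypothetical names, and the mailer call is left out.

```javascript
// Hypothetical task: builds a link from a root URL passed in job.data instead of
// calling Meteor.absoluteUrl() in the worker (which would use the Cluster app's URL).
function sendWelcomeMail(job) {
  const { rootUrl, userId } = job.data
  // relative path keeps any path prefix of rootUrl (which should end with '/')
  const link = new URL(`welcome/${encodeURIComponent(userId)}`, rootUrl).toString()
  // hand `link` to your mailer here
  return link
}

// in the app that enqueues the task, for example:
// TaskQueue.addTask({ taskType: 'sendWelcomeMail', data: { rootUrl: Meteor.absoluteUrl(), userId } })
console.log(sendWelcomeMail({ data: { rootUrl: 'https://app-two.example.com/', userId: 'u42' } }))
// prints "https://app-two.example.com/welcome/u42"
```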