nodeca / idoit

Redis-backed task queue engine with advanced task control and eventual consistency
MIT License

idoit



Features in detail

idoit provides advanced control to implement so

Grouping. A special group task executes its child tasks and waits until all of them complete. Useful for map/reduce logic.

Chaining. A special chain task executes its children one by one. Also useful for map/reduce, or for splitting very complicated tasks into simpler steps.

Mapping iterator. A special feature for huge payloads: it produces chunks on demand. Benefits:

Progress. When you use group/chain/map scenarios, it's easy to monitor the total progress via the top-level parent. Long standalone tasks can also notify the user about progress changes.

Worker pools. You can split tasks across different processes, for example, if you don't want heavy tasks to block light ones.

Scheduler. A built-in cron lets you execute tasks on a given schedule.

Data consistency

Install

node.js 6+ and redis 3.0+ required.

npm install idoit --save

API

new Queue({ redisURL, concurrency = 100, name = 'default', ns = 'idoit:' })

It's good practice to have separate worker pools for heavy blocking tasks and non-blocking ones. For example, nothing should block sending urgent emails. So, create several worker processes, pin them to different pools and set the proper task concurrency for each. Non-blocking tasks can be consumed in parallel, and the default concurrency = 100 is usually fine for them. Blocking tasks should be consumed one by one, so set concurrency = 1 for those workers.
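To make the effect of `concurrency` concrete, here is an in-memory sketch of a consumer that never runs more than `concurrency` tasks at once. It is not idoit's internal code and does not touch Redis; `runPool`, `sleep` and `makeTasks` are hypothetical names used only for illustration.

```javascript
// Hypothetical sketch, not idoit code: run async task functions with at most
// `concurrency` of them in flight, and report the peak parallelism observed.
async function runPool(tasks, concurrency) {
  const results = new Array(tasks.length);
  let next = 0;       // index of the next task to start
  let active = 0;     // tasks currently running
  let maxActive = 0;  // peak number of tasks running at once

  // Each worker slot keeps taking tasks until none are left.
  async function slot() {
    while (next < tasks.length) {
      const i = next++;
      active++;
      maxActive = Math.max(maxActive, active);
      try {
        results[i] = await tasks[i]();
      } finally {
        active--;
      }
    }
  }

  const slotCount = Math.min(concurrency, tasks.length);
  await Promise.all(Array.from({ length: slotCount }, () => slot()));
  return { results, maxActive };
}

// Fake tasks: each waits 5 ms and returns twice its index.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
const makeTasks = (n) =>
  Array.from({ length: n }, (_, i) => async () => {
    await sleep(5);
    return i * 2;
  });
```

With ten fake tasks, `runPool(makeTasks(10), 1)` reports `maxActive` 1 (strictly one by one, like a worker for blocking tasks), while `runPool(makeTasks(10), 100)` reports 10, simply because only ten tasks exist.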

Note. If you remove some task types from your app, their orphaned data will be wiped after 3 days.

.registerTask(options), .registerTask(name [, cron], process)

Options:

.getTask(id)

Get a task by id. Returns a Promise resolved with the task, or with null if the task does not exist.

Task fields you can use:

.cancel(id)

Cancel a task. Returns a Promise resolved with the task.

Note. You can cancel only tasks without a parent.

.start()

Start the worker and begin consuming task data. Returns a Promise, resolved when the queue is ready (it calls .ready() internally).

If pool was specified in the constructor, only tasks routed to this pool will be consumed.

.shutdown()

Stop accepting new tasks from the queue. Returns a Promise, resolved when all active tasks in this worker complete.
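A common use of `.shutdown()` is a graceful stop on SIGTERM, so that in-flight tasks finish before the process exits. A minimal sketch, assuming `queue` is an already started idoit instance; the helper `installGracefulShutdown` and its injectable `proc`/`exit` parameters are hypothetical, added only so the logic can be exercised without Redis.

```javascript
// Hypothetical helper: on SIGTERM/SIGINT, stop taking new tasks, wait until
// queue.shutdown() resolves (all active tasks done), then exit the process.
function installGracefulShutdown(queue, proc = process, exit = (code) => proc.exit(code)) {
  let stopping = false;

  const onSignal = () => {
    if (stopping) return; // ignore repeated signals while draining
    stopping = true;
    return queue.shutdown().then(
      () => exit(0),
      (err) => {
        console.error(err);
        exit(1);
      }
    );
  };

  proc.on('SIGTERM', onSignal);
  proc.on('SIGINT', onSignal);
  return onSignal;
}

// Usage with a real queue:
//   await queue.start();
//   installGracefulShutdown(queue);
```

Guarding with `stopping` keeps a second Ctrl+C from calling `.shutdown()` twice while active tasks are still finishing.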

.ready()

Returns a Promise, resolved when the queue is ready to operate (after the 'connect' event, see below).

.options(opts)

Update constructor options, except redisURL.

.on('eventName', handler)

idoit is an EventEmitter instance that fires some events:

.<taskName>(...params)

Create a new task with optional params.

task.options({ ... })

Override task properties. For example, you may wish to assign specific group/chain tasks to another pool.

task.run()

Run task immediately. Returns a Promise resolved with task id.

task.postpone([delay])

Postpone task execution by delay milliseconds (or by task.postponeDelay if delay is omitted).

Returns a Promise resolved with task id.

task.restart([add_retry] [, delay])

Restart the currently running task.

Note. idoit already has built-in restart logic for task errors, so you probably should not use this method directly. It's exposed for very specific cases.

task.progressAdd(value)

Increment the current task's progress.

Returns a Promise resolved with task id.
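The feature list says total progress can be monitored through the top-level parent. As an illustration only, here is an in-memory model in which progress added to a child also counts toward every ancestor. The `FakeTask` class is hypothetical, and the `progress`/`total` field names are assumptions for this sketch rather than a description of idoit internals.

```javascript
// Hypothetical in-memory model, not idoit code: progress added to a task is
// also added to each of its ancestors, so the top-level parent always
// reflects the overall progress of the whole tree.
class FakeTask {
  constructor(total, parent = null) {
    this.total = total;   // assumed field: total amount of work
    this.progress = 0;    // assumed field: work done so far
    this.parent = parent;
  }

  // idoit's task.progressAdd() returns a Promise; this model updates synchronously.
  progressAdd(value) {
    for (let t = this; t !== null; t = t.parent) t.progress += value;
    return this;
  }

  percent() {
    return this.total > 0 ? Math.round((100 * this.progress) / this.total) : 0;
  }
}

// A parent whose total equals the sum of its three children's totals.
const root = new FakeTask(300);
const children = [100, 100, 100].map((n) => new FakeTask(n, root));
children[0].progressAdd(100);
children[1].progressAdd(50);
```

After these calls the first child is at 100%, the second at 50%, and the root reports 150 of 300, i.e. 50%, without inspecting any child.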

task.setDeadline(timeLeft)

Update the current task's deadline.

Returns a Promise resolved with task id.

Special tasks

group

Create a new task, executing children in parallel.

queue.group([
  queue.children1(),
  queue.children2(),
  queue.children3()
]).run()

The group result is an unsorted array of the children's results.
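To show why the group result is unsorted, here is a sketch using plain Promises (no Redis, not idoit's implementation) that starts all children at once and records each result as that child completes. `runGroup`, `later` and the fake children are hypothetical.

```javascript
// Hypothetical sketch of group semantics: start every child at once and
// record each result when that child finishes, so the final array follows
// completion order, not the order the children were listed in.
async function runGroup(children) {
  const results = [];
  await Promise.all(
    children.map(async (child) => {
      results.push(await child());
    })
  );
  return results;
}

// Fake children that finish after different delays.
const later = (ms, value) => new Promise((resolve) => setTimeout(resolve, ms, value));
const children = [() => later(30, 'a'), () => later(10, 'b'), () => later(20, 'c')];
```

Here the child listed first finishes last, so its result ends up at the end of the array. If you need a stable order, sort the results yourself or include an identifier in each child's result.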

chain

Create a new task, executing children in series. If any child fails, the chain fails too.

queue.registerTask('multiply', (a, b) => a * b);
queue.registerTask('subtract', (a, b) => a - b);

queue.chain([
  queue.multiply(2, 3), // 2 * 3 = 6
  queue.subtract(10),   // 10 - 6 = 4
  queue.multiply(3)     // 3 * 4 = 12
]).run()

The result of each task is passed as the last argument to the next task. The chain's result is the result of its last task.
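The argument-passing rule can be checked with a plain synchronous sketch (no Redis, not idoit's implementation). In the hypothetical `runChain` helper, each step is a function plus its own arguments, and the previous result is appended as the last argument. It reproduces the multiply/subtract example.

```javascript
// Hypothetical sketch of chain semantics: run steps in series, appending the
// previous step's result as the last argument of the next step.
// An exception in any step aborts the whole chain.
function runChain(steps) {
  let hasPrev = false;
  let prev;
  for (const [fn, args] of steps) {
    prev = hasPrev ? fn(...args, prev) : fn(...args);
    hasPrev = true;
  }
  return prev; // the chain result is the last step's result
}

const multiply = (a, b) => a * b;
const subtract = (a, b) => a - b;

const result = runChain([
  [multiply, [2, 3]], // 2 * 3 = 6
  [subtract, [10]],   // 10 - 6 = 4
  [multiply, [3]]     // 3 * 4 = 12
]);
```

Because the previous result is appended last, `subtract(10)` computes `10 - 6`, not `6 - 10`; argument order matters when you design tasks meant for chaining.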

iterator

A special way to run a huge mapping lazily (on demand). See the comments below.

// register iterator task
queue.registerTask({
  name: 'lazy_mapper',
  baseClass: Queue.Iterator,
  // This method is called on task begin and on every child end. It can be
  // a generator function or a function that returns a `Promise`.
  * iterate(state) {
    // ...

    // Three types of output states are possible: ended, do nothing & new data.
    //
    // 1. `null` - end reached, iterator should not be called anymore.
    // 2. `{}`   - idle, there are enough subtasks in queue, try to call
    //             iterator later (when next child finishes).
    // 3. {
    //      state    - new iterator state to remember (for example, offset for
    //                 db query), any serializable data
    //      tasks    - array of new subtasks to push into queue
    //    }
    //
    // IMPORTANT! Iterator can be called in parallel from different workers. We
    // use input `state` to resolve collisions on redis update. So, if you
    // create new subtasks:
    //
    // 1. new `state` MUST be different (from all previous states)
    // 2. `tasks` array MUST NOT be empty.
    //
    // Otherwise, you should signal 'end' or 'idle'.
    //
    // An invalid combination will cause 'end' + error event.
    //
    return {
      state: newState,
      tasks: chunksArray
    };
  }
});

// run iterator
queue.lazy_mapper().run();
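To make the three return values concrete, here is a single-process sketch that drives an `iterate(state)` function until it returns `null`. It does not reproduce how idoit schedules iterator calls (idoit calls the iterator on task start and after each child finishes, possibly from several workers), and where idoit would end the iterator and emit an error event, this sketch simply throws. `driveIterator`, `rangeIterator`, the extra `pendingCount`/`maxPending` arguments and the `undefined` initial state are all assumptions made for the illustration; `iterate` is a plain function here rather than a generator.

```javascript
// Hypothetical single-process driver for the iterator contract:
//   null             -> end reached, stop calling iterate()
//   {}               -> idle, run one pending child, then ask again
//   { state, tasks } -> remember the new state and enqueue the new subtasks
function driveIterator(iterate, runChild, maxPending = 2) {
  let state; // first call gets `undefined` (assumption)
  const seen = new Set([JSON.stringify(state)]); // serialized states already used
  const pending = []; // subtasks waiting to run
  const results = [];

  for (;;) {
    const out = iterate(state, pending.length, maxPending);
    if (out === null) break; // 1. end

    if (Array.isArray(out.tasks) && out.tasks.length > 0) { // 3. new data
      const key = JSON.stringify(out.state);
      if (seen.has(key)) throw new Error('new state must differ from all previous states');
      seen.add(key);
      state = out.state;
      pending.push(...out.tasks);
      continue;
    }

    if (Object.keys(out).length > 0) throw new Error('invalid iterator output');
    // 2. idle: let one child finish before calling iterate() again
    if (pending.length === 0) throw new Error('idle with nothing pending would never resume');
    results.push(runChild(pending.shift()));
  }

  // After the end signal, the remaining children still run.
  while (pending.length > 0) results.push(runChild(pending.shift()));
  return results;
}

// Example iterator: hands out [from, to) ranges over 10 items, 3 at a time,
// and stays idle while `maxPending` chunks are already queued.
const TOTAL = 10;
const CHUNK = 3;
function rangeIterator(state, pendingCount, maxPending) {
  const offset = state === undefined ? 0 : state;
  if (offset >= TOTAL) return null;
  if (pendingCount >= maxPending) return {};
  const end = Math.min(offset + CHUNK, TOTAL);
  return { state: end, tasks: [[offset, end]] };
}

const processed = driveIterator(rangeIterator, ([from, to]) => `${from}-${to}`);
```

The example processes the ranges `0-3`, `3-6`, `6-9` and `9-10`; the idle answers (`{}`) keep at most two chunks queued at any moment instead of materializing the whole mapping up front.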

Why was this crazy magic invented?

Imagine that you need to rebuild 10 million forum posts. You want to split the work into small chunks of equal size, but the posts have no sequential integer numbering, only mongo IDs. What can you do?
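To see why the choice of query matters, here is a sketch comparing skip + limit paging with range + limit paging over an in-memory sorted array of string IDs, standing in for a MongoDB collection (no database driver is used). `chunkBySkip` re-walks the skipped items for every page, while `chunkByRange` resumes right after the last seen ID, like an `_id > lastId` query. All helper names are hypothetical, and `scanned` is only a rough proxy for the work a database would do.

```javascript
// Hypothetical comparison over a sorted array of IDs.
// skip + limit: every page walks past all `skip` items again.
function chunkBySkip(ids, limit) {
  const chunks = [];
  let scanned = 0;
  for (let skip = 0; skip < ids.length; skip += limit) {
    scanned += Math.min(skip + limit, ids.length); // items touched for this page
    chunks.push(ids.slice(skip, skip + limit));
  }
  return { chunks, scanned };
}

// range + limit: remember the last ID and fetch IDs greater than it.
// With an index this is a seek plus `limit` reads; binary search plays the index.
function chunkByRange(ids, limit) {
  const chunks = [];
  let scanned = 0;
  let lastId = null;
  for (;;) {
    const start = lastId === null ? 0 : upperBound(ids, lastId);
    const chunk = ids.slice(start, start + limit);
    if (chunk.length === 0) break;
    scanned += chunk.length;
    chunks.push(chunk);
    lastId = chunk[chunk.length - 1]; // the "previous position" kept as state
  }
  return { chunks, scanned };
}

// First index whose value is greater than `target` (ids sorted ascending).
function upperBound(ids, target) {
  let lo = 0;
  let hi = ids.length;
  while (lo < hi) {
    const mid = (lo + hi) >> 1;
    if (ids[mid] <= target) lo = mid + 1;
    else hi = mid;
  }
  return lo;
}

const ids = Array.from({ length: 10 }, (_, i) => `id${String(i).padStart(2, '0')}`);
```

For these 10 IDs in chunks of 3, both functions return the same four chunks, but `scanned` is 28 for skip + limit versus 10 for range + limit, and the gap grows quadratically with the number of pages.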

The solution is to use an iterative mapper, which can remember the "previous position". This lets you do range + limit requests instead of skip + limit, which works well with databases. Additional bonuses are:

Dev notes

Quick-run redis via docker:

# start
docker run -d -p 6379:6379 --name redis1 redis
# stop
docker stop redis1
docker rm redis1

Why one more queue?

Of course, we are familiar with kue, celery and akka. Our goal was to strike a balance between simplicity and power. We don't know whether idoit works well in a cluster with thousands of instances, but it should be fine at smaller volumes, and it's really easy to use.

kue was not ok for our needs, because:

In idoit we cared about:

Redis can still be a point of failure, but that's an acceptable price for simplicity. Of course, you can get better availability via distributed message buses like RMQ. But in many cases it's more important to keep things simple. With idoit you can reuse existing technologies without additional expenses.