jdarling / MongoMQ

MongoMQ is a messaging queue built on top of Node.js and MongoDB's tailable cursors.
MIT License

MongoMQ v0.3.x

Version 0.3.x introduces new functionality and many bug fixes over v0.2.x. It also makes some minor changes to the packet format and is therefore not backward compatible with v0.2.x.

Biggest differences from 0.2.x to 0.3.x

Installation

From GitHub

Using NPM

Requirements

What is MongoMQ?

MongoMQ is a messaging queue built on top of Node.js and MongoDB's tailable cursors. It distributes messages across workers in both single-receiver and broadcast modes.

What MongoMQ is NOT

MongoMQ does NOT (currently) support callbacks once a message is processed. Instead, it is recommended that you use a one-time listener to pick up responses if this is required.
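The one-time listener pattern can be sketched without a running queue. The example below uses Node's built-in EventEmitter as a stand-in for the queue, so it is not the MongoMQ API. The names `bus`, `request` and the `response:` event prefix are assumptions made up for illustration; only the idea of a `response_id` conversation identifier comes from the stored event layout described later in this document.

```javascript
// Stand-in for the queue: Node's built-in EventEmitter, NOT the MongoMQ API.
const { EventEmitter } = require('events');

const bus = new EventEmitter();
let nextConversation = 0;

// Worker: handles 'sum' requests and publishes the result on a
// per-conversation response event (hypothetical naming scheme).
bus.on('sum', function (msg) {
  const total = msg.data.reduce(function (a, b) { return a + b; }, 0);
  bus.emit('response:' + msg.response_id, total);
});

// Requester: registers a one-time listener BEFORE sending, so the
// response cannot be missed, then emits the request.
function request(event, data, onResponse) {
  const response_id = 'conv-' + (nextConversation++);
  bus.once('response:' + response_id, onResponse);
  bus.emit(event, { data: data, response_id: response_id });
  return response_id;
}

const replies = [];
const conversation = request('sum', [1, 2, 3], function (total) {
  replies.push(total);
});
console.log(conversation, replies);
```

Because the listener is registered with `once`, it removes itself after the first response, so no per-request cleanup is needed. With a real queue the response would arrive asynchronously rather than before `request` returns.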

Supported Methods

new MongoMQ(options)

MongoMQ.start([callback])

Starts the queue listener and sets up the emitter.

Params:

MongoMQ.stop([callback])

Stops listening for messages and closes the emitter.

Params:

MongoMQ.emit(msgType, data, [completeCallback])

Places a message of type msgType on the queue with the provided data for handlers to consume.

Params:

MongoMQ.on(msgType, [options], handler)

Sets up a listener for a specific message type.

Params:

options -

MongoMQ.listeners(event)

Returns an array of the listeners that match the supplied event. Returns an empty array if no listeners are subscribed to the event.

Params:

MongoMQ.removeListener(event, handler)

Shuts down the first listener that matches the supplied event and handler and removes it from the listeners list.

Params:

MongoMQ.removeAllListeners(event)

Shuts down ALL listeners for the specified event and removes them from the listeners list.

Params:

How does MongoMQ work?

MongoMQ sets up a capped collection (the collection type that supports tailable cursors) and then starts listeners that use find together with findAndModify to pick up messages from this collection.
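The claim step can be illustrated with a small in-memory simulation. This is a sketch only: the array `collection` stands in for the MongoDB collection, `claimNext` mimics the effect of a findAndModify call that matches an unhandled message and marks it handled in one step, and `pickedBy` is an illustrative field that is not part of the real packet. None of this is MongoMQ's actual source.

```javascript
// In-memory stand-in for the collection; NOT MongoDB and NOT MongoMQ's source.
const collection = [];

function enqueue(event, data) {
  collection.push({ _id: collection.length + 1, event: event, data: data, handled: false });
}

// Mimics the effect of findAndModify: the lookup ({event, handled: false})
// and the update (handled = true) happen as one step, so two workers can
// never both claim the same message.
function claimNext(event, worker) {
  for (const doc of collection) {
    if (doc.event === event && !doc.handled) {
      doc.handled = true;
      doc.pickedBy = worker; // illustrative field, not part of the real packet
      return doc;
    }
  }
  return null; // nothing left to claim
}

enqueue('test', 0);
enqueue('test', 1);
enqueue('test', 2);

// Two workers, A and B, take turns asking for the next unhandled message.
const claimed = { A: [], B: [] };
let turn = 0;
let doc;
while ((doc = claimNext('test', turn % 2 === 0 ? 'A' : 'B')) !== null) {
  claimed[doc.pickedBy].push(doc.data);
  turn++;
}
console.log(claimed);
```

Each message ends up with exactly one worker. In a real deployment the atomicity comes from the database rather than from a single-threaded loop, and the tailable cursor is what tells a listener that a new message is worth trying to claim.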

Since MongoMQ is basically a wrapper around MongoDB's built-in support for tailable cursors, it is possible to place listeners built in other languages on the "queue".

Sample Usage

You should see the two listeners pick up messages one at a time, with whichever listener has free resources picking up each message first.

bin/test.js

var MongoMQ = require('../lib/MongoMQ').MongoMQ;
var repl = require('repl');

var queue = new MongoMQ({
  autoStart: true
});

var r = repl.start({
      prompt: "testbed>"
    });
r.on('exit', function(){
  queue.stop();
});

var msgidx = 0;
r.context.send = function(){
  queue.emit('test', msgidx);
  msgidx++;
};

r.context.load = function(){
  for(var i = 0; i<100; i++){
    queue.emit('test', msgidx);
    msgidx++;
  }
};

var logMsg = function(err, data, next){
      console.log('LOG: ', data);
      next();
    };
var eatTest = function(err, data, next){
      console.log('eat: ', data);
      next();
    };

r.context.logAny = function(){
  queue.onAny(logMsg);
};

r.context.listen = function(){
  queue.on('test', eatTest);
};

r.context.start = function(cb){
  queue.start(cb);
};

r.context.stop = function(){
  queue.stop();
};

r.context.help = function(){
  console.log('Built in test methods:\r\n'+
      '  help()    - shows this message\r\n'+
      '  logAny()  - logs any message to the console\r\n'+
      '  listen()  - consumes next available "test" message from the queue\r\n'+
      '  send()    - places a "test" message on the queue\r\n'+
      '  load()    - places 100 "test" messages on the queue\r\n'+
      '  start()   - start the queue listener\r\n'+
      '  stop()    - stop the queue listener\r\n'+
      '\r\nInstance Data\r\n'+
      '  queue - the global MongoMQ instance\r\n'
      );
  return '';
};

/*
// Optional: start the queue and begin consuming "test" messages right away.
queue.start(function(){
  r.context.listen();
});
*/

r.context.queue = queue;

r.context.help();

How Events are stored

{
  _id: ObjectId(), // for internal use only
  pkt_ver: 3, // Packet version that this message is being sent in
  event: event, // string that represents what type of event this is
  data: message, // Contains the actual message contents
  handled: false, // states if the message has been handled or not
  localTime: dt, // Local Date Time of when the message was put on the queue
  globalTime: new Date(dt-self.serverTimeOffset), // Date Time offset to server time of when the message was put on the queue
  pickedTime: new Date(dt-self.serverTimeOffset), // Date Time offset to server time of when the message was picked up from the queue
  host: string, // Contains the host name of the machine that initiated the event
  [response_id: string] // optional if the event expects response(s) this will be the conversation identifier used to track those responses
}
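As a rough sketch of how a message in this shape might be assembled before it is inserted, the helper below builds the packet from an event name, a payload and a server time offset. `buildPacket` and its parameters are hypothetical and not taken from the MongoMQ source. `_id` is left out because MongoDB assigns it on insert, and `pickedTime` is left out on the assumption that it is only set when a listener picks the message up.

```javascript
const os = require('os');

// Hypothetical helper: field names follow the layout above,
// everything else is an assumption made for illustration.
function buildPacket(event, message, serverTimeOffset, responseId) {
  const dt = new Date();
  const packet = {
    pkt_ver: 3,                                  // packet version
    event: event,                                // message type
    data: message,                               // message contents
    handled: false,                              // not yet picked up
    localTime: dt,                               // sender's local clock
    globalTime: new Date(dt - serverTimeOffset), // adjusted to server time
    host: os.hostname()                          // machine that emitted the event
  };
  // Only attach a conversation identifier when a response is expected.
  if (responseId !== undefined) {
    packet.response_id = responseId;
  }
  return packet;
}

const pkt = buildPacket('test', { n: 1 }, 250);
console.log(pkt);
```

Storing both `localTime` and `globalTime` lets consumers on machines with skewed clocks order messages against the server's clock.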

Update History

v0.3 Update History

v0.3.4

v0.3.3

v0.3.2

v0.3.1

v0.3.0

v0.2 Update History

v0.2.10 & v0.2.11

v0.2.9

v0.2.8

v0.2.7

v0.2.6

v0.2.5

v0.2.4

v0.2.3

v0.2.2

v0.2.1

v0.1.1

v0.1.0