A bus.io dependency.
The message exchange provides an interface for publishing a message to a queue, handling that message, and potentially propagating that message to its destination.
Install node.js (See download and install instructions here: http://nodejs.org/).
Install redis (See download and install instructions here: http://redis.io/topics/quickstart).
Install coffee-script
> npm install coffee-script -g
Clone this repository
> git clone git@github.com:turbonetix/bus.io-exchange.git
cd into the directory and install the dependencies
> cd bus.io-exchange
> npm install && npm shrinkwrap --dev
This is where we publish, handle, and propagate messages.
var exchange = require('bus.io-exchange')();
var Exchange = require('bus.io-exchange');
var exchange = Exchange(Exchange.Queue(), Exchange.PubSub());
var Exchange = require('bus.io-exchange');
var queue = Exchange.Queue();
var pubsub = Exchange.PubSub();
var handler = new EventEmitter();
var exchange = Exchange(queue, pubsub, handler);
Puts the message onto the `Queue` if the message has not already been published to the `Queue`.
If the message has already been published to the `Queue`, it will be published onto the `PubSub`.
var Message = require('bus.io-common').Message;
exchange.publish( Message() );
Puts the message onto the `PubSub` with the channel being "everyone".
var message = Message();
exchange.publish( message, 'everyone' );
Subscribes a listener to the channel and invokes the callback when the
channel has been subscribed.
exchange.subscribe('some channel', function listener (message) {
//this gets called when we receive a message on the channel
}, function callback (err, channel) {
//this gets called when we subscribed to the channel
});
Unsubscribes the listener from the channel and invokes the callback when the
listener has been unsubscribed.
var listener = function (message) { };
exchange.unsubscribe('some channel', listener, function callback (err, channel) {
//this gets called when we unsubscribed from the channel
});
Gets the Queue
instance.
var queue = exchange.queue();
queue.send(Message());
Sets the Queue
instance.
var kue = require('kue');
var queue = messageExchange.Queue.make(kue.createClient());
exchange.queue(queue);
Gets the pubsub instance.
var pubsub = exchange.pubsub();
pubsub.send(message, 'everyone');
Sets the pubsub instance.
var redis = require('redis');
var pub = redis.createClient();
var sub = redis.createClient();
var pubsub = messageExchange.PubSub.make(pub, sub);
exchange.pubsub(pubsub);
Gets the handler, which is an `EventEmitter`.
var handler = exchange.handler();
handler.on('some message', function (message, exchange) {
// do something
exchange.channel(message.target).publish(message);
});
Sets the handler.
var events = require('events');
var handler = new events.EventEmitter;
handler.on('some message', function (message, exchange) {
// do something
exchange.channel(message.target).publish(message);
});
exchange.handler(handler);
The queue is a lightweight wrapper around an object that supports a
method `process(name, fn)`, where `name` is a `String` and `fn` is a
`Function`. It must also support the method `create(name, data)`, where
`name` is a `String` and `data` is an `Object`. The return value of
the `create` method must expose a function `done()`. In our case
we used the Kue library. It is a really nice library for handling jobs.
var queue = Exchange.Queue();
var kue = require('kue');
var queue = Exchange.Queue(kue.createQueue());
queue.send(Message());
The pubsub is a lightweight wrapper around the `redis` module. You could
pass in another object instead of the `redis` object, as long as it
supports these methods: `subscribe(name, cb)`, `unsubscribe(name, cb)`,
and `publish(channel, data)`.
var pubsub = Exchange.PubSub();
var pub = redis.createClient()
, sub = redis.createClient();
var pubsub = Exchange.PubSub(pub, sub);
var message = Message();
pubsub.send(message, message.target());
pubsub.subscribe('channel', function (err, channel) {
if (err) throw err;
console.log('channel subscribed');
});
pubsub.unsubscribe('channel', function (err, channel) {
if (err) throw err;
console.log('channel unsubscribed');
});
Tests are run using grunt. You must first globally install the grunt-cli with npm.
> sudo npm install -g grunt-cli
To run the tests, just run grunt
> grunt