AMQP made easy
"Happy Pea" Copyright FancyFerret on DeviantArt
Install
npm install --save amqpea
All-in-one example:
var amqpea = require('amqpea');

function die(err) {
    throw err;
}

var uri = 'amqp://guest:guest@localhost:5672/%2F';
var amqp = amqpea(uri, { timeout: 2000 });
amqp.on('error', die);
amqp.on('ready', function() {
    amqp.declareExchange({
        name: 'x'
    }, whenExchangeReady);
});

function whenExchangeReady(err) {
    if (err) return die(err);
    amqp.declareQueue({
        name: 'q',
        exclusive: true,
        binding: {
            exchange: 'x',
            keys: ['route']
        }
    }, whenQueueReady);
}

function whenQueueReady(err) {
    if (err) return die(err);
    beginPublishing();
    var consumer = amqp.createQueueConsumerChannel('q', 1);
    consumer.consume('ack', 'exclusive', function(msg) {
        var body = msg.fromJSON();
        console.log("Received: %j", body);
        msg.ack();
    });
}

function beginPublishing() {
    var i = 0;
    var publisher = amqp.createPublishChannel('confirm');
    setInterval(function() {
        publisher.publish('x', 'route', { num: ++i }, function(err) {
            if (err) return die(err);
            console.log("Published message %d", i);
        });
    }, 1000);
}
More examples can be found in the examples folder.
Most of these options correspond directly to an AMQP protocol concept. For more information, see the AMQP 0.9.1 reference.
Establish a new AMQPConnection instance.
urisOrUri {string or array(string)}
Pass one or more AMQP URIs; the first one that works will be connected to.
An AMQP URI looks like amqp://login:password@hostname:port/vhost.
Note that if the vhost begins with a /, this needs to be URL encoded, so /default becomes a URL path of /%2Fdefault.
options {object}
Various options to control the client's behaviour
  timeout {number}
  Number of milliseconds to wait before considering the connection timed out
  debug {boolean}
  Set to true to log a bunch of debugging messages to STDERR. Can also be enabled by setting the environment variable NODE_DEBUG_AMQP to a non-empty value.
  heartbeat {boolean or number}
  Control the AMQP protocol heartbeat: false for no heartbeats, true to do what the server says, or a number of seconds to override.
  client {object}
  Send some strings to the server to help identify the client. Allowed keys are product, version, platform, copyright and information. Product, version and platform default to something useful.

Instances represent a connected AMQP client; use the main amqpea export to create an instance.
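The vhost encoding rule for AMQP URIs is easy to get wrong by hand. The following is a minimal sketch of a helper that builds the URI; buildAmqpUri is a hypothetical name used for illustration and is not part of amqpea.

```javascript
// Hypothetical helper (not part of amqpea): builds an AMQP URI and
// URL-encodes the vhost so that a leading "/" becomes "%2F".
function buildAmqpUri(parts) {
    var auth = encodeURIComponent(parts.login) + ':' +
        encodeURIComponent(parts.password);
    var vhost = encodeURIComponent(parts.vhost);
    return 'amqp://' + auth + '@' + parts.hostname + ':' + parts.port +
        '/' + vhost;
}

var uri = buildAmqpUri({
    login: 'guest',
    password: 'guest',
    hostname: 'localhost',
    port: 5672,
    vhost: '/'
});
console.log(uri); // prints "amqp://guest:guest@localhost:5672/%2F"
```

The resulting string can be passed as urisOrUri, either on its own or as one element of an array.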
Fired when the server has an error.
err {Error}
The exception that occurred
The connection object will not be usable after an error has been emitted. By default node.js will exit your program if you don't listen for this event.

Fired when the server connection has been closed.
hadError {boolean}
True when server is closing due to error

Fired when the server connection is ready to use.

Fired for every failed server connection.
uri {string}
The URI that failed to connect
err {Error}
The exception that occurred
When attempting to connect to multiple servers, this is the only way to see why servers are failing. If none of the servers can be connected to, the error event will be fired with the same err as the last connection-error.
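As a rough sketch of how the events above fit together when several URIs are supplied, the function below records every connection-error and reports the outcome once. connectWithFailureLog and its done callback are illustrative names, not part of amqpea; the amqpea function is passed in as an argument so the sketch makes no assumption about how it was loaded.

```javascript
// Sketch only: collect per-server failures while each URI is tried.
// `amqpea` is the library's main export, e.g. require('amqpea').
function connectWithFailureLog(amqpea, uris, done) {
    var failures = [];
    var amqp = amqpea(uris, { timeout: 2000 });

    // Fired once for every URI that could not be connected to.
    amqp.on('connection-error', function(uri, err) {
        failures.push({ uri: uri, message: err.message });
    });

    // If every URI failed, err is the same as the last connection-error.
    amqp.on('error', function(err) {
        done(err, null, failures);
    });

    amqp.on('ready', function() {
        done(null, amqp, failures);
    });

    return amqp;
}
```

On success, failures still lists the servers that were skipped, which can be useful for spotting a broker that is down even though the client connected elsewhere.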
Declare an exchange on the server.
options {object}
Various options
  name {string}
  name of the exchange
  type {string}
  type of the exchange, default: topic
  passive {boolean}
  only re-use existing exchange, default: false
  durable {boolean}
  persist exchange across broker restarts, default: false
  autoDelete {boolean}
  delete exchange when queues are finished using it, default: false
  internal {boolean}
  disallow publishing directly to the exchange, default: false
callback(err) {function}
Called when exchange declaration is confirmed
  err {Error}
  non-null when an error occurred
To publish to an exchange, use createPublishChannel.
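A minimal sketch of the options above in use, declaring a durable exchange on an already connected client. declareDurableExchange is a hypothetical wrapper, and the 'direct' type is assumed to be accepted because it is one of the standard AMQP 0.9.1 exchange types.

```javascript
// Sketch only: `amqp` is the object returned by amqpea(...) once it
// has emitted 'ready'.
function declareDurableExchange(amqp, name, callback) {
    amqp.declareExchange({
        name: name,
        type: 'direct',    // overrides the default, topic
        durable: true,     // persist across broker restarts
        autoDelete: false, // keep it when queues stop using it
        internal: false    // allow publishing directly to it
    }, function(err) {
        if (err) return callback(err);
        callback(null, name);
    });
}
```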
Declare a queue on the server.
options {object}
Various options
  name {string}
  name of the queue, leave blank to let the server generate a unique name
  passive {boolean}
  only re-use existing queue, default: false
  durable {boolean}
  persist queue across broker restarts, default: false
  exclusive {boolean}
  only allow use by this connection, default: false
  autoDelete {boolean}
  delete queue when all consumers are finished, default: false
  binding {object}
  optional, configure the queue's bindings
    exchange {string}
    name of the exchange to bind to
    keys {array(string)}
    which routing keys to bind
    arguments {object}
    optional, arguments table passed to queue.bind
  arguments {object}
  optional, arguments table passed to queue.declare
callback(err, queue) {function}
Called when queue declaration is confirmed
  err {Error}
  non-null when an error occurred
  queue {object}
  Contains name as a {string}.
declareQueue calls queue.declare followed by queue.bind if the binding attribute is set in options. queue.bind is called once for each key in the binding.keys array with the binding.exchange and binding.arguments as arguments.
If binding is not present in the options or binding.keys is empty then queue.bind will not be called.
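The declare-then-bind behaviour described above can be pictured with a small illustration. planQueueCalls is a hypothetical function written for this README, not amqpea's implementation; it only lists the protocol methods that the description says would be issued for a given options object.

```javascript
// Illustration only, not amqpea's code: list the protocol methods
// that declareQueue is described as issuing for `options`.
function planQueueCalls(options) {
    var calls = [{
        method: 'queue.declare',
        arguments: options.arguments || {}
    }];
    var binding = options.binding;
    var keys = (binding && binding.keys) || [];
    // One queue.bind per routing key; none if keys is empty or absent.
    keys.forEach(function(key) {
        calls.push({
            method: 'queue.bind',
            exchange: binding.exchange,
            routingKey: key,
            arguments: binding.arguments || {}
        });
    });
    return calls;
}

var calls = planQueueCalls({
    name: 'q',
    binding: { exchange: 'x', keys: ['a', 'b'] }
});
console.log(calls.map(function(c) { return c.method; }).join(', '));
// prints "queue.declare, queue.bind, queue.bind"
```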
TODO: write this
Represents a channel to be used for consuming messages.
The consumer's tag.
Can be thrown if an ack or reject fails.
TODO: move these errors into the actions' callbacks.
Begin consuming the queue.
ack {boolean}
Enable message acknowledgement
exclusive {boolean}
Request that only this consumer can use the queue
handler(msg) {function}
Called on every message with an AMQPMessage object

{object}
Delivery information, likely to change in future versions.
{object}
Message properties, likely to change in future versions.
{Buffer}
The raw message content.
Decode a JSON message into an object, may throw an Error.
Acknowledge the message with the server.
Reject the message from the server (via basic.reject).
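Because decoding JSON may throw, a consume handler may want to reject malformed messages rather than crash. The following is a sketch under stated assumptions: makeJsonHandler is a hypothetical helper, and calling reject() with no arguments is an assumption, since this README does not document its signature.

```javascript
// Sketch only: wrap a body callback in a handler that acks messages
// whose JSON decodes and rejects those whose JSON does not.
// Assumes the message exposes fromJSON(), ack() and reject().
function makeJsonHandler(onBody) {
    return function(msg) {
        var body;
        try {
            body = msg.fromJSON(); // may throw on invalid JSON
        } catch (err) {
            msg.reject(); // assumed call shape, see the note above
            return;
        }
        onBody(body);
        msg.ack();
    };
}
```

The returned function could be passed as the handler argument when consuming with acknowledgement enabled.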
To run the tests you will need a local AMQP server. The test suite talks to the broker directly as well as via the HTTP admin API.
Set these environment variables to tell the test runner how to connect.
AMQP_USERNAME
defaults to "guest"
AMQP_PASSWORD
defaults to "guest"
AMQP_HOSTNAME
defaults to "localhost"
AMQP_PORT
defaults to 5672
AMQP_VHOST
defaults to "/"
AMQP_ADMIN_PORT
defaults to 15672
AMQP_ADMIN_PROTO
defaults to "http"
AMQP_ADMIN_SSL_INSECURE
defaults to false
With the appropriate variables set, use npm to run the test suite.
npm test
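For reference, the environment variables and defaults listed above could be read along these lines. This is a sketch, not the test runner's actual code: readTestConfig is a hypothetical name, and treating only the string "true" as enabling AMQP_ADMIN_SSL_INSECURE is an assumption.

```javascript
// Sketch only: read the test environment variables with the
// documented defaults. The boolean parsing is an assumption.
function readTestConfig(env) {
    return {
        username: env.AMQP_USERNAME || 'guest',
        password: env.AMQP_PASSWORD || 'guest',
        hostname: env.AMQP_HOSTNAME || 'localhost',
        port: Number(env.AMQP_PORT || 5672),
        vhost: env.AMQP_VHOST || '/',
        adminPort: Number(env.AMQP_ADMIN_PORT || 15672),
        adminProto: env.AMQP_ADMIN_PROTO || 'http',
        adminSslInsecure: env.AMQP_ADMIN_SSL_INSECURE === 'true'
    };
}

var config = readTestConfig(process.env);
console.log('%s://%s:%d', config.adminProto, config.hostname, config.adminPort);
```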