saharki / icqueue

ICQueue

Simple consuming and publishing from/to RabbitMQ.

Declarative API to consume from a RabbitMQ queue and to publish messages.

Example usage

const ICQueue = require('icqueue');

var config = {
  url: process.env.AMQP_URL,
  exchange: process.env.AMQP_EXCHANGE,
  queue: {
    name: process.env.AMQP_CONSUME,
    routingKey: process.env.AMQP_ROUTING_KEY, // If supplied, queue is bound to
    // this key (or keys) on the exchange. NB Can be an array of strings or just
    // a string.
    options: {/* ... */} // Advanced: options passed to ch.assertQueue() in wrapped `amqplib`.
  },
  // Set the QOS/prefetch (defaults to 1)
  prefetch: 100
};

const icq = new ICQueue(config);

async function main () {
  // Must call this before you consume/publish/etc...
  await icq.connect();

  // Consuming
  // handleMessage must call `callback(err, requeue)`:
  //   - If `err` is `null`, the message is `ack`ed.
  //   - If `err` is not `null`, the message is `nack`ed, and it is requeued
  //     only if `requeue` is `true`.
  //   - If handleMessage throws, the message is `nack`ed and not requeued.
  const handleMessage = function (message, callback) {
    // ... Do things
    callback();
  };

  // Start consuming:
  icq.consume(handleMessage);

  // Publishing to arbitrary routing key.
  await icq.publish(routingKey, payload, options);
}

If payload is an object, it will be turned into JSON.
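
As an illustration, that conversion could look like the sketch below. `toBuffer` is a hypothetical helper, not icqueue's actual code; it assumes that Buffers and strings are sent unchanged and that anything else is JSON-encoded.

```javascript
// Hypothetical helper, not part of icqueue: shows one way a payload could be
// prepared before it is handed to amqplib, which expects a Buffer.
function toBuffer(payload) {
  if (Buffer.isBuffer(payload)) {
    return payload; // already binary, send as-is
  }
  if (typeof payload === 'string') {
    return Buffer.from(payload, 'utf8'); // strings are sent unchanged
  }
  return Buffer.from(JSON.stringify(payload), 'utf8'); // objects become JSON
}

console.log(toBuffer({ id: 42, ok: true }).toString()); // {"id":42,"ok":true}
console.log(toBuffer('plain text').toString()); // plain text
```

A consumer on the other side would then need to `JSON.parse` the message body to get the object back.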

Details

This is a wrapper around amqplib (https://github.com/squaremo/amqp.node).

Tests

Start a RabbitMQ server, preferably a 'throw away' one with fresh state. If you have Docker, you can do so with:

docker run -d --rm -p 5672:5672 rabbitmq

Wait for it to finish starting up, then:

npm test
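
If you would rather script the wait than watch the container logs, a small Node helper can poll the broker port. This is a rough sketch: `waitForPort` and `tryConnect` are hypothetical names, and an open TCP port on 5672 only suggests that RabbitMQ is up, it does not guarantee the broker has finished booting.

```javascript
const net = require('net');

// Resolves to true if a TCP connection to host:port succeeds, false otherwise.
function tryConnect(host, port) {
  return new Promise((resolve) => {
    const socket = net.connect({ host, port });
    socket.once('connect', () => {
      socket.end();
      resolve(true);
    });
    socket.once('error', () => {
      socket.destroy();
      resolve(false);
    });
  });
}

// Polls host:port up to `retries` times, sleeping `delayMs` between attempts.
async function waitForPort(host, port, retries = 30, delayMs = 1000) {
  for (let attempt = 0; attempt < retries; attempt++) {
    if (await tryConnect(host, port)) {
      return true;
    }
    await new Promise((resolve) => setTimeout(resolve, delayMs));
  }
  return false;
}

// Example (not run here): waitForPort('localhost', 5672).then(console.log);
```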

Note that tests/config.js currently assumes you are using boot2docker (on a Mac), so you may need to adjust it. It may also just work, since it should fall back to localhost when boot2docker is not present, but this is untested.

API

ICQueue

Class to contain an instantiated connection/channel to AMQP with a given config.

Kind: global class

new ICQueue(config)

Instantiate an AMQP wrapper with a given config.

Param Type
config object
config.url string
config.exchange string
config.queue object
config.queue.name string
config.queue.routingKey Array.<string> | string
config.queue.options object
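
The config shape above can be assembled from environment variables, as in the usage example. The sketch below is one possible way to do that with some validation. `buildConfig` is a hypothetical helper, and the `AMQP_PREFETCH` variable, the comma-separated routing key format, and the choice to treat the URL, exchange and queue name as required are all assumptions of this sketch rather than icqueue behaviour.

```javascript
// Hypothetical helper: builds an ICQueue-style config from environment
// variables. AMQP_PREFETCH and the comma-separated AMQP_ROUTING_KEY format
// are assumptions made for this example.
function buildConfig(env) {
  for (const name of ['AMQP_URL', 'AMQP_EXCHANGE', 'AMQP_CONSUME']) {
    if (!env[name]) {
      throw new Error(`Missing required environment variable ${name}`);
    }
  }

  // Split "a, b" into ['a', 'b'] and drop empty entries.
  const keys = (env.AMQP_ROUTING_KEY || '')
    .split(',')
    .map((key) => key.trim())
    .filter(Boolean);

  const queue = { name: env.AMQP_CONSUME, options: {} };
  if (keys.length === 1) queue.routingKey = keys[0]; // single string
  if (keys.length > 1) queue.routingKey = keys; // array of strings

  return {
    url: env.AMQP_URL,
    exchange: env.AMQP_EXCHANGE,
    queue,
    prefetch: Number.parseInt(env.AMQP_PREFETCH, 10) || 1, // default of 1
  };
}

const config = buildConfig({
  AMQP_URL: 'amqp://localhost',
  AMQP_EXCHANGE: 'events',
  AMQP_CONSUME: 'worker',
  AMQP_ROUTING_KEY: 'user.created, user.deleted',
});
console.log(config.queue.routingKey); // [ 'user.created', 'user.deleted' ]
```

In a real service you would pass `process.env` instead of the literal object.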

icqueue.connect() ⇒ Promise

Connects, establishes a channel, sets up exchange/queues/bindings/dead lettering.

Kind: instance method of ICQueue

icqueue.close() ⇒ Promise

Closes connection.

Kind: instance method of ICQueue
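
Since `connect()` and `close()` both return promises, one convenient pattern is to pair them so the connection is always closed, even when the work in between fails. `withConnection` below is a hypothetical helper, not part of icqueue; it only assumes an object exposing `connect()` and `close()` as documented above.

```javascript
// Hypothetical helper: connects, runs `work(client)`, and always closes the
// connection afterwards, whether `work` resolves or rejects.
async function withConnection(client, work) {
  await client.connect();
  try {
    return await work(client);
  } finally {
    await client.close();
  }
}

// Example (not run here), assuming `icq` is an ICQueue instance:
// withConnection(icq, (q) => q.publish('some.key', { hello: 'world' }));
```

This suits one-off publishers; a long-running consumer would normally keep the connection open and call `close()` on shutdown instead.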

icqueue.publish(routingKey, message, options) ⇒ Promise

Publish a message to the given routing key, with given options.

Kind: instance method of ICQueue

Param Type
routingKey string
message object | string
options object

icqueue.consume(handleMessage, options) ⇒ Promise

handleMessage() is expected to be of the form: handleMessage(parsedMessage, callback). If callback is called with a non-null error, then the message will be nacked. You can call it like: callback(err, requeue) in order to tell RabbitMQ whether to requeue the message (or discard/dead letter it).

If not given, requeue is assumed to be false.

cf http://squaremo.github.io/amqp.node/doc/channel_api.html#toc_34
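
To make the callback contract concrete, here is a minimal sketch of how `callback(err, requeue)` could map onto amqplib's `ack(message)` and `nack(message, allUpTo, requeue)`. It is not icqueue's source: `dispatch`, `settle` and `fakeChannel` are hypothetical names, the channel is a stub that only records calls, and the sketch assumes message bodies are JSON.

```javascript
// Maps the (err, requeue) pair onto an amqplib-style channel.
function settle(channel, rawMessage, err, requeue) {
  if (err == null) {
    channel.ack(rawMessage);
  } else {
    // allUpTo = false: only this message; requeue only when explicitly true.
    channel.nack(rawMessage, false, requeue === true);
  }
}

// Parses the body, invokes the handler, and nacks without requeue if it throws.
function dispatch(channel, rawMessage, handleMessage) {
  let settled = false;
  const callback = (err, requeue) => {
    if (settled) return; // ignore repeated calls
    settled = true;
    settle(channel, rawMessage, err, requeue);
  };
  try {
    const parsed = JSON.parse(rawMessage.content.toString());
    handleMessage(parsed, callback);
  } catch (thrown) {
    callback(thrown, false);
  }
}

// Stub channel that records what would have been sent to the broker.
const calls = [];
const fakeChannel = {
  ack: () => calls.push('ack'),
  nack: (msg, allUpTo, requeue) => calls.push(requeue ? 'nack+requeue' : 'nack'),
};
const raw = { content: Buffer.from('{"n":1}') };

dispatch(fakeChannel, raw, (msg, cb) => cb());
dispatch(fakeChannel, raw, (msg, cb) => cb(new Error('retry'), true));
dispatch(fakeChannel, raw, () => { throw new Error('boom'); });
console.log(calls); // [ 'ack', 'nack+requeue', 'nack' ]
```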

Kind: instance method of ICQueue

Param Type
handleMessage function
options object
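
If your handler is an `async` function, a thin adapter can turn it into the callback form that `consume()` expects. `fromAsync` and its `requeueOnError` option are hypothetical, introduced only for this sketch: a resolved promise becomes `callback(null)` and a rejection becomes `callback(err, requeueOnError)`.

```javascript
// Hypothetical adapter: wraps an async handler so it can be passed to
// consume(), which expects handleMessage(parsedMessage, callback).
function fromAsync(asyncHandler, { requeueOnError = false } = {}) {
  return function handleMessage(message, callback) {
    Promise.resolve()
      .then(() => asyncHandler(message))
      .then(
        () => callback(null), // success: message is acked
        (err) => callback(err, requeueOnError) // failure: message is nacked
      );
  };
}

// Example (not run here), assuming `icq` is a connected ICQueue instance:
// icq.consume(fromAsync(async (message) => { /* ... do things ... */ }));
```

Wrapping the call in `Promise.resolve().then(...)` means a handler that throws synchronously is treated the same as one that rejects.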