dropbox / amqp-coffee

An AMQP 0.9.1 client for Node.js.
MIT License

amqp-coffee

Node.JS AMQP 0.9.1 Client

Sample

AMQP = require('amqp-coffee') # path to this

testData = "the data to be published..  I am a string but could be anything"

amqpConnection = new AMQP {host:'localhost'}, (e, r)->
  if e?
    return console.error "Error", e

  amqpConnection.queue {queue: "queueName"}, (e,q)->
    q.declare ()->
      q.bind "amq.direct", "queueName", ()->

      amqpConnection.publish "amq.direct", "queueName", testData, {confirm: true}, (err, res)->
        console.log "Message published"

      consumer = amqpConnection.consume "queueName", {prefetchCount: 2}, (message)->
        console.log message.data.toString()
        message.ack()

      , (e,r)->
        console.log "Consumer setup"
        amqpConnection.publish "amq.direct", "queueName", "message contents", {deliveryMode:2, confirm:true}, (e, r)->
          if !e? then console.log "Message Sent"

Methods

new amqp-coffee([connectionOptions],[callback])

Creates a new amqp Connection. The connection is returned directly and in the callback. The connection extends EventEmitter.

The callback is called on a successful connection, or on an unsuccessful connection when connectionOptions.reconnect is false. If connectionOptions.reconnect is false, you will get an error back in the callback. If no callback is specified, the error is emitted instead.

The connectionOptions argument should be an object which specifies:

Host Examples

host: 'localhost'
host: {host: 'localhost', port: 15672}
host: ['localhost','yourhost']
host: [{host: 'localhost', port:15672}, {host: 'localhost', port:15673}]

Sample Connection

assert = require('assert')
AMQP = require('amqp-coffee')

amqp = new AMQP {host: 'localhost'}, (error, amqpConnection)->
  assert(amqp == amqpConnection)

Reconnect Flow

On a connection close, we start the reconnect process if reconnect is true. After the reconnectDelayTime, the hosts are rotated if more than one host is specified. A new connection is attempted; if the connection is not successful, this process repeats. After a connection is re-established, all of the channels are reset, which attempts to reopen each channel. Different channel types re-establish their channels differently.
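
The rotation step can be pictured with a small standalone model. This is only a sketch of the flow described above, not the library's actual implementation: normalizeHosts, rotateHosts, and nextReconnectTarget are hypothetical helpers, and the default port of 5672 is the standard AMQP port rather than something stated in this README.

```coffeescript
# Hypothetical model of the host rotation described above; not amqp-coffee's code.
DEFAULT_PORT = 5672 # assumption: the standard AMQP port

# Accept any of the documented host formats and return a list of {host, port}.
normalizeHosts = (host)->
  hosts = if Array.isArray(host) then host else [host]
  for h in hosts
    if typeof h is 'string'
      {host: h, port: DEFAULT_PORT}
    else
      {host: h.host, port: h.port ? DEFAULT_PORT}

# Move the first host to the end so the next attempt uses a different one.
rotateHosts = (hosts)->
  return hosts if hosts.length < 2
  hosts.slice(1).concat hosts.slice(0, 1)

# One reconnect step: rotate, then report which host would be tried next.
nextReconnectTarget = (hosts)->
  rotated = rotateHosts hosts
  {hosts: rotated, target: rotated[0]}

hosts = normalizeHosts ['localhost', {host: 'yourhost', port: 15672}]
step = nextReconnectTarget hosts
console.log "next attempt:", step.target.host, step.target.port
```

With a single host the list is returned unchanged, so every retry goes to the same place.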

Event: 'ready'

Emitted when the connection is open successfully. This will be called after each successful reconnect.

Event: 'close'

Emitted when an open connection leaves the ready state and is closed.

Event: 'error'

Very rare; only emitted when there is a server version mismatch.
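
Since the connection extends EventEmitter, these events can be observed with ordinary listeners. The following is a minimal sketch: watchConnection and its log argument are hypothetical names, and the helper works on any EventEmitter, so it can be tried without a broker.

```coffeescript
{EventEmitter} = require 'events'

# Hypothetical helper: attach listeners for the three connection events.
# `log` defaults to console.log and is only an assumption for this sketch.
watchConnection = (connection, log = console.log)->
  readyCount = 0
  connection.on 'ready', ->
    readyCount += 1
    # 'ready' fires again after every successful reconnect
    message = if readyCount is 1 then 'connected' else "reconnected (#{readyCount - 1})"
    log message
  connection.on 'close', ->
    log 'connection closed'
  connection.on 'error', (err)->
    log "connection error: #{err}"
  connection

# Stand-in for a real connection, which would come from `new AMQP {...}`.
fake = watchConnection new EventEmitter()
fake.emit 'ready'
fake.emit 'close'
fake.emit 'ready'
```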

connection.queue([queueOptions],[callback])

This returns a channel that can be used to declare, bind, unbind, or delete queues. This on its own does NOT declare a queue. When creating a queue class using connection.queue, you can specify options that will be used in all the child methods.

The queueOptions argument should be an object which specifies:

Both queues and exchanges use "temporary" channels, which are channels amqp-coffee manages specifically for declaring, binding, unbinding, and deleting queues and exchanges. After 2 seconds of inactivity these channels are closed, and reopened on demand.

queue.declare([queueOptions],[callback])

Will take a new set of queueOptions, or use the default. Issues a queueDeclare and waits on queueDeclareOk if a callback is specified.

amqp = new AMQP ()->
  amqp.queue {queue:'queueToCreate'}, (err, Queue)->
    Queue.declare (err, res)->
      # the queue is now declared

To use an auto-generated queue name:

amqp = new AMQP ()->
  amqp.queue {queue:''}, (err, Queue)->
    Queue.declare (err, res)->
      queueName = res.queue

queue.delete([queueDeleteOptions],[callback])

The queueDeleteOptions argument should be an object which specifies:

queue.bind(exchange, routingKey, [queueName], [callback])

Sets up a binding from an already existing exchange to an already existing queue.

queue.unbind(exchange, routingKey, [queueName], [callback])

Tears down an already existing binding

queue.messageCount(queueOptions, callback)

RabbitMQ-specific. Re-declares the queue and returns the messageCount from the response.

queue.consumerCount(queueOptions, callback)

RabbitMQ-specific. Re-declares the queue and returns the consumerCount from the response.
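
As a sketch of how these two calls might be combined, the hypothetical helper below reads both counts for one queue. It assumes that both callbacks follow the (err, value) convention used elsewhere in this README, which is not spelled out above. The connection is passed in as an argument.

```coffeescript
# Hypothetical helper: fetch messageCount and consumerCount for one queue.
# Assumption: both callbacks receive (err, count).
queueStats = (connection, queueName, done)->
  options = {queue: queueName}
  connection.queue options, (err, queue)->
    return done err if err?
    queue.messageCount options, (err, messages)->
      return done err if err?
      queue.consumerCount options, (err, consumers)->
        return done err if err?
        done null, {messages, consumers}

# Usage, given an open connection `amqp`:
# queueStats amqp, 'queueName', (err, stats)->
#   console.log stats.messages, stats.consumers
```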

connection.exchange([exchangeArgs],[callback])

This returns a channel that can be used to declare, bind, unbind, or delete exchanges. This on its own does NOT declare an exchange. When creating an exchange class using connection.exchange, you can specify options that will be used in all the child methods.

The exchangeArgs argument should be an object which specifies:

Both queues and exchanges use "temporary" channels, which are channels amqp-coffee manages specifically for declaring, binding, unbinding, and deleting queues and exchanges. After 2 seconds of inactivity these channels are closed, and reopened on demand.

exchange.declare([exchangeArgs],[callback])

exchange.delete([exchangeDeleteOptions], [callback])

The exchangeDeleteOptions argument should be an object which specifies:

exchange.bind(destinationExchange, routingKey, [sourceExchange], [callback])

RabbitMQ extension for binding one exchange to another. If sourceExchange is omitted, it defaults to the exchange the method is being called on.

exchange.unbind(destinationExchange, routingKey, [sourceExchange], [callback])

RabbitMQ extension for removing a binding between exchanges. If sourceExchange is omitted, it defaults to the exchange the method is being called on.

connection.publish(exchange, routingKey, data, [publishOptions], [callback])

amqp-coffee manages publisher channels and sets them up on the first publish. Confirming is a state a channel must be put in, so one channel is needed for confirmed publishes and another for non-confirmed publishes. They are only created on demand, so you should have a maximum of 2 channels publishing for a single connection.

New in 0.1.20: if you set the mandatory or immediate flag together with the confirm flag, we add a tracking header to that message, headers.x-seq, which is a numeric representation of that message, just like the sequence number. That header is used to reconnect a message that has failed publishing and come back as a "basicReturn" to an already existing callback. This allows you to publish to a queue that may not exist and get a bounce if it doesn't. Likewise, if a queue is in a bad state, the message will fail routing and come back.
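
A sketch of the bounce behaviour described above: publishing with both mandatory and confirm set, so that a message that cannot be routed comes back through the same callback. publishOrBounce is a hypothetical wrapper, and the assumption that a returned message surfaces as the callback's first (error) argument is inferred from the text rather than stated in it.

```coffeescript
# Hypothetical wrapper around connection.publish.
# Assumption: an unroutable (returned) message arrives as `err` in the callback.
publishOrBounce = (connection, exchange, routingKey, data, done)->
  options = {mandatory: true, confirm: true}
  connection.publish exchange, routingKey, data, options, (err, res)->
    if err?
      done null, {delivered: false, reason: err}
    else
      done null, {delivered: true}

# Usage, given an open connection `amqp`:
# publishOrBounce amqp, 'amq.direct', 'maybeMissingQueue', 'hello', (e, result)->
#   console.log "bounced" unless result.delivered
```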

connection.consume(queueName, options, messageListener, [callback])

Consumers use their own channels and are re-subscribed on reconnect. Returns a consumer object.

messageListener is a function that gets a message object which has the following attributes:

listener = (message)->
  # we will only get 1 message at a time because prefetchCount is set to 1
  console.log "Message Data", message.data
  message.ack()

amqp = new AMQP ()->
  amqp.queue {queue: 'testing'}, (e, queue)->
    queue.declare ()->
      queue.bind 'amq.direct', 'testing', ()->
        amqp.publish 'amq.direct', 'testing', 'here is one message 1'
        amqp.publish 'amq.direct', 'testing', 'here is one message 2'

      amqp.consume 'testing', {prefetchCount: 1}, listener, ()->
        console.log "Consumer Ready"

consumer Event: error

Errors will be emitted from the consumer if we cannot consume from that queue anymore. For example, if you're consuming an autoDelete queue and you reconnect, that queue will be gone. The consumer emits the raw error message, with the code as the message.

consumer Event: cancel

The cancel event will be emitted from the consumer if we receive a server-initiated "basic.cancel". For this to happen you must let the server know you are expecting a cancel, which you do by specifying clientProperties on connect: clientProperties: { capabilities: { consumer_cancel_notify: true }}. See https://www.rabbitmq.com/consumer-cancel.html
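
Put together, opting in to cancel notifications and reacting to them might look like the sketch below. The clientProperties value is taken from the text above; cancelAwareOptions and onConsumerCancel are hypothetical helper names, and what to do after a cancel is left to the application.

```coffeescript
# Hypothetical helper: connection options that announce cancel-notify support.
cancelAwareOptions = (host)->
  host: host
  clientProperties:
    capabilities:
      consumer_cancel_notify: true

# Hypothetical helper: run `handler` when the server cancels this consumer.
onConsumerCancel = (consumer, handler)->
  consumer.on 'cancel', handler
  consumer

console.log JSON.stringify cancelAwareOptions 'localhost'

# Usage, assuming AMQP = require('amqp-coffee') and a `listener` function:
# amqp = new AMQP cancelAwareOptions('localhost'), ()->
#   consumer = amqp.consume 'queueName', {}, listener
#   onConsumerCancel consumer, -> console.log "server cancelled the consumer"
```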

consumer.setQos(prefetchCount, [callback])

Will update the prefetch count of an already existing consumer; can be used to dynamically tune a consumer.
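
One way to use this for dynamic tuning is to derive the prefetch count from how busy the worker is. The sketch below is only an illustration: choosePrefetch, its thresholds, and the backlog figure are hypothetical, and only consumer.setQos(prefetchCount, [callback]) comes from the API described here.

```coffeescript
# Hypothetical policy: fewer in-flight messages when the local backlog is large.
choosePrefetch = (backlog, max = 10)->
  return 1 if backlog >= max
  Math.max 1, max - backlog

# Apply the policy to an existing consumer.
tuneConsumer = (consumer, backlog, done)->
  prefetch = choosePrefetch backlog
  consumer.setQos prefetch, (err)->
    done?(err, prefetch)

# Usage, given a consumer returned by connection.consume:
# tuneConsumer consumer, pendingJobs.length, (err, prefetch)->
#   console.log "prefetch is now", prefetch
```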

consumer.cancel([callback])

Sends basicCancel and waits on basicCancelOk

consumer.pause([callback])

Calls consumer.cancel.

consumer.close([callback])

Calls consumer.cancel, if we're currently consuming. Then calls channel.close and calls the callback as soon as the channel close is sent, NOT when channelCloseOk is returned.

consumer.resume([callback])

Calls consumer.consume, which sets up the consumer with a new consumer tag.

consumer.flow(active, [callback])

An alias for consumer.pause (active == false) and consumer.resume (active == true).
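
The alias relationship can be written out as a small standalone model. This is a sketch of the behaviour described above rather than the library's source; flowModel is a hypothetical name, and it works on any object that has pause and resume methods.

```coffeescript
# Hypothetical model of consumer.flow: active == true resumes, false pauses.
flowModel = (consumer, active, callback)->
  if active
    consumer.resume callback
  else
    consumer.pause callback

# Stand-in consumer that just records what was called.
calls = []
fakeConsumer =
  pause: (cb)->
    calls.push 'pause'
    cb?()
  resume: (cb)->
    calls.push 'resume'
    cb?()

flowModel fakeConsumer, false, -> console.log 'paused'
flowModel fakeConsumer, true, -> console.log 'resumed'
```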

connection.close()

More documentation to come. The tests are a good place to reference.

Differences between amqp-coffee and node-amqp

First of all, this was heavily inspired by https://github.com/postwait/node-amqp

Changes from node-amqp