This is a summary. See the amqp-ts Wiki for the full documentation of the library.
Amqp-ts is a Node.js library, written in TypeScript, that simplifies communication with AMQP message buses. It has been tested on RabbitMQ. It uses the amqplib library by Michael Bridgen (squaremo).
Starting in version 0.14 the return type of exchange.rpc and queue.rpc changed from 'Promise<any>' to 'Promise<Message>'.
Version 0.12 added the Message class. It is a more elegant way to send and receive messages, and it is now the preferred way to do so.
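To illustrate what the 0.14 return type change means for calling code, here is a minimal sketch. It does not import amqp-ts: the MessageLike and RpcTarget interfaces, the rpcContent helper and the fakeQueue stub are all hypothetical names introduced only for this example, and the assumption is that a reply's payload is read with getContent(), as in the examples further down.

```typescript
// Minimal shapes assumed for this sketch; they are NOT the amqp-ts typings.
interface MessageLike {
  getContent(): any;
}

interface RpcTarget {
  rpc(requestParameters: any): Promise<MessageLike>;
}

// Hypothetical helper: performs an RPC call and unwraps the reply content,
// so callers that only care about the payload get it directly.
function rpcContent(target: RpcTarget, request: any): Promise<any> {
  return target.rpc(request).then((reply) => reply.getContent());
}

// Stand-in for an exchange or queue, used only to make the sketch runnable.
const fakeQueue: RpcTarget = {
  rpc: (request) =>
    Promise.resolve({ getContent: () => "echo: " + request }),
};

const rpcResult = rpcContent(fakeQueue, "ping");
rpcResult.then((content) => console.log(content));
```

With a real connection, the object passed as target would presumably be an exchange or queue declared through amqp-ts instead of the stub.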
The library is considered production ready.
It does depend on the following npm libraries:
The DefinitelyTyped tsd tool is used to manage the typescript type definitions.
There is no need to nest functionality: just create a connection, declare your exchanges, queues and bindings, and send and receive messages. The library takes care of any direct dependencies.
If you define an exchange and a queue, bind the queue to the exchange, and want to make sure that the queue is connected to the exchange before you send a message to it, call the connection.completeConfiguration() method and act on the promise it returns.
// TypeScript
import * as Amqp from "amqp-ts";

var connection = new Amqp.Connection("amqp://localhost");
var exchange = connection.declareExchange("ExchangeName");
var queue = connection.declareQueue("QueueName");
queue.bind(exchange);
queue.activateConsumer((message) => {
    console.log("Message received: " + message.getContent());
});

// it is possible that the following message is not received because
// it can be sent before the queue, binding or consumer exist
var msg = new Amqp.Message("Test");
exchange.send(msg);

connection.completeConfiguration().then(() => {
    // the following message will be received because
    // everything you defined earlier for this connection now exists
    var msg2 = new Amqp.Message("Test2");
    exchange.send(msg2);
});
// JavaScript
var amqp = require("amqp-ts");

var connection = new amqp.Connection("amqp://localhost");
var exchange = connection.declareExchange("ExchangeName");
var queue = connection.declareQueue("QueueName");
queue.bind(exchange);
queue.activateConsumer((message) => {
    console.log("Message received: " + message.getContent());
});

// it is possible that the following message is not received because
// it can be sent before the queue, binding or consumer exist
var msg = new amqp.Message("Test");
exchange.send(msg);

connection.completeConfiguration().then(() => {
    // the following message will be received because
    // everything you defined earlier for this connection now exists
    var msg2 = new amqp.Message("Test2");
    exchange.send(msg2);
});
More examples can be found in the tutorials directory.
To check the status of the connection, read connection.isConnected. It is true if the connection exists and false otherwise.
#on('open_connection', function() {...})
It is emitted when a connection is established and can publish/subscribe on the RabbitMQ bus.
#on('close_connection', function() {...})
It's emitted when a connection is closed, after calling the close method.
#on('lost_connection', function() {...})
It is emitted when the connection is lost and before attempting to re-establish the connection.
#on('trying_connect', function() {...})
It is emitted while the library is trying to re-establish the connection.
#on('re_established_connection', function() {...})
It is emitted when the connection is re-established.
#on('error_connection', function(err) {...})
It is emitted when an error occurs on the connection.
When the library detects that the connection with the AMQP server is lost, it tries to automatically reconnect to the server.
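The events listed above can be used to follow a reconnect cycle. The sketch below is self-contained and does not import amqp-ts: it assumes only that the connection exposes an EventEmitter-style on(event, listener) method, as the #on(...) signatures suggest. EventSource, FakeConnection and trackConnectionEvents are hypothetical names introduced for illustration, and the emitted sequence is simulated rather than observed from a real broker.

```typescript
// Minimal shape this sketch needs; an assumption, not the amqp-ts typings.
interface EventSource {
  on(event: string, listener: (...args: any[]) => void): unknown;
}

// Event names exactly as listed in this document.
const CONNECTION_EVENTS: string[] = [
  "open_connection",
  "close_connection",
  "lost_connection",
  "trying_connect",
  "re_established_connection",
  "error_connection",
];

// Hypothetical helper: subscribes to every lifecycle event and returns a
// log array that grows as events arrive.
function trackConnectionEvents(source: EventSource): string[] {
  const log: string[] = [];
  for (const name of CONNECTION_EVENTS) {
    source.on(name, (err?: Error) => {
      log.push(err ? name + ": " + err.message : name);
    });
  }
  return log;
}

// Tiny stand-in emitter so the sketch runs without a broker.
class FakeConnection implements EventSource {
  private listeners: { [event: string]: Array<(...args: any[]) => void> } = {};

  on(event: string, listener: (...args: any[]) => void): this {
    (this.listeners[event] = this.listeners[event] || []).push(listener);
    return this;
  }

  emit(event: string, ...args: any[]): void {
    for (const listener of this.listeners[event] || []) {
      listener(...args);
    }
  }
}

const fakeConnection = new FakeConnection();
const eventLog = trackConnectionEvents(fakeConnection);

// Simulate a dropped connection followed by a successful reconnect.
fakeConnection.emit("open_connection");
fakeConnection.emit("lost_connection");
fakeConnection.emit("trying_connect");
fakeConnection.emit("re_established_connection");

console.log(eventLog.join(" -> "));
```

In real code the helper would be handed the connection object instead of FakeConnection. For example, an application might pause publishing between lost_connection and re_established_connection.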
Promise with queue.activateConsumer for RPC's. The result of the resolved Promise will be returned to the RPC caller.
noCreate creation option property for Exchange and Queue (expects the exchange or queue to already exist in AMQP)
name property for Exchange and Queue and type property for Exchange
Exchange and Queue methods close and delete
prefetch option to DeclarationOptions in the amqp-ts.d.ts file
path library does not have a method parse in 0.10.x
exchange.close, exchange.delete, queue.close and queue.delete return the same promise (and are thereby executed only once)
exchange.close, exchange.delete, queue.close and queue.delete
The roadmap section describes things that I want to add or change in the (hopefully near) future.