amqp / rhea

A reactive messaging library based on the AMQP protocol
Apache License 2.0
273 stars 80 forks source link

Node.js CI

rhea

A reactive library for the AMQP protocol, for easy development of both clients and servers.

Hello World!

Brief example of sending and receiving a message through a broker/server listening on port 5672:

var container = require('rhea');
container.on('message', function (context) {
    console.log(context.message.body);
    context.connection.close();
});
container.once('sendable', function (context) {
    context.sender.send({body:'Hello World!'});
});
var connection = container.connect({'port':5672});
connection.open_receiver('examples');
connection.open_sender('examples');

output:

Hello World!

Dependencies

Examples

There are some examples of using the library under the examples folder. These include:

These last two can be used together to demsontrate sending messages from one process to another, using a broker or similar intermediary to which they both connect.

The direct_recv.js example can be used in conjunction with the simple_send.js example to demonstrate sending messages between processes without the use of any intermediary. Note however the the default port of one or ther other will need to be changed through the -p command line option.

To run the examples you will need the dependencies installed: the library itself depends on the 'debug' module, and some of the examples depend on the 'yargs' module for command line option parsing.

The 'rhea' module itself must also be findable by node. You can do this either by checking out the code from git and setting NODE_PATH to include the directory to which you do so (i.e. the directory in which 'a directory named 'rhea' can be found, or you can install the module using npm.

Some of the examples assume an AMQP compatible broker, such as those offered by the ActiveMQ or Qpid Apache projects, is running.

API

There are five core types of object in the API:

Each of these inherits all the methods of EventEmitter, allowing handlers for particular events to be attached. Events that are not handled at sender or receiver scope are then propagated up to possibly be handled at session scope. Events that are not handled at session scope are then propagated up to possibly be handled at connection scope, and if not there then in container scope.

Two other relevant objects are:


Container

An AMQP container from which outgoing connections can be made and/or to which incoming connections can be accepted. The module exports a default instance of a Container which can be used directly. Other instances can be created from that if needed using the create_container method. A container is identified by the id property. By default a uuid is used, but the property can be set to something more specific if desired before making or accepting any connections.

methods:

connect(options)

Connects to the server specified by the host and port supplied in the options and returns a Connection.

The options argument is an object that may contain node library socket.connect and tls.connect options and any of the following fields:

As well as Container options common for both client and server:

If options is undefined, the client will attempt to obtain default options from a JSON config file. This file is of similar structure to that used by Apache Qpid Proton clients. The location of the file can be specified through the MESSAGING_CONNECT_FILE environment variable. If that is not specified it will look for a file called connect.json in the current directory, in /.config/messaging or /etc/messaging/.

The config file offers only limited configurability, specifically:

listen(options)

Starts a server socket listening for incoming connections on the port (and optionally interface) specified in the options.

The options argument is an object that may contain node library net.createServer and its server.listen or tls.createServer and its server.listen options, most AMQP Container fields listed for connect and any of the following fields:

The options argument is an object that may contain any of the following fields:

create_container()

Returns a new container instance. The method takes an options object which can contain the following field:

If no id is specified a new uuid will be generated.

generate_uuid()

Simple utility for generating a stringified uuid, useful if you wish to specify distinct container ids for different connections.

websocket_connect()

Returns a function that can be used to create another function suitable for use as the value of 'connection_details' in a connect call in order to connect over websockets. The function returned here takes a websocket url and optional arguments. The websocket_connect method itself take the constructor of the WebSocket implementation to use. It has been tested with the implementation in firefox and also that in the node module 'ws'.

websocket_accept()

Used to start handling an incoming websocket connection as an AMQP connection. See the websocket echo server example for how to use it.


Connection

methods:

open_receiver(address|options)

Establishes a link over which messages can be received and returns a Receiver representing that link. A receiving link is a subscription, i.e. it expresses a desire to receive messages.

The argument to this method can either be a simple string indicating the source of messages of interest (e.g. a queue name), or an options object that may contain any of the following fields:

And attach frame fields:

Note: If the link doesn't specify a value for the credit_window and autoaccept options, the connection options are consulted followed by the container options. The default is used only if an option is not specified at any level.

open_sender(address|options)

Establishes a link over which messages can be sent and returns a <a href="#sender">Sender representing that link. A sending link is an analogous concept to a subscription for outgoing rather than incoming messages. I.e. it expresses a desire to send messages.

The argument to this method can either be a simple string indicating the target for messages of interest (e.g. a queue name), or an options object that may contain any of the following fields:

And attach frame fields as for open_receiver.

Note: If the link doesn't specify a value for the autosettle option, the connection options are consulted followed by the container options. The default is used only if an option is not specified at any level.

send(message)

Sends the specified message over the default sender, which is a sending link whose target address is null. The use of this method depends on the peer supporting so-called 'anonymous relay' semantics, which most AMQP 1.0 brokers do. The message should have the 'to' field set to the intended destination.

close()

Closes a connection (may take an error object which is an object that consists of condition and description fields).

is_open()/is_closed()

Provide information about the connection status. If it's opened or closed.

create_session()

Creates a new session if you want to manage sessions by yourself.

events:

connection_open

Raised when the remote peer indicates the connection is open. This occurs also on reconnect.

connection_close

Raised when the remote peer indicates the connection is closed. This can happen either as a response to our close, or by itself. The connection and sessions will not be reconnected.

connection_error

Raised when the remote peer indicates the connection is closed and specifies an error. A connection_close event will always follow this event, so it only needs to be implemented if there are specific actions to be taken on a close with an error as opposed to a close. The error is available as a property on the event context.

If neither the connection_error or the connection_close is handled by the application, an error event will be raised. This can be handled on the connection or the container. If this is also unhandled, the application process will exit.

protocol_error

Raised when a protocol error is received on the underlying socket. A disconnected event will follow with any reconnect as configured.

error

Raised when an error is received on the underlying socket. This catches any errors otherwise not handled.

disconnected

Raised when the underlying tcp connection is lost or nonfatal error was received. The context has a reconnecting property which is true if the library is attempting to automatically reconnect and false if it has reached the reconnect limit. If reconnect has not been enabled or if the connection is a tcp server, then the reconnecting property is undefined. The context may also have an error property giving some information about the reason for the disconnect. If the disconnect event is not handled, a warning will be logged to the console.

You should update the application state to resend any unsettled messages again once the connection is recovered.

settled

Raised when remote settled the message.


Session

Session is an aggregation of Receiver and <a href="#sender">Sender links and provides the context and sequencing of messages for all the links it contains. A <a href="#connection">Connection creates a default session for you if you create receivers and senders on the Connection. You only need to use this object if you want to group your links into more than one session.

methods:

open_receiver(address|options)

This adds a receiver on the session. The open_receiver on the <a href="#connection">Connection object finds the session and calls this.

open_sender(address|options)

This adds a sender on the session. The open_sender on the <a href="#connection">Connection object finds the session and calls this.

close()

End a session (may take an error object which is an object that consists of condition and description fields).

is_open()/is_closed()

Provide information about the session status. If it's opened or closed.

events:

session_open

Raised when the remote peer indicates the session is open (i.e. begun in AMQP parlance).

session_close

Raised when the remote peer indicates the session is closed (i.e. ended in AMQP parlance). The session will be removed from the connection after the event.

session_error

Raised when the remote peer indicates the session has ended and specifies an error. A session_close event will always follow this event, so it only needs to be implemented if there are specific actions to be taken on a close with an error as opposed to a close. The error is available as error property on the session object.

If neither the session_error or the session_close is handled by the application, an error event will be raised on the container. If this is also unhandled, the application process will exit.

settled

Raised when remote settled the message.


Receiver

methods:

close()

Closes a receiving link (i.e. cancels the subscription). (May take an error object which is an object that consists of condition and description fields).

detach()

Detaches a link without closing it. For durable subscriptions this means the subscription is inactive, but not cancelled.

add_credit(n)

By default, receivers have a prefetch window that is moved automatically by the library. However if desired the application can set the prefecth to zero and manage credit itself. Each invocation of add_credit() method issues credit for a further 'n' messages to be sent by the peer over this receiving link. [Note: flow()is an alias for add_credit()]

credit()

Returns the amount of outstanding credit that has been issued.

events:

message event

Raised when a message is received. The context passed will have a <a href="#message">message, containing the received content, and a <a href="#delivery">delivery which can be used to acknowledge receipt of the message if autoaccept has been disabled.

receiver_open

Raised when the remote peer indicates the link is open (i.e. attached in AMQP parlance).

receiver_drained

Raised when the remote peer indicates that it has drained all credit (and therefore there are no more messages at present that it can send).

receiver_flow

Raised when a flow is received for receiver.

receiver_error

Raised when the remote peer closes the receiver with an error. A receiver_close event will always follow this event, so it only needs to be implemented if there are specific actions to be taken on a close with an error as opposed to a close. The error is available as an error property on the receiver.

receiver_close

Raised when the remote peer indicates the link is closed (i.e. detached in AMQP parlance).

settled

Raised when remote settled the message.


Sender

methods:

send(msg)

Sends a message. The link need not be yet open nor is any credit needed, but there is a limit of 2048 deliveries in the Session queue before it raises an exception for buffer overflow.

Unsettled messages, whether transmitted or not, are lost on reconnect and there will be no accepted, released, rejected events. You may need to resend the messages on a disconnected event.

If the messages to be sent can be generated or fetched on demand or there is large number of messages, it is recommended send is called only while the sender is sendable(). When sender is no longer sendable, continue sending in the sendable event.

close()

Closes a sending link (may take an error object which is an object that consists of condition and description fields).

detach()

Detaches a link without closing it.

sendable()

Returns true if the sender has available credits for sending a message. Otherwise it returns false.

set_drained(bool)

This must be called in response to sender_draining event to tell peer we have drained our messages or credit.

events:

sendable

Raised when the sender has received credit to be able to transmit messages to its peer. You will not receive a new event until the peer sends more credit, even if you have some credit left.

accepted

Raised when a sent message is accepted by the peer.

released

Raised when a sent message is released by the peer.

rejected

Raised when a sent message is rejected by the peer. context.delivery.remote_state.error may carry diagnostics to explain rejection, for example a condition property with value amqp:unauthorized-access.

modified

Raised when a sent message is modified by the peer. The context.delivery.remote_state may have delivery_failed and undeliverable_here boolean and message_annotations map properties to guide any message retransmission as specified in the AMQP 1.0 specification.

sender_open

Raised when the remote peer indicates the link is open (i.e. attached in AMQP parlance).

sender_draining

Raised when the remote peer requests that the sender drain its credit; sending all available messages within the credit limit and calling set_drained(true). After this the sender has no credit left.

sender_flow

Raised when a flow is received for sender. sender_draining and sendable events may follow this event, so it only needs to be implemented if there are specific actions to be taken.

sender_error

Raised when the remote peer closes the sender with an error. A sender_close event will always follow this event, so it only needs to be implemented if there are specific actions to be taken on a close with an error as opposed to a close. The error is available as an error property on the sender.

sender_close

Raised when the remote peer indicates the link is closed (i.e. detached in AMQP parlance).

settled

Raised when remote settled the message.

Message

A message is an object that may contain the following fields:

Messages are passed to the send() method of Connection or Sender, and are made available as message on the event context for the message event on a Receiver or its parent(s).

Delivery

The delivery object provides information on- and enables control over- the state of a message transfer.

The methods on a delivery object are:

If autoaccept is disabled on a receiver, the application should ensure that it accepts (or releases or rejects) all messages received.


Note: For detailed options and types, please refer to the type definitions in the typings directory.