============================================================================================
MQ Messages
Each message is comprised of one or more frames. Each frame is an arbitrary blob of data.
There are curently 6 different messages supported. The general message format is given below
where each line corresponds to a frame.
--------Sending message format------
address 1
address 2
....
address N
empty frame
version
Command
id
arg 1
arg 2
....
arg M
empty frame
------Receiving message format for TRACE_ROUTER-------
empty frame
version
Command
id
arg 1
arg 2
....
arg M
empty frame
address 1
address 2
....
address N
address fames
Each address frame corresponds to the next hop in routing form the source to the destination.
The simplest case only has a single address frame.
TRACE_ROUTER devices append the source address of the message on receipt allowing one to reverse
path taken and send a reply back to the message originator.
empty frame
Empty frames are used to separate an address from the rest of the command and it's arguments.
It's also used as a sanity check on message integrity.
Version frame
Determines the message protocol formant to use. Currently this value is stored in the constant
MQF_VERSION_KEY which is set to LMQv100.
Command frame
How to interpret the message. The crrent available commands are:
MQF_PING_KEY - Ping command
MQF_PONG_KEY - Pong response to a ping command
MQF_EXEC_KEY - Send the message to the remote address. No tracking or response is allowed.
Upon receipt at the destination the message is passed to the application for execution.
MQF_TRACKEXEC_KEY - Same as MQF_EXEC_KEY but the command is tracked at the source. This allows
for detection of closed or dead hosts. Optionally a MQF_TRACKADDRESSS_KEY can be sent
as a response while the remote host processes the command. The message ID frame is used in
any response (MQF_TRACKADDRESS_KEY or MQF_RESPONSE_KEY).
Since the command is tracked a response is exptected in the form of a MQF_RESPONSE_KEY command.
MQF_TRACKADDRES_KEY - Optionally sent in response to an MQF_TRACKEXEC_KEY command. Only needed
if processing the command is deferred or may take some time. In this case sending a track address
comand allows the sender to know the command was received and is being processed. The MQ connection
sender sends periodic heartbeats to detect lost or dead servers.
MQF_RESPONSE_KEY - Contains the response to the MQF_TRACKEXEC_KEY command.
ID frame
Arbitrary data used as a UUID for the command
Argument frames
One or more frames passed to the application.
============================================================================================
Typical Message send/recv sequences.
Below are listed some typical exchanges using a TRACE_ROUTER.
is used to signify one or more address frames.
'/' is used to delineate frames.
-----Simple Ping/Pong-----
Client Sends: //LMQv100/PING/id/
Server Recvs: /LMQv100/PING/id//
Server Sends: //LMQv100/PONG/id/
Client Recvs: /LMQv100/PONG/id/
-------- Simple EXEC with no tracking --------
Client sends: //LMQv100/EXEC/id//
Server Recvs: /LMQv100/EXEC/id//
-------- TRACKEXEC without heartbeating -----------
Client Sends: //LMQv100/TRACKEXEC/id//
Server Recvs: /LMQv100/TRACKEXEC/id//
Server Sends: //LMQv100/RESPONSE/id/
Client Recvs: /LMQv100/RESPONSE/id/
-------- TRACKEXEC with heartbeating -----------
Client Sends: //LMQv100/TRACKEXEC/id//
Server Recvs: /LMQv100/TRACKEXEC/id//
Server Sends: //LMQv100/TRACKADDRESS/id/
Client Recvs: /LMQv100/TRACKADDRESS/id/>
Once the TRACKADDRESS record is reicved the MQ portal then enables heartbeating to the server until
the command timeout is reached or a RESPONSE is received. TRACKADDRESS commands are internal to the
MQ system. The application never sees these commands.
Server Sends: //LMQv100/RESPONSE/id/
Client Recvs: /LMQv100/RESPONSE/id/
============================================================================================
MQ devices/socket types
============================================================================================
The MQ layer supports several different devices. Additional devices can be added. These are based on the
ZeroMQ devices so look to those for queue limits, etc.
---------MQ_PAIR, MQ_DEALER---------
These are all straight 0MQ socket types.
---------MQ_ROUTER--------
This is also a straight 0MQ socket type but it's useful to understand the differences between it and
the other MQ_*_ROUTER types
Send: pop next hop off message and forward
Recv: push sender address onto message for pass to application
---------MQ_SIMPLE_ROUTER--------
Send: pop next hop off message and forward
Recv: pass to application unchanged
---------MQ_TRACE_ROUTER--------
Send: pop next hop off message and forward
Recv: append sender address and pass to application
============================================================================================
Source files
============================================================================================
There are just a handful of source files. The design uses a generic abstraction but in reality
the current implementation is a wrapper around ZeroMQ routines. This is obvious when looking at
the mq_portal.h header.
mq_portal.c - Main implementation source code.
mq_msg.c - Routines to manipulate messages and frames
mq_zmq.c - 0MQ wrappers for functionality.
mq_test.c - Test program
============================================================================================
Usage
============================================================================================
The MQ layer is designed to work within the GOP framework for dispatching tasks to remote
servers and recveiving responses. It has the ability to provide network overlay and do
complex routing between hosts. Each task has a timeout and can be tracked. This allows
detection of remote host failures through heartbeating. The number of connections
to a remote host varies between a user supplied min and max number of connections. The command
backlog is used to decide when to spawn additional connections. Likewise if a connection
is not sustainiang a user supplied minimum command throughput then the connection is culled.
Additionally an MQ portal can be created to accept incoming connections like that used on a
server. Tasks can be submitted either by the GOP tools or via mq_submit. In server mode
user supplied commands can be installed for execution. These key off the first argumaent frame
on EXEC and TRACKEXEC commands.
Typical usage for a client is:
------------------------------------
inip_file_t *ifd = ....
mq_context_t *mqc;
op_generic_t *gop;
//** Create the context
mqc = mq_create_context(ifd, "mq_context");
//** Generate the task
gop = new_mq_op(mqc, msg, client_response_pong, td, free, td->dt);
//** Wait for it to complete
gop_waitall(gop);
//** Destroy when finished
mq_destroy_context(mqc);
--------------------------------------
Look at client_test_thread() in mq_test.c for a fully working example. The format for the repsonse handler,
client_response_pong(), in the above example is:
op_status_t (*fn_response)(void *arg, int id);
The arg passed in is actually a pointer of type mq_task_t. This structure is used for both GOP submissions
and server side tasks. Right now we'll just focus on the GOP or client side meanings.
typedef struct {
mq_msg_t *msg; //** Actual message sent with address
mq_msg_t *response; //** Response message
op_generic_t *gop; //** GOP corresponding to the task.
mq_context_t *ctx; //** Portal context for sending responses.
void *arg; //** Optional argument when calling new_mq_op(). This was free() in the above example.
apr_time_t timeout; //** Initially the DT in sec for the command to complete and converted to abs timeout when sent
void (*my_arg_free)(void *arg); //** Function for cleaning up the GOP arg on GOP destruction.
} mq_task_t;
Likewise for the server side its:
--------------------------------------
mq_context_t *mqc;
mq_command_table_t *table;
uint64_t n;
log_printf(0, "START\n");
//** Make the server context
mqc = mq_create_context(ifd, "mq_context");
//** Make the server portal
server_portal = mq_portal_create(mqc, host, MQ_CMODE_SERVER);
//** Get the command table from the newly created MQ portal
table = mq_portal_command_table(server_portal);
//** Install your commands
mq_command_add(table, MQF_PING_KEY, MQF_PING_SIZE, NULL, cb_ping);
//** Make available for use
mq_portal_install(mqc, server_portal);
//** Wait for a shutdown using an eventFD. Not reuqired Just one way to trigger a shutdown.
//** Could also do it via another command
read(control_efd, &n, sizeof(n));
//** Destroy the portal
mq_destroy_context(mqc);
--------------------------------------
This code is directly lifted from server_test_mq_loop() in mq_test.c.
The format for commands used by the command table is pretty simple:
void command_fn(void *arg, mq_task_t *task)
Arg is the void * pointer provided when adding the operation to the command table.
In the above it corresponds to the NULL in mq_command_add() above.
The other argument, task, uses the same task structure as discussed above but with slightly different meanings:
typedef struct {
mq_msg_t *msg; //** Message received
mq_msg_t *response; //** NULL, Unused
op_generic_t *gop; //** NULL, Unused
mq_context_t *ctx; //** Portal context for sending responses. (Server+GOP)
void *arg; //** Optional argument when calling mq_command_add().
apr_time_t timeout; //** Initially the DT in sec for the command to complete and converted to abs timeout when sent
void (*my_arg_free)(void *arg); //** NULL, Unused
} mq_task_t;
Servers normally send responses directly in more of a fire and forget approach, bypassing the GOP framework. In this case one
uses mq_submit():
int mq_submit(mq_portal_t *p, mq_task_t *task);
The easist way to generate the task is to use the helper function:
mq_task_t *mq_task_new(mq_context_t *ctx, mq_msg_t *msg, op_generic_t *gop, void *arg, int dt);
In the cb_ping() example above translates to
err = mq_submit(server_portal, mq_task_new(server_portal->mqc, response, NULL, NULL, 5));
in proc_trackexec_ping() which is called from cb_ping();
You can look at cb_ping() in mq_test.c for the full example.