Closed JonnyyJ closed 1 year ago
This is just conjecture, as you haven't shared the relevant code, but I suspect that the issue is in the logic in cleanUp
, and cross over interaction / contamination between listenerClosure
and publisherClosure
. IE. for some reason they end up pointing at the same HCONN
.
Again conjecture, but it could be as simple as scoping issues using var
in lieu of let
.
This is just conjecture, as you haven't shared the relevant code, but I suspect that the issue is in the logic in
cleanUp
, and cross over interaction / contamination betweenlistenerClosure
andpublisherClosure
. IE. for some reason they end up pointing at the sameHCONN
.Again conjecture, but it could be as simple as scoping issues using
var
in lieu oflet
.
I see, this also happened in our listener while we didn't call publisher, but I will give it a try as your mentioned.
Here's my listener:
const MQC = mq.MQC;
const logger = getConsoleLogger();
/**
*
* listener execution
* @param {MQQueueManager} queueManager
* @param {string} queueName
* @param {EventEmitter} emitter
* @return {Function} recursively calls itself until there is an error
*/
async function executeListener(queueManager, queueName, subject) {
try {
const mqObjectDefinition = new mq.MQOD();
mqObjectDefinition.ObjectName = queueName;
mqObjectDefinition.ObjectType = MQC.MQOT_Q;
const listenerOptions = getProperty('IBMMQ.listenerOpenOptions') || MQC.MQOO_INPUT_SHARED;
// const queueObject = await mqOpen(queueManager, mqObjectDefinition, listenerOptions);
// manually handle the promise
const queueObject = await new Promise((resolve, reject) => {
mq.Open(queueManager, mqObjectDefinition, listenerOptions, (err, data) => {
if (err) {
reject(err);
} else {
resolve(data);
}
});
});
const messageDetails = new mq.MQMD();
const getMessageOptions = new mq.MQGMO();
messageDetails.Persistence = MQC.MQPER_PERSISTENT // Enable message persistence
getMessageOptions.WaitInterval = MQC.MQWI_UNLIMITED;
getMessageOptions.MatchOptions = MQC.MQMO_NONE;
getMessageOptions.Options = MQC.MQGMO_NO_SYNCPOINT
| MQC.MQGMO_WAIT
| MQC.MQGMO_CONVERT
| MQC.MQGMO_FAIL_IF_QUIESCING;
const messageBuffer = await mqGet(queueObject, messageDetails, getMessageOptions);
logger.info(`Received message: ${messageBuffer}`);
subject.next(messageBuffer.toString());
logger.info(`queueManager status: ${queueManager}`)
// Close the queue
await new Promise((resolve, reject) => {
mq.Close(queueObject, 0, function (err) {
if (err) {
logger.error(`Received error when close queue ${err} while fetching message`)
reject(err)
} else {
logger.info("close the queue after emitted the message while fetching message")
resolve()
}
}
);
})
// close the connection
await new Promise((resolve, reject) => {
mq.Disc(queueManager, 0, function (err) {
if (err) {
logger.error(`Received error when close connection ${err} while fetching message`)
reject(err)
} else {
logger.info("close the connection after emitted the message while fetching message")
resolve()
}
}
);
})
return executeListener(queueManager, queueName, subject);
} catch (err) {
logger.error(`Received error ${err}`);
throw err;
}
}
/**
*
* @param {MQQueueManager} queueManager
* @returns {function} listener function
*/
function listenerClosure(queueManager) {
const listenerSubject = new Subject();
/**
*
* @param {string} queueName name of queue to listen to
* @return {EventEmitter} data emitter for queue
*/
return function listener(queueName) {
executeListener(queueManager, queueName, listenerSubject);
return listenerSubject.asObservable();
};
}
module.exports = listenerClosure;
In client.js
a connection is created and stored as mqQueueManager
.
const mqQueueManager = await queueManager(mqPassword);
This is then shared between listener and publisher.
listener: listenerClosure(mqQueueManager),
publisher: publisherClosure(mqQueueManager),
executeListener
Opens a connection Gets a single message Closes the connection Disconnects
The disconnect will disrupt all modules that are sharing the connection. IE. Disc closes the connection created with Connx.
In
client.js
a connection is created and stored asmqQueueManager
.const mqQueueManager = await queueManager(mqPassword);
This is then shared between listener and publisher.
listener: listenerClosure(mqQueueManager), publisher: publisherClosure(mqQueueManager),
executeListener
Opens a connection Gets a single message Closes the connection Disconnects
The disconnect will disrupt all modules that are sharing the connection. IE. Disc closes the connection created with Connx.
@chughts Yeah, I made a change of specifying different variable
const ListenerQueueManager = await queueManager(mqPassword);
const publisherQueueMnager = await queueManager(mqPassword);
return {
listener: listenerClosure(ListenerQueueManager),
publisher: publisherClosure(publisherQueueMnager),
};
It worked at the first time, but after I successfully published one message and close both queue and connection, I'm getting another error when I try to publish the second message, seems like the queueManager never be called to establish the connection again before I call the mq.open method.
Error:
MQError: OPEN: MQCC = MQCC_FAILED [2] MQRC = MQRC_HCONN_ERROR [2018]
I re-built my queueManage.js with some logging as this:
try {
const connection = await new Promise((resolve, reject)=>{
mq.Connx(getProperty('IBMMQ.queueManager'), connectionOptions, function(err, conn){
if(err) {
logger.error("MQ call failed in " + err.message)
reject(err)
} else{
logger.info(`connection status: ${JSON.stringify(conn)}`)
resolve(conn)
}
})
})
return connection
} catch (error) {
logger.error(`Error occurs while connecting MQ: ${error}`)
return null
}
I didn't see the connection status logs when I do my second publish.
Still the same publisher.js
function publisherClosure(queueManager) {
/**
*
* @param {string} queueName queue manager object
* @param {string|object} data data to publish
* @returns {null} no return value
*/
return async function publisher(queueName, data) {
if (queueManager == null) {
logger.error(`failed to publish message in queue ${queueName} as queue manager is null`);
return; //return if connection manager is not available
}
let queueObject;
try {
const mqObjectDefinition = new mq.MQOD();
mqObjectDefinition.ObjectName = queueName;
mqObjectDefinition.ObjectType = MQC.MQOT_Q;
const putOptions = getProperty('IBMMQ.putOpenOptions') || MQC.MQOO_OUTPUT;
queueObject = await mqOpen(queueManager, mqObjectDefinition, putOptions);
const messageDetails = new mq.MQMD();
const putMessageOptions = new mq.MQPMO();
messageDetails.Format = MQC.MQFMT_STRING;
messageDetails.Persistence = MQC.MQPER_PERSISTENT // Enable message persistence
putMessageOptions.Options = MQC.MQPMO_NO_SYNCPOINT
| MQC.MQPMO_NEW_MSG_ID
| MQC.MQPMO_NEW_CORREL_ID;
let dataBuffer;
if (typeof data === 'object') dataBuffer = Buffer.from(JSON.stringify(data));
else dataBuffer = Buffer.from(data);
let publishMsgStatus = " ";
try {
await mqPut(queueObject, messageDetails, putMessageOptions, dataBuffer);
logger.info(`Published Message ${data} to queue ${queueName}`);
publishMsgStatus = "successfully published";
} catch (err) {
logger.error(`Error Publishing Message ${data} to queue ${queueName} with error ${err}`);
publishMsgStatus = "message publishing failed";
}
return publishMsgStatus;
} catch (err) {
logger.error(`Error publishing message to Queue ${queueName}`, err);
throw err;
} finally {
await cleanUp(queueManager, queueObject);
}
};
}
async function cleanUp(connectionObject, queueObject) {
if (queueObject) {
await mqClose(queueObject, 0); //closing the queue connection
}
logger.info(`hconn status: ${JSON.stringify(connectionObject)}`)
mqDisc(connectionObject).then(() => {
logger.info(`hconn status: ${JSON.stringify(connectionObject)}`)
logger.info(`successfully closed connection to channel :- ${getProperty('IBMMQ.channelName')}`)
}).catch(() => {
logger.error(`failed to close connection to channel :- ${getProperty('IBMMQ.channelName')}`)
}); //closing the channel connection
}
My API to call this publisher:
async function postMessage(req, res) {
try {
const client = await clientPromise;
const message = req.body.msg;
const queue = req.body.queueName || getProperty('IBMMQ.publishQueueName');
logger.info(`Publishing Message ${message} to ${queue}`);
const queueResponse = await client.publisher(queue, message, {
noAuth: false,
noCSRF: false,
rejectUnauthorized: true,
});
const successResponse = {
success: true,
queueResponse,
};
res.json(CircularJSON.stringify(successResponse));
logger.info('send response to Aircom successfully');
} catch (err) {
logger.error('error publishing mq', err);
const failResponse = {
success: false,
err,
};
res.status(400).json(CircularJSON.stringify(failResponse));
}
}
The logic
const ListenerQueueManager = await queueManager(mqPassword);
const publisherQueueMnager = await queueManager(mqPassword);
Will create 2 connections for the duration of the application. So your logic is
Create 2 connections
Loop
Open for publisher
put
Close for publisher
Disconnect publisher connection
Open for listener
get
Close for listener
Disconnect listener connection
Second round the loop the connection will have been disconnected, and is not reconnected. You need to move the disconnects out of the loop.
Create 2 connections
Loop
Open for publisher
put
Close for publisher
Open for listener
get
Close for listener
Disconnect both connections
BTW opening / closing the connection for an individual put or get is an anti-pattern. You should open each connection for as long as there are messages to put / get and then close it. Creating and disconnecting for an individual put or get is worse still.
The logic
const ListenerQueueManager = await queueManager(mqPassword); const publisherQueueMnager = await queueManager(mqPassword);
Will create 2 connections for the duration of the application. So your logic is
Create 2 connections Loop Open for publisher put Close for publisher Disconnect publisher connection Open for listener get Close for listener Disconnect listener connection
Second round the loop the connection will have been disconnected, and is not reconnected. You need to move the disconnects out of the loop.
Create 2 connections Loop Open for publisher put Close for publisher Open for listener get Close for listener Disconnect both connections
BTW opening / closing the connection for an individual put or get is an anti-pattern. You should open each connection for as long as there are messages to put / get and then close it. Creating and disconnecting for an individual put or get is worse still.
Yeah, I re-built my code based on your suggestions and it works, thank you so much.
function publisherClosure(queueManager) {
return async function publisher(queueName, data) { if (queueManager == null) { logger.error(
failed to publish message in queue ${queueName} as queue manager is null
); return; //return if connection manager is not available } let queueObject; try { const mqObjectDefinition = new mq.MQOD(); mqObjectDefinition.ObjectName = queueName; mqObjectDefinition.ObjectType = MQC.MQOT_Q; const putOptions = getProperty('IBMMQ.putOpenOptions') || MQC.MQOO_OUTPUT; queueObject = await mqOpen(queueManager, mqObjectDefinition, putOptions);}; }
const logger = getConsoleLogger(); // keep these variables in private scope module.exports = async function queueManager(mqPassword) { const mqc = mq.MQC; const connectionOptions = new mq.MQCNO(); const connectionDetails = new mq.MQCD(); const connectionSecurity = new mq.MQCSP();
connectionDetails.ConnectionName = getProperty('IBMMQ.connectionName'); connectionDetails.ChannelName = getProperty('IBMMQ.channelName');
if (mqPassword || getProperty('IBMMQ.password')) { connectionSecurity.UserId = getProperty('IBMMQ.userId'); connectionSecurity.Password = mqPassword || getProperty('IBMMQ.password'); }
connectionOptions.SecurityParms = connectionSecurity; connectionOptions.Options = mqc.MQCNO_CLIENT_BINDING; connectionOptions.ClientConn = connectionDetails; const connection = promisify(mq.Connx); try { const connectionCreation = await connection(getProperty('IBMMQ.queueManager'), connectionOptions); return connectionCreation; } catch (e) { logger.error('error creating connection manager', e); return null; } };
const logger = getConsoleLogger(); async function createMQClient(mqPassword = null) { const mqQueueManager = await queueManager(mqPassword);
logger.info(
Connected to QueueManager ${getProperty('IBMMQ.queueManager')} on ${getProperty('IBMMQ.connectionName')} on channel ${getProperty('IBMMQ.channelName')}
); return { listener: listenerClosure(mqQueueManager), publisher: publisherClosure(mqQueueManager), }; }module.exports = createMQClient;
Published Message hconn status: {\"_hConn\":2113929,\"_name\":\"\"}" hconn status: {\"_hConn\":-1,\"_name\":\"***\"}"} successfully closed connection to channel Received error MQError: GET: MQCC = MQCC_FAILED [2] MQRC = MQRC_HCONN_ERROR [2018] UnhandledPromiseRejectionWarning: MQError: GET: MQCC = MQCC_FAILED [2] MQRC = MQRC_HCONN_ERROR [2018] /app/build/node_modules/ibmmq/lib/mqi.js:2214:18 /app/build/node_modules/ffi-napi/lib/_foreign_function.js:115:9