ibm-messaging / mq-mqi-nodejs

Calling IBM MQ from Node.js - a JavaScript MQI wrapper
Apache License 2.0

Receiving and sending segmented messages by IBM MQ #175

Closed florianmarkusse closed 9 months ago

florianmarkusse commented 9 months ago

I am trying to consume messages from an IBM MQ queue that I suspect is delivering chunked messages. I want to find out whether the producer is chunking the messages manually before sending them to IBM MQ, or whether IBM MQ itself is chunking them.

The messages I receive carry the following group-related fields in the MQMD:

{
  "GroupId": {
    "type": "Buffer",
    "data": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
  },
  "MsgSeqNumber": 1,
  "Offset": 0,
  "MsgFlags": 0,
  "OriginalLength": -1
}

The code that does the consuming of the messages:

    const md = new mq.MQMD();
    const gmo = new mq.MQGMO();

    // https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_9.0.0/com.ibm.mq.ref.dev.doc/q096710_.htm
    gmo.Options =
      MQC.MQGMO_SYNCPOINT | // https://www.ibm.com/support/knowledgecenter/SSFKSJ_9.0.0/com.ibm.mq.ref.dev.doc/q096780_.html
      // MQC.MQGMO_NO_SYNCPOINT |
      MQC.MQGMO_NO_WAIT | // Should NOT wait if doing a Synchronous Get as it's a "blocking" call
      // MQC.MQGMO_WAIT |
      // Accept truncated messages that don't fit in the Buffer with a Warning
      // vs with an Error. If omitted, you cannot Commit this message.
      MQC.MQGMO_ACCEPT_TRUNCATED_MSG |
      MQC.MQGMO_CONVERT |
      MQC.MQGMO_FAIL_IF_QUIESCING |
      MQC.MQGMO_COMPLETE_MSG;

    gmo.MatchOptions = MQC.MQMO_NONE;

...

    mq.GetSync(this.hObj, md, gmo, this.buffer, async (err, len) => {
      ...
    });

As you can see, no MsgFlags appear to be set. Am I right to assume that this means the producer is chunking the messages manually, or could something be wrong in my configuration for receiving the messages?

I tried to create segmented messages locally by setting the options as per https://www.ibm.com/docs/en/ibm-mq/9.3?topic=segmentation-reassembly-by-queue-manager . However, I am unable to get this to work; it seems that MQMD.Version is not exposed in the Node.js bindings.
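
For reference, MsgFlags is a bit field, so a value of 0 means that none of the segmentation or group flags are set on the descriptor that was returned. A minimal sketch of decoding it follows. The helper name describeMsgFlags is hypothetical, and the numeric values are copied from the MQ C header definitions as an assumption so that the snippet runs on its own; in an application, the MQC.MQMF_* constants would be the source of truth.

```javascript
// Assumed numeric values of the MQMF_* flags (as defined in the MQ C headers).
// In real code, prefer MQC.MQMF_SEGMENT and friends over redefining them.
const MQMF = {
  SEGMENTATION_ALLOWED: 0x01,
  SEGMENT: 0x02,
  LAST_SEGMENT: 0x04,
  MSG_IN_GROUP: 0x08,
  LAST_MSG_IN_GROUP: 0x10,
};

// Hypothetical helper: returns the names of the flags set in an MQMD.MsgFlags value.
function describeMsgFlags(msgFlags) {
  return Object.keys(MQMF).filter((name) => (msgFlags & MQMF[name]) !== 0);
}

// The descriptor shown above has MsgFlags 0, so no flag names are reported.
console.log(describeMsgFlags(0)); // prints []

// A descriptor for the final segment of a message would report both segment flags.
console.log(describeMsgFlags(MQMF.SEGMENT | MQMF.LAST_SEGMENT));
```

Logging describeMsgFlags(md.MsgFlags) next to Offset and MsgSeqNumber in the get callback makes it easier to see what the queue manager actually reported for each message.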

Very basic example:

async function sendMessageToIBMMessageQueue(queueName, message) {
  const hConn = await connectToIBMMessageQueueManager();
  const hObj = await connectToIBMMessageQueue(hConn, queueName, "output");

  return new Promise((resolve, reject) => {
    const mqmd = new MQ.MQMD();
    const pmo = new MQ.MQPMO();
    /* eslint-disable no-bitwise */
    pmo.Options =
      MQC.MQPMO_NO_SYNCPOINT | MQC.MQPMO_NEW_MSG_ID | MQC.MQPMO_NEW_CORREL_ID;
    /* eslint-enable no-bitwise */
    mqmd.MsgFlags = MQC.MQMF_SEGMENTATION_ALLOWED;
    // Would be setting Version here but this is not present on mqmd.

    MQ.Put(hObj, mqmd, pmo, message, err => {
      if (err) {
        reject(err);
        return; // Do not fall through to resolve() after a failed put.
      }

      resolve();
    });
  });
}

Sending a message that is larger than the configured MaxMsgLength to IBM MQ then results in MQRC_DATA_LENGTH_ERROR.

So in short, my question is twofold:

I hope you find time to answer this question and thanks for your time in advance!

Versions tried: 1.0.1 and 2.0.2.

ibmmqmet commented 9 months ago

I am guessing that you are trying to control the message length based on the MAXMSGL on the SVRCONN channel rather than on a queue. If I leave the SVRCONN value at 4MB but put the queue maxlength at 20, then segmentation works as expected. The MsgFlags value is respected and reported - your application code looks fine for dealing with it. Or if I use local bindings, with no channel involved, then it too works ok.

But if I put the SVRCONN maxlength to 20, then I get the MQRC_DATA_LENGTH_ERROR.

Basically, automatic segmentation is handled inside the queue manager rather than the application. So when a client is involved, the long message is rejected before it gets a chance to be segmented. Nothing that this package can realistically do about it; it's the behaviour of the underlying client layer.
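
The ordering described above can be sketched as a small model. This is a simplification under stated assumptions, not the behaviour of the package or of the real client code: the function name predictPutOutcome and its parameter names are hypothetical, MQRC_MSG_TOO_BIG_FOR_Q (2030) is taken from the MQ reason code list, and other limits such as the queue manager's own maximum message length are ignored.

```javascript
// Reason codes: 2010 appears in this thread, 2030 is assumed from the MQ reason code list.
const MQRC_DATA_LENGTH_ERROR = 2010;
const MQRC_MSG_TOO_BIG_FOR_Q = 2030;

// Hypothetical model of where a large put is accepted, segmented or rejected.
// channelMaxMsgLength is null when local bindings are used (no channel involved).
function predictPutOutcome({
  messageLength,
  channelMaxMsgLength,
  queueMaxMsgLength,
  segmentationAllowed,
}) {
  // The client layer checks the channel limit first, before the queue
  // manager ever sees the message, so segmentation cannot help here.
  if (channelMaxMsgLength !== null && messageLength > channelMaxMsgLength) {
    return { ok: false, mqrc: MQRC_DATA_LENGTH_ERROR };
  }
  // The message fits on the queue as it is.
  if (messageLength <= queueMaxMsgLength) {
    return { ok: true, segmented: false };
  }
  // Too big for the queue: the queue manager may segment it if the flag allows.
  if (segmentationAllowed) {
    return { ok: true, segmented: true };
  }
  return { ok: false, mqrc: MQRC_MSG_TOO_BIG_FOR_Q };
}

// Channel left at 4 MB, queue limited to 20 bytes: the queue manager segments.
console.log(predictPutOutcome({
  messageLength: 1000,
  channelMaxMsgLength: 4 * 1024 * 1024,
  queueMaxMsgLength: 20,
  segmentationAllowed: true,
}));

// Channel limited to 20 bytes: rejected before segmentation can happen.
console.log(predictPutOutcome({
  messageLength: 1000,
  channelMaxMsgLength: 20,
  queueMaxMsgLength: 20,
  segmentationAllowed: true,
}));
```

The two calls mirror the two cases tried above: a 4MB SVRCONN limit with a queue limit of 20, and a SVRCONN limit of 20.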

florianmarkusse commented 9 months ago

I still cannot get it to work. I understand that I should not set MAXMSGL in the channel definition, but I am not sure where to set the queue's limit; I cannot find the definitions for the queue. Sorry to dump this code on you, but I think it shows the problem better:

    it("test message segmentation", async() => {
      const IBM_MQ_CONNECTION_NAME = "ibm-mq-server-dev(1414)";
      const IBM_MQ_QUEUE_MANAGER_NAME = "QGC1";
      const IBM_MQ_CHANNEL_NAME = "DEV.APP.SVRCONN";
      const messageQueueName = "ACS.BRK.2809CHQ.OUTBOUND";
      const messageToSend = "a".repeat(1000);

      const createConnection = (msgLength) => {
        return new Promise((resolve, reject) => {
          const cno = new MQ.MQCNO();

          /* eslint-disable-next-line no-bitwise */
          cno.Options |= MQC.MQCNO_CLIENT_BINDING;

          const cd = new MQ.MQCD();
          cd.MaxMsgLength = msgLength;
          cd.ConnectionName = IBM_MQ_CONNECTION_NAME;
          cd.ChannelName = IBM_MQ_CHANNEL_NAME;
          cno.ClientConn = cd;

          MQ.Connx(IBM_MQ_QUEUE_MANAGER_NAME, cno, (err, hConn) => {
            if (err) {
              reject(err);
              return; // Do not resolve with an invalid connection handle.
            }

            resolve(hConn);
          });
        });
      };

      const openConnection = (createdConnection, isInput) => {
        return new Promise((resolve, reject) => {
          const od = new MQ.MQOD();
          od.ObjectName = messageQueueName;
          /* eslint-disable no-bitwise */
          const openOptions = isInput
            ? MQC.MQOO_INPUT_SHARED | MQC.MQOO_FAIL_IF_QUIESCING
            : MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;
          /* eslint-enable no-bitwise */

          MQ.Open(createdConnection, od, openOptions, (err, hObj) => {
            if (err) {
              reject(err);
              return; // Do not resolve with an invalid object handle.
            }

            resolve(hObj);
          });
        });
      };

      const createdConnection = await createConnection(1000);
      const handleObject = await openConnection(createdConnection, false);

      // Puts message on queue.
      await new Promise((resolve, reject) => {
        const mqmd = new MQ.MQMD();
        const pmo = new MQ.MQPMO();
        /* eslint-disable no-bitwise */
        pmo.Options =
          MQC.MQPMO_NO_SYNCPOINT | MQC.MQPMO_NEW_MSG_ID | MQC.MQPMO_NEW_CORREL_ID;
        /* eslint-enable no-bitwise */
        mqmd.MsgFlags = MQC.MQMF_SEGMENTATION_ALLOWED;

        MQ.Put(handleObject, mqmd, pmo, messageToSend, err => {
          if (err) {
            reject(err);
            return; // Do not fall through to resolve() after a failed put.
          }

          resolve();
        });
      });

      const createdGettingConnection = await createConnection(1000);
      const createdGettingObject = await openConnection(createdGettingConnection, true);

      // Gets message from queue.
      const receivedMessage = await new Promise((resolve, reject) => {
        const md = new MQ.MQMD();
        const gmo = new MQ.MQGMO();
        /* eslint-disable no-bitwise */
        gmo.Options =
          MQC.MQGMO_SYNCPOINT | // https://www.ibm.com/support/knowledgecenter/SSFKSJ_9.0.0/com.ibm.mq.ref.dev.doc/q096780_.html
          // MQC.MQGMO_NO_SYNCPOINT |
          MQC.MQGMO_NO_WAIT | // Should NOT wait if doing a Synchronous Get as it's a "blocking" call
          // MQC.MQGMO_WAIT |
          // Accept truncated messages that don't fit in the Buffer with a Warning
          // vs with an Error. If omitted, you cannot Commit this message.
          MQC.MQGMO_ACCEPT_TRUNCATED_MSG |
          MQC.MQGMO_CONVERT |
          MQC.MQGMO_FAIL_IF_QUIESCING |
          MQC.MQGMO_COMPLETE_MSG;
        /* eslint-enable no-bitwise */

        gmo.MatchOptions = MQC.MQMO_NONE;
        const buffer = Buffer.alloc(2000);

        MQ.GetSync(createdGettingObject, md, gmo, buffer, (err, len) => {
          // A truncated-message warning is tolerated; any other error fails the test.
          if (err && err.mqrc !== MQC.MQRC_TRUNCATED_MSG_ACCEPTED) {
            reject(err);
            return;
          }

          console.log("md", md);

          resolve(buffer);
        });
      });

      console.log("receivedMessage: ", receivedMessage);
    });

So when I reduce

      const createdGettingConnection = await createConnection(1000);

to

      const createdGettingConnection = await createConnection(800);

or any number below 1000, I am getting:

MQError: GET: MQCC = MQCC_FAILED [2] MQRC = MQRC_DATA_LENGTH_ERROR [2010]

However, I do not know where else I can set MaxMsgLength so that IBM MQ segments the messages. Am I missing a structure somehow?

ibmmqmet commented 9 months ago

A queue's maximum message length (like the channel's value on the SVRCONN end) is set administratively, not from a regular application program. For example, in runmqsc: ALTER QL(QUEUENAME) MAXMSGL(1000)

If you don't have access, you will need to find your MQ administrator.

florianmarkusse commented 9 months ago

I see, we don't control the actual queue and just consume from it. Thanks for your help and I will reach out to an MQ admin!