ibm-messaging / mq-mqi-nodejs

Calling IBM MQ from Node.js - a JavaScript MQI wrapper
Apache License 2.0

Using SYNCPOINT with the ibmmq v2 NodeJS client #169

Open anvad opened 1 year ago

anvad commented 1 year ago

mq-mqi-nodejs version - 2.0.1

I am trying to use this package to get messages using the MQGMO_SYNCPOINT flag in the gmo (get message options) object.

const mq = require('ibmmq');
const MQC = mq.MQC; // MQ constants
const gmo = new mq.MQGMO(); // get message options

gmo.Options =
  MQC.MQGMO_SYNCPOINT |
  MQC.MQGMO_WAIT |
  MQC.MQGMO_CONVERT |
  MQC.MQGMO_FAIL_IF_QUIESCING;
gmo.MatchOptions = MQC.MQMO_NONE;
gmo.WaitInterval = this.waitInterval * 1000; // 60 seconds during testing
mq.Get(this.queueHandle, new mq.MQMD(), gmo, this.getMessagesCB);

The MQGMO_SYNCPOINT option ensures that the message is not removed from the queue until I later commit it with a call to mq.Cmit.

However, when I make that Cmit call in my callback function (this.getMessagesCB) e.g.

getMessagesCB(error, hObj, gmo, md, buf, hConn) {
  mq.Cmit(this.connectionHandle, function (err) {
    if (err) {
      console.log("Error on commit", err);
    }
  });
}

I get the following error:

{
  "name": "MQError",
  "message": "CMIT: MQCC = MQCC_FAILED [2] MQRC = MQRC_HCONN_ASYNC_ACTIVE [2500]",
  "mqcc": 2,
  "mqccstr": "MQCC_FAILED",
  "mqrc": 2500,
  "mqrcstr": "MQRC_HCONN_ASYNC_ACTIVE",
  "version": "2.0.1",
  "verb": "CMIT"
}
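This condition can be told apart from other failures by checking the mqrc field of the error object shown above. A minimal sketch: the helper name is hypothetical, and the value 2500 is taken directly from the error output above rather than from the library's constants.

```javascript
// Reason code reported in the error above (MQRC_HCONN_ASYNC_ACTIVE).
const MQRC_HCONN_ASYNC_ACTIVE = 2500;

// Hypothetical helper: true when an MQError-shaped object carries that reason code.
function isHconnAsyncActive(err) {
  return Boolean(err) && err.mqrc === MQRC_HCONN_ASYNC_ACTIVE;
}
```

A commit callback could use it to log a more specific hint, for example that the call was made while the connection handle was started.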

With this new version of 'ibmmq' NodeJS package, we have to call the Ctl method with one of the start options. I am using MQOP_START as shown in the samples included in the v2 version. e.g.

mq.Ctl(this.connectionHandle, mq.MQC.MQOP_START, function (error) {
  if (error) {
    console.log(`error sending MQOP_START control call: ${error}`)
  }
})

The control callbacks documentation says:

MQOP_START Start the consuming of messages for all defined message consumer functions for the specified connection handle.

Callbacks run on a thread started by the system, which is different from any of the application threads.

This operation gives control of the provided connection handle to the system. The only MQI calls which can be issued by a thread other than the consumer thread are:

  • MQCTL with Operation MQOP_STOP
  • MQCTL with Operation MQOP_SUSPEND
  • MQDISC - Performs MQCTL with Operation MQOP_STOP before disconnecting the HConn.

MQRC_HCONN_ASYNC_ACTIVE is returned if an IBM® MQ API call is issued while the connection handle is started, and the call does not originate from a message consumer function.

So I tried issuing a new Ctl call with MQOP_STOP before calling mq.Cmit, e.g.

mq.Ctl(this.connectionHandle, mq.MQC.MQOP_STOP, function (error) {
  if (error) {
    logError({ msg: `mq.CTL STOP error: ${formatErr(error)}`, error });
  } else {
    logInfo(`initiated MQOP_STOP`);
    mq.Cmit(this.connectionHandle, this.callbackOnCommit);
  }
});

Now I get a different error:

libc++abi: terminating due to uncaught exception of type Napi::Error

I suspect this is because I have no way of accessing the correct thread from within NodeJS to issue the Cmit command.

Could you please provide a working example of using MQGMO_SYNCPOINT to get a message non-destructively from a queue and then later commit the message using Cmit?

ibmmqmet commented 1 year ago

It's taken a little while to reproduce this, partly because the behaviour seems to be different depending on whether you're connecting as a client or using local bindings (which is my default config). I wouldn't have expected it to work differently, and I think the local bindings may be "wrong", but that's something else to deal with later.

Anyway, the fundamental reason for getting the MQRC_HCONN_ASYNC_ACTIVE is that the NodeJS callback function is not running on the same thread as the real C-driven callback function. So you can't call MQI functions directly from that NodeJS callback.

But I was able to get a program to work with syncpoint/commit calls.

For clarity, in the callback function I call cmitFromCB instead of the mq.Cmit() function. And that new function is:

function cmitFromCB() {
  mq.Ctl(connectionHandle, MQC.MQOP_SUSPEND, function (err) {
    console.log(formatErr(err));
  });
  mq.Cmit(connectionHandle, function (err) {
    if (err) {
      console.log("Error on commit", err);
    } else {
      console.log("Commit was OK");
    }
  });  
  mq.Ctl(connectionHandle, MQC.MQOP_RESUME, function (err) {
    console.log(formatErr(err));
  });
}

Note that it's using MQOP SUSPEND/RESUME, not STOP/START.

anvad commented 1 year ago

@ibmmqmet Thanks for posting a solution! I'll try this later today. Just looking at the code though, I have a few questions: In the cmitFromCB() function, we are calling mq.Cmit outside the callback from mq.Ctl SUSPEND call. So, aren't we running the risk of calling mq.Cmit before mq.Ctl completes? Similarly, isn't it possible the 2nd call to mq.Ctl is made before the call to mq.Cmit completes?

Also, could we run into cascading failures of mq.Cmit and the second mq.Ctl if the first mq.Ctl callback is called with error?

I think what I am asking is, why are we not nesting the subsequent mq calls within the previous call's callback function? One possible reason, I suspect, is this might put us in a different underlying C thread(s) between the various calls to mq.*. But if the mq calls are not synchronous, then I expect to run into the risk of making the next call while the first call is still in progress.

ibmmqmet commented 1 year ago

Ctl and Cmit (and Back) are effectively synchronous calls to the C libraries so nesting in callback functions is not really necessary. The only true async calls in this package are the ones where there's also a *Sync equivalent function (eg Open and OpenSync).

I did think about making Cmit/Back asynchronous but decided the sync approach is much more reliable here - or at least, more likely to get any errors back to the application. Which is something that's key when doing transactional work.

The MQI C libraries have a number of constraints around threads, primarily to ensure you don't try to have 2 operations going on simultaneously for the same hConn. That was relaxed slightly when the callback capability was implemented, but it still imposes restrictions which don't play very nicely with the Node async/thread model. Which is why the application-level callback here runs on a different thread (the main Node execution thread) than the one the MQI called.

anvad commented 1 year ago

Yes, that's what I discovered in my experiments yesterday. But this leaves me vulnerable to getting blocked for long periods (when making calls to Ctl and Cmit) if the network connection to the queue manager is slow. I tested this yesterday by simulating a slow connection (500 Kbs with 200ms delay added) and the calls were taking upwards of a second to complete, during which time no other JS code could execute.

Perhaps these "synchronous" calls could be made truly async where the call to Ctl returns a thread id that has to be passed in as an additional parameter to the subsequent calls to Cmit and Ctl so that the underlying C library could queue up the work to execute on the same thread and the JS app does not have to block.

Also, if they are effectively synchronous, would it make sense to have an alternate API for these calls, where they simply return the error rather than taking a callback? e.g.

/** returns err if there is any failure, else returns undefined **/
function cmitFromCB() {
  let err = mq.Ctl(connectionHandle, MQC.MQOP_SUSPEND);
  if (err) {
    console.log("Error on SUSPEND", err);
    return err;
  }
  err = mq.Cmit(connectionHandle);
  if (err) {
    console.log("Error on commit", err);
    return err;
  } else {
    console.log("Commit was OK");
  }  
  err = mq.Ctl(connectionHandle, MQC.MQOP_RESUME);
  if (err) {
    console.log("Error on RESUME", err);
    return err;
  }
}

I guess I can create this cleaner API in my own code. e.g.

function myCmit(connectionHandle) {
  let error;
  mq.Cmit(connectionHandle, function (err) {
    error = err;
  });
  return error;
}

let err = myCmit(connectionHandle);

ibmmqmet commented 1 year ago

I really don't like the idea of making CMIT asynchronous as you might otherwise "start" a new transaction before the previous one has been committed and it ends up being part of the older transaction when the commit finally happens. The sequencing MIGHT turn out to be OK, but this way is much more definitive. I don't want to be subject to the vagaries of the node internal thread scheduling. We also do not have any direct access to the MQI thread that's doing the real callback - it's under the control of the MQI C library. So we can't add work for it to process.

If you don't want to use a callback on the sync calls like Cmit, then they already throw exceptions on error rather than giving a return code when no callback function is provided.
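A minimal sketch of that callback-free style, assuming (as stated above) that Ctl and Cmit throw on error when no callback is passed. The function names are hypothetical, and the `mq` module is passed in as a parameter so the flow can be exercised with a stub instead of a real queue manager.

```javascript
// mq: the ibmmq module (or a stub with the same shape); hConn: connection handle.
// Suspends delivery, commits, and always resumes, even if the commit throws.
function commitWhileSuspended(mq, hConn) {
  mq.Ctl(hConn, mq.MQC.MQOP_SUSPEND);
  try {
    mq.Cmit(hConn);
  } finally {
    mq.Ctl(hConn, mq.MQC.MQOP_RESUME);
  }
}

// Variant that returns the error (or undefined) instead of throwing,
// matching the "return the error" style asked about earlier in the thread.
function tryCommit(mq, hConn) {
  try {
    commitWhileSuspended(mq, hConn);
    return undefined;
  } catch (err) {
    return err;
  }
}
```

The try/finally keeps the RESUME from being skipped when the commit fails, which addresses the cascading-failure concern raised earlier.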

anvad commented 1 year ago

I don't see a way to avoid async. For example, in the getMessagesCB function, I want to write the received message to the database. This is an async call. While this async call is executing, the getMessagesCB callback could be called again because a new message has arrived. e.g.

getMessagesCB = async (error, hObj, gmo, md, buf, hConn) => {
  const order = JSON.parse(buf.toString('utf8'))

  // while I await the write to DB to finish, my `getMessagesCB` function returns a promise
  // since the function returns, the MQ client calls `getMessagesCB` again immediately with the next message waiting in the queue
  await this.ordersRepo.insert({
    orderId: order.Id,
    orderDate: (new Date()).toISOString(),
    orderDetails: order.Details,
  })

  // so later, when `this.cmitFromCB` is called, it is possible that two messages get committed in MQ, whereas only one of them was successfully written to the DB at that time
  this.cmitFromCB()
}

So, using this approach, I could not avoid the issue you mentioned ("starting" a new transaction before the previous one is committed).
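The interleaving described above can be reproduced with a toy model. Nothing in this sketch is an ibmmq API: `simulate`, `uncommitted` and `insertIntoDb` are hypothetical stand-ins for the consumer, the open unit of work and the database write.

```javascript
// Toy model of an async message handler that is re-entered while awaiting.
function simulate(messages) {
  const uncommitted = []; // messages got under syncpoint, not yet committed
  const commits = [];     // what each commit call ended up covering
  const insertIntoDb = () => new Promise((resolve) => setImmediate(resolve));

  async function handler(msg) {
    uncommitted.push(msg);   // message joins the current unit of work
    await insertIntoDb(msg); // handler yields here; the next delivery can run
    commits.push(uncommitted.splice(0)); // commit covers everything got so far
  }

  // The consumer does not wait for the returned promise before delivering again.
  const running = messages.map((m) => handler(m));
  return Promise.all(running).then(() => commits);
}
```

With two messages, the first commit in this model covers both of them and the second commit covers nothing, which is the situation the callback above runs into.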

To avoid this, I had to modify the getMessagesCB function to call GetDone first, to prevent the MQ client from trying to grab the next message. And then, in the cmitFromCB function, I had to re-register getMessagesCB to receive the next message.

e.g.

getMessagesCB = async (error, hObj, gmo, md, buf, hConn) => {
  // de-register the callback, so MQ client does not send more messages
  mq.GetDone(this.queueHandle, function (err) {
    logInfo({ msg: `GetDone remove message listener. Err=${err}`, error: err })
  })

  const order = JSON.parse(buf.toString('utf8'))
  // now even though getMessagesCB returns on the next line (with a promise), 
  //  MQ client will not call getMessagesCB again with a new message
  await this.ordersRepo.insert({
    orderId: order.Id,
    orderDate: (new Date()).toISOString(),
    orderDetails: order.Details,
  })

  this.cmitFromCB()
}

// update this.cmitFromCB to re-register getMessagesCB
cmitFromCB() {
  mq.Ctl(this.hConn, mq.MQC.MQOP_SUSPEND, function (err) {
    if (err) console.log('Error in Ctl MQOP_SUSPEND', err)
  })
  mq.Cmit(this.hConn, function (err) {
    if (err) console.log('Error in Cmit', err)
  })

  // re-register listener for messages
  mq.Get(this.queueHandle, this.md, this.gmo, this.getMessagesCB)

  mq.Ctl(this.hConn, mq.MQC.MQOP_RESUME, function (err) {
    if (err) console.log('Error in Ctl MQOP_RESUME', err)
  })
}

Perhaps there is a more elegant way to tell the MQ client not to send the next message till the previous message is either committed (Cmit call) or rolled back (Back call). I could not find it.

And since all these Ctl, GetDone, Cmit calls can take a while and are blocking, my current (less than elegant) solution is to move the entire MQ message reading module to a worker_thread so it allows the rest of the app to remain responsive to web requests etc.
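The effect of moving blocking calls to a worker thread can be sketched with Node's built-in worker_threads module. In this sketch `Atomics.wait` stands in for a slow synchronous MQ call; in a real setup the worker would presumably load ibmmq and own the connection itself, which is an assumption here and not shown. The helper name `runBlockingWorkInWorker` is hypothetical.

```javascript
const { Worker } = require('node:worker_threads');

// Worker body kept as a string so the sketch fits in one file.
// Atomics.wait blocks the worker thread only; it is NOT an ibmmq API.
const workerSource = `
  const { parentPort, workerData } = require('node:worker_threads');
  const gate = new Int32Array(new SharedArrayBuffer(4));
  Atomics.wait(gate, 0, 0, workerData.blockMs); // simulated slow sync call
  parentPort.postMessage({ committed: true });
`;

// Resolves with the worker's message plus the number of timer ticks the
// main thread managed to run while the worker was blocked.
function runBlockingWorkInWorker(blockMs) {
  return new Promise((resolve, reject) => {
    let ticks = 0;
    const timer = setInterval(() => { ticks += 1; }, 5);
    const worker = new Worker(workerSource, { eval: true, workerData: { blockMs } });
    worker.once('message', (msg) => {
      clearInterval(timer);
      resolve({ ...msg, ticks });
    });
    worker.once('error', (err) => {
      clearInterval(timer);
      reject(err);
    });
  });
}
```

The main thread's timer keeps firing while the worker is blocked, which is the property that keeps the rest of the application responsive.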

And Mark, thank you so much for patiently engaging and explaining the library. It has helped clear up many of my doubts and I think I am much closer now to a robust solution that is both "safe" and non-blocking.

ibmmqmet commented 1 year ago

I do have a better solution than forcing you to do the SUSPEND/RESUME - when I went back to the code, I realised that I had already coded it. And it worked on the machine/container that I had originally tested it on. But there was a timing assumption that broke it on every other machine I tried more recently. An easy fix to that will go into the next release, which will be around the time of the next MQ CD update. With stuff added to the README to clarify.

Basically, I do the SUSPEND/RESUME at suitable times around the callback to the message listener. It still means the MQCTL calls are being made, but everything inside the callback is "reliable" including CMIT only processing the messages you expect it to. But those calls are still going to be synchronous.

anvad commented 1 year ago

I tried to use SUSPEND/RESUME earlier in the process but I was still getting the 2nd message before I was done with the first.

Here's my code:

// register the new message callback in the class `init` function
async init() {
  try {
      const cno = new mq.MQCNO();
      cno.Options = mq.MQC.MQCNO_CLIENT_BINDING; // connecting as client
      const cd = new mq.MQCD();
      cd.ConnectionName = mqConfig.connectionName;
      cd.ChannelName = mqConfig.channelName;
      cno.ClientConn = cd;

      this.hConn = await mq.ConnxPromise(qMgr, cno);
      const od = new mq.MQOD();
      od.ObjectName = qName;
      od.ObjectType = mq.MQC.MQOT_Q;

      let openOptions = mq.MQC.MQOO_INPUT_SHARED; // MQGMO_SYNCPOINT is a get option (set on gmo below), not an open option
      this.queueHandle = await mq.OpenPromise(this.hConn, od, openOptions);

      let md = new mq.MQMD(); // message descriptor
      const gmo = new mq.MQGMO(); // get message options
      const MQC = mq.MQC;
      gmo.Options = MQC.MQGMO_SYNCPOINT | MQC.MQGMO_WAIT | MQC.MQGMO_CONVERT | MQC.MQGMO_FAIL_IF_QUIESCING;
      gmo.WaitInterval = MQC.MQWI_UNLIMITED; // wait forever
      gmo.MatchOptions = MQC.MQMO_NONE;

      this.md = md;
      this.gmo = gmo;

      mq.Get(this.queueHandle, md, gmo, this.getMessagesCB);
  } catch(error) {
    console.error('Error in registering MQ callback', error);
  }
}

// getMessagesCB is a class method that was set as the callback in a call to `mq.Get()`
getMessagesCB = async (error, hObj, gmo, md, buf, hConn) => {
  if (error) {
    console.error('error in getMessages callback', error);
    return;
  }
  try {
    mq.Ctl(this.hConn, mq.MQC.MQOP_SUSPEND);

    // do my async thing with this message
    await writeMessageToDB(buf.toString('utf8'));

    // now I need to call suspend again since the await keyword used above puts me in a continuation, else I get a MQRC_HCONN_ASYNC_ACTIVE error
    // so, calling suspend, Cmit and resume in the same synchronous region of code
    mq.Ctl(this.hConn, mq.MQC.MQOP_SUSPEND);
    mq.Cmit(this.hConn);
    mq.Ctl(this.hConn, mq.MQC.MQOP_RESUME);
  } catch(error) {
    console.error('error saving message to DB and committing message in MQ', error);
  }
}

To avoid getting the 2nd message before the first message was committed, I had to change from SUSPEND/RESUME to STOP/START. This way, I was able to avoid calling mq.GetDone() and later mq.Get().
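The STOP/START variant described above might look like the following sketch. It assumes, as stated earlier in the thread, that Ctl, Cmit and Back throw on error when called without a callback. `makeHandler` and `processMessage` are hypothetical names, the rollback on failure is an addition not present in the code above, and the `mq` module is passed in so the flow can be exercised with a stub. It has not been run against a real queue manager.

```javascript
// mq: the ibmmq module (or a stub); hConn: connection handle;
// processMessage: hypothetical async function, e.g. a database write.
function makeHandler(mq, hConn, processMessage) {
  return async function onMessage(error, hObj, gmo, md, buf) {
    if (error) {
      console.error('error in message callback', error);
      return;
    }
    // Stop delivery so no further message joins this unit of work.
    mq.Ctl(hConn, mq.MQC.MQOP_STOP);
    try {
      await processMessage(buf);
      mq.Cmit(hConn); // commit the get; the message leaves the queue
    } catch (e) {
      console.error('processing failed, backing out', e);
      mq.Back(hConn); // roll back; the message becomes available again
    } finally {
      mq.Ctl(hConn, mq.MQC.MQOP_START); // restart delivery
    }
  };
}
```

The returned function has the same parameter order as the callbacks passed to mq.Get elsewhere in this thread, so it could be registered in place of getMessagesCB.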

However, I still need to run this entire code using node worker_threads to avoid blocking non-MQ related code in my application.

Really, there should be a way to tell the C library that after the first message is received (by the C-library), before the C-library calls the JS callback, it should call mqctl SUSPEND (or STOP). Similarly, when we call Cmit, there should be a way to tell the C-library to do all three things (stop, Cmit, start) in the same call.

This way, the JS API calls into the MQ library need not be synchronous. Synchronous network calls in an event-loop-driven system such as NodeJS, which depends on co-operative multi-tasking, do not seem like the correct pattern.