ibm-messaging / mq-mqi-nodejs

Calling IBM MQ from Node.js - a JavaScript MQI wrapper
Apache License 2.0

Need to understand how to set sync point with asynchronous Get #137

Closed creasman closed 2 years ago

creasman commented 2 years ago

mq-mqi-nodejs version: 0.9.17

I'm trying to understand how to use the asynchronous version of the Get method with a sync point to utilize MQ transactions. The goal is to receive a single message, then either commit or rollback this message before receiving the next one.

I have this working with the synchronous GetSync call. However, I read in the comments that use of this method is not recommended in cases where you want to wait a while for a message to arrive.

I tried coding the Get call as is done throughout the examples in this set, adding MQGMO_SYNCPOINT to the GMO options. The code looks something like this:

function getMessages() {
    let md = new mq.MQMD();
    let gmo = new mq.MQGMO();

    gmo.Options = MQC.MQGMO_SYNCPOINT |
        MQC.MQGMO_WAIT |
        MQC.MQGMO_CONVERT |
        MQC.MQGMO_FAIL_IF_QUIESCING;
    gmo.WaitInterval = waitInterval * 1000; // waitInterval is 3 seconds; WaitInterval takes milliseconds

    mq.setTuningParameters({
        getLoopPollTimeMs: 500
    });
    mq.Get(queueHandle, md, gmo, getCB);
}

The getCB callback is where a message is handled, and either committed or rolled back.

function getCB(err, hObj, gmo, md, buf, hConn) {
    if (err) {
        if (err.mqrc == MQC.MQRC_NO_MSG_AVAILABLE) {
            console.log("No more messages available.");
        } else {
            console.log(formatErr(err));
            exitCode = 1;
        }
        ok = false;
        mq.GetDone(hObj);
    } else {
        // Make a copy of the message buffer.

        // Process the message (e.g., write to a DB, invoke an API). This step
        // involves promises and waits, and might take a few seconds to complete.

        // Based on the outcome above, commit or roll back the transaction.
    }
}

The problem I find is that the sync point is associated with the mq.Get call and not tied to a particular message. If there are five messages on the queue when I invoke mq.Get, it is very likely that my getCB callback will be called five times before the first message is committed or rolled back. Likewise, more messages may arrive while the first is being processed. The effect is that mq.Cmit or mq.Back acts on a block of messages rather than on an individual message, and acting on an individual message is the behavior I want.
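
To make the scope issue concrete, here is a toy model in plain JavaScript. It does not use the ibmmq package: `ToyConnection` and its methods are made-up names that mimic only the one MQ rule that matters here, namely that a commit or backout applies to everything got under syncpoint on the connection since the previous commit or backout.

```javascript
// Toy model only: ToyConnection is hypothetical and not part of ibmmq.
class ToyConnection {
  constructor(messages) {
    this.queue = [...messages]; // messages waiting on the queue
    this.inFlight = [];         // got under syncpoint, not yet committed or backed out
  }

  // Destructive get under syncpoint: the message joins the current unit of work.
  getUnderSyncpoint() {
    const msg = this.queue.shift();
    if (msg !== undefined) this.inFlight.push(msg);
    return msg;
  }

  // Commit resolves the whole unit of work, not one message.
  commit() {
    const done = this.inFlight;
    this.inFlight = [];
    return done;
  }

  // Backout returns every in-flight message to the front of the queue.
  backout() {
    const restored = this.inFlight;
    this.queue = [...restored, ...this.queue];
    this.inFlight = [];
    return restored;
  }
}

const conn = new ToyConnection(["m1", "m2", "m3", "m4", "m5"]);

// Three callbacks fire before the first message has been processed.
conn.getUnderSyncpoint();
conn.getUnderSyncpoint();
conn.getUnderSyncpoint();

// Rolling back "the first message" rolls back all three.
const rolledBack = conn.backout();
console.log(rolledBack);
console.log(conn.queue);
```

In this model the only way to back out a single message is to make sure that only one message is in flight when the decision is made, which is what the rest of the thread tries to achieve.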

Am I missing something in my understanding? I like the fact that mq.Get is asynchronous and non-blocking, but I need to make sure that my code is committing or rolling back only a single message at a time. Adding a transaction example that uses mq.Cmit and mq.Back to the sample set would greatly help.

Thanks, Jim

ibmmqmet commented 2 years ago

I'd suggest calling GetDone before you head off into processing the message. That should stop any more messages being delivered. Then, when you're ready to receive another message, call Cmit and then Get with your getCB function which will resume processing.

I was never particularly happy with the GetDone name but couldn't think of something better at the time and it's why I don't get to officially name things in MQ.

The Cmit call is fundamentally synchronous in this implementation which should make it easier to deal with all the callback nesting.

I don't have time right now to create a full sample, though I'll keep it in mind for when I do have an opportunity. Of course, I'm happy to accept PRs from anyone else who feels like submitting an example.
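
The ordering suggested above can be sketched as follows. To keep it runnable without a queue manager, `makeStubMq` is a hypothetical stand-in for the module that hands over exactly one message per Get call and ignores the md and gmo arguments. It therefore shows only the intended call order (GetDone, process, Cmit, Get) and says nothing about how the real library delivers messages. `consume` and `processMessage` are likewise illustrative names.

```javascript
// Hypothetical stub, not the ibmmq module: one message per Get call.
function makeStubMq(messages, log) {
  const pending = [...messages];
  return {
    Get(hObj, md, gmo, cb) {
      log.push("Get");
      setImmediate(() => {
        if (pending.length === 0) {
          cb({ mqrc: 2033 }, hObj); // 2033 = MQRC_NO_MSG_AVAILABLE
        } else {
          cb(null, hObj, gmo, md, Buffer.from(pending.shift()));
        }
      });
    },
    GetDone(hObj) { log.push("GetDone"); },
    Cmit(hConn, cb) { log.push("Cmit"); cb(null); },
  };
}

function consume(mq, hConn, queueHandle, processMessage, onFinished) {
  function getCB(err, hObj, gmo, md, buf) {
    mq.GetDone(hObj);                       // 1. stop delivery first
    if (err) return onFinished(err);
    processMessage(buf.toString())          // 2. slow, promise-based work
      .then(() => {
        mq.Cmit(hConn, (cmitErr) => {       // 3. commit this one message
          if (cmitErr) return onFinished(cmitErr);
          mq.Get(hObj, {}, {}, getCB);      // 4. resume delivery
        });
      })
      .catch(onFinished);
  }
  mq.Get(queueHandle, {}, {}, getCB);
}

// Demo: two messages, then the stub reports that no more are available.
const log = [];
const finished = new Promise((resolve) => {
  const mq = makeStubMq(["a", "b"], log);
  const processMessage = async (text) => { log.push(`process ${text}`); };
  consume(mq, "hConn", "hObj", processMessage, resolve);
});
finished.then((err) => console.log(log.join(" > "), err));
```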

creasman commented 2 years ago

Thanks for the quick reply. I've been trying to make this work using GetDone as you suggest but it's still not working. Here's what I currently have in the code.

Main loop:

    let msgObj;
    while (msgObj = await getMessage(hObj)) {
        console.log('sleeping...')
        await sleep(2000); // Sleep for 2 seconds.

        console.log('handling message...')
        await handleMessage(msgObj);
    }

For getMessage I have:

async function getMessage(queueHandle) {
    console.log(`\n==> getMessage called...`);
    return new Promise((resolve) => {
        let md = new mq.MQMD();
        let gmo = new mq.MQGMO();

        gmo.Options = MQC.MQGMO_SYNCPOINT |
            MQC.MQGMO_WAIT |
            MQC.MQGMO_CONVERT |
            MQC.MQGMO_FAIL_IF_QUIESCING;
        gmo.MatchOptions = MQC.MQMO_NONE;
        gmo.WaitInterval = waitInterval * 1000; // milliseconds

        console.log(`\nCalling mq.Get...`);
        mq.Get(queueHandle, md, gmo, (err, hObj, gmo, md, buf, hConn) => {
            console.log(`\nGet callback: ${formatErr(err)}`);

            // Prevent more messages from arriving while we're processing this one.
            try {
                console.log(`Calling GetDone...`);
                mq.GetDone(hObj);
            } catch (getDoneErr) {
                // Distinct name so the Get error `err` is not shadowed.
                console.log(`GetDone error: ${formatErr(getDoneErr)}`);
            }

            if (err) {
                console.log(`Get Callback error: ${formatErr(err)}`);
                if (err.mqrc == MQC.MQRC_NO_MSG_AVAILABLE) {
                    console.log("No more messages available.");
                } else {
                    console.log(formatErr(err));
                    exitCode = 1;
                }

                resolve(null);
            } else {
                let msg = (md.Format == "MQSTR" ? decoder.write(buf) : buf);
                console.log(`Get Callback msg: ${msg}`);
                resolve({
                    md: md,
                    msg: msg
                });
            }
        });
    });
}

Finally, the handleMessage alternates between committing and rolling back the message like this:

let commit = false;

async function handleMessage(msgObj) {
    console.log(`Transacting message: ${msgObj.msg}`);
    return new Promise(resolve => {
        commit = !commit;
        if (commit) {
            mq.Cmit(connectionHandle, function (err) {
                if (err) {
                    console.log(`Failed to commit transaction: ${formatErr(err)}`);
                } else {
                    console.log(`Message committed`);
                }
                resolve(msgObj)
            })
        } else {
            mq.Back(connectionHandle, function (err) {
                if (err) {
                    console.log(`Failed to roll back transaction: ${formatErr(err)}`);
                } else {
                    console.log(`Message rolled back`);
                }
                resolve(msgObj)
            })
        }
    });
}

Calling GetDone doesn't really change anything from what I can tell. Any messages on the queue when the underlying polling interval fires are pulled, and the get callback is invoked once per message. The commit or rollback continues to affect the entire block of messages, not just the first one.

I did upgrade to the latest client level (0.9.21) and verified I hit the same problem. Do you see anything incorrect in the above logic?

creasman commented 2 years ago

As an update, we were able to make this work using the synchronous version (GetSync) by setting MQGMO_NO_WAIT in the GMO options and then creating our own polling loop. This returns a message whenever one becomes available and doesn't block execution.

    while (this.running) {
        const msgObj = await this.getMessage();
        if (msgObj) {
            await this.handleMessage(msgObj);
            continue;
        }
        // Nothing available: wait before polling again.
        await new Promise(resolve => setTimeout(resolve, interval));
    }

    private async getMessage(): Promise<Message | null> {
        const mqMD = new mq.MQMD();
        const self = this;
        const buf = Buffer.alloc(1024);

        return new Promise(resolve => {
            mq.GetSync(self.queueHandle, mqMD, self.mqGMO, buf, (err: any, len: any) => {
                if (err) {
                    if (err.mqrc === 2019) {
                        // MQRC_HOBJ_ERROR: normal close event
                    } else if (err.mqrc === 2033) {
                        // MQRC_NO_MSG_AVAILABLE: normal no message event
                    } else {
                        // Handle error event
                        mq.GetDone(self.queueHandle, (err) => {
                            // Log error
                        });
                        self.stop();
                    }
                    resolve(null);
                } else {
                    const result: Message = new Message();
                    result.backoutCount = (mqMD.BackoutCount ? mqMD.BackoutCount : 0);
                    result.body = decoder.write(buf.slice(0, len));
                    resolve(result);
                }
            });
        });
    }
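
Stripped of the MQ calls, the shape of this workaround is a loop that tries a non-blocking get, fully resolves that one message, and sleeps only when the queue is empty. The sketch below is an illustration under assumptions: `pollOneAtATime`, `tryGet`, `handle`, and `isRunning` are hypothetical names, and an in-memory array stands in for the queue, so nothing here calls the ibmmq package.

```javascript
// Hypothetical sketch of the polling pattern; no ibmmq calls are made.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// tryGet():    resolves to a message, or null when none is available
//              (the role played by GetSync with MQGMO_NO_WAIT).
// handle(msg): processes one message and commits or backs it out
//              before its promise resolves.
async function pollOneAtATime({ tryGet, handle, isRunning, intervalMs }) {
  while (isRunning()) {
    const msg = await tryGet();
    if (msg !== null) {
      await handle(msg); // the next get cannot start until this one is resolved
      continue;
    }
    await sleep(intervalMs); // queue empty: yield to the event loop, then retry
  }
}

// Demo with an in-memory queue standing in for the real one.
const queue = ["m1", "m2", "m3"];
const events = [];
let emptyPolls = 0;

const demo = pollOneAtATime({
  tryGet: async () => {
    if (queue.length === 0) {
      emptyPolls += 1;
      return null;
    }
    const msg = queue.shift();
    events.push(`get ${msg}`);
    return msg;
  },
  handle: async (msg) => {
    await sleep(5); // pretend to do slow work
    events.push(`commit ${msg}`);
  },
  isRunning: () => emptyPolls < 2, // stop the demo after two empty polls
  intervalMs: 10,
}).then(() => console.log(events.join(", ")));
```

Because the get is issued only from inside the loop, at most one message is in flight when the commit or backout happens. The cost is up to one polling interval of latency when a message arrives on an empty queue.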

Closing this issue.