ibm-messaging / mq-mqi-nodejs

Calling IBM MQ from Node.js - a JavaScript MQI wrapper
Apache License 2.0
79 stars 41 forks source link

NestJS - mq.Get() callback not being executed #178

Open stephanyes opened 6 months ago

stephanyes commented 6 months ago

Please include the following information in your ticket.

ibmmq: 2.0.2 nestjs: 8.2.8

Its the first time im opening a github issue so hopefully you can help me out.

At work we are trying to use this package and add it to our own libraries but I'm not sure why I cant make it work with NestJS.

We managed to reproduce the sample on how to subscribe to a topic and listen to messages with a .js script and works like a charm. The problem is when we generate a package with a module, a service and everything we need to have the same with Nestjs.

The ibmmq.module.ts is a normal module that has a useFactory that starts our ConnectionService so far so good. Our connection.service.ts has an onModuleInit that creates the connection to our broker and other methods like onApplicationShutdown. Again so far so good here.

Then the magic happends in our ibmmq.ts were we basically copy the script I described above to retrieve messages from a topic, almost the same example provided in this repository. Once all modules in our application are loaded included the IBM (without any errors) the behaviour of getting messages its not working as intended.

Specifically the callback the mq.Get() we are sending it never gets executed and we don't understand why.

I can provide how are code looks but I don't think anybody tried to do this in nest so far (at least I went through the issues page and I don't find another example with nestjs)

Can you guide me with my problem? Again ill provide any code neccesary so you can see it. This is our ibmmq.ts:

import { Logger } from '@nestjs/common';
import * as mq from 'ibmmq';
import { MQC, MQCD, MQCNO, MQCSP, MQMD, MQGMO, MQSD, MQQueueManager } from 'ibmmq';
import { IConnection, IConnectionConfig } from './connection.interface';
import { Subject } from 'rxjs';
const StringDecoder = require('string_decoder').StringDecoder;
const decoder = new StringDecoder('utf8');
process.env['MQIJS_NOUSECTL'] = 'true';

export class Ibmmq {
  private connObj = { hConn: null, hObj: null };
  private isExiting = false;
  private readonly qMgr: string;
  private topic: string;
  private readonly connectionConfig: {
    userId: string;
    password: string;
    connectionName: string;
    channelName: string;
    applName: string;
  };

  private readonly logger: Logger;
  private msgId: string | null = null;
  private connectionHandle: mq.MQQueueManager | null;
  private topicHandle: mq.MQObject | null;
  private ok = true;
  private exitCode = 0;
  private subject = new Subject<string>();
  constructor(config: IConnectionConfig) {
    this.qMgr = config.qManager;
    this.topic = config.topic;
    this.connectionConfig = {
      userId: config.userId,
      password: config.password,
      connectionName: config.connectionName,
      channelName: config.channelName,
      applName: config.applName,
    };
    this.logger = new Logger(config.channelName);
  }

  private formatErr(err: mq.MQError) {
    if (err) {
      this.ok = false;
      this.logger.error(`MQ call failed at ${err.message}`);
    } else {
      this.logger.log(`MQ call successful`);
    }
  }

  getMessages(hObj: mq.MQQueueManager | mq.MQObject) {
    if (this.isExiting) {
      return;
    }
    let md = new MQMD();
    let gmo = new MQGMO();
    gmo.Options =   MQC.MQGMO_NO_SYNCPOINT | MQC.MQGMO_WAIT | MQC.MQGMO_CONVERT | MQC.MQGMO_FAIL_IF_QUIESCING;
    gmo.WaitInterval = 3 * 1000; // 3 seconds
    console.log('AAAAAAAAAAAAAAAAAAAA');
    // mq.Get(hObj, md, gmo, (err, hObj, gmo, md, buf) => {
    //     console.log("this is executed?")
    // }); // This is not being executed for some reason

    mq.Get(hObj, md, gmo, (err, hObj, gmo, md, buf) => this.getCB(err, hObj, gmo, md, buf));
  }

   private getCB(err: mq.MQError, hObj: mq.MQObject, gmo: mq.MQGMO, md: mq.MQMD, buf: Buffer) {
    console.log('BBBBBBBBBBBBBBBBBBB');
    if (err) {
      if (err.mqrc == MQC.MQRC_NO_MSG_AVAILABLE) {
        this.logger.log('No more messages available.', 'getCB()');
        // Wait for 5 seconds before trying to get messages again
        setTimeout(() => this.getMessages(hObj), 5000);
      } else {
        this.formatErr(err);
        this.isExiting = true;
        mq.GetDone(hObj);
      }
    } else {
      this.logger.log(`Message:  ${buf.toString()}`);
      this.getMessages(hObj); // recursively get next message
    }
  }
  private cleanup(connObj: { hConn: any; hObj: any }) {
    if (connObj.hObj) {
      mq.Close(connObj.hObj, 0, function (err) {
        if (err) {
          this.formatErr(err);
        } else {
          this.logger.log('MQCLOSE successful', 'cleanup() MQ Close');
          connObj.hObj = null; // set hObj to null to indicate that the queue is closed
        }
        if (connObj.hConn) {
          mq.Disc(connObj.hConn, function (err) {
            if (err) {
              this.formatErr(err);
            } else {
              this.logger.log('MQDISC successful', 'cleanup() MQ Disconnect');
              connObj.hConn = null; // set hConn to null to indicate that the connection is closed
            }
          });
        }
      });
    }
  }

   async disconnect(): Promise<void> {
    this.isExiting = true;
    this.cleanup(this.connObj);
  }

   async connect(): Promise<MQQueueManager> {
    return new Promise((resolve, reject) => {
      const cd = new MQCD();
      cd.ConnectionName = this.connectionConfig.connectionName;
      cd.ChannelName = this.connectionConfig.channelName;
      const csp = new MQCSP();
      csp.UserId = this.connectionConfig.userId;
      csp.Password = this.connectionConfig.password;

      const cno = new MQCNO();
      if (typeof cno.Options === 'number') {
        cno.Options |= MQC.MQCNO_CLIENT_BINDING;
      } else {
        // Handle the case where cno.Options is an array of MQC_MQCNO
        // Depending on your use case, you might want to throw an error, log a message, etc.
        console.error('cno.Options is not a number');
      }
      cno.ClientConn = cd;
      cno.SecurityParms = csp;
      if (MQC.MQCNO_CURRENT_VERSION >= 7) {
        cno.ApplName = 'Node.js 9.1.2 ApplName';
      }

      mq.Connx(this.qMgr, cno, (err: any, conn: mq.MQQueueManager) => {
        if (err) {
          this.logger.error(`Error connecting to queue manager ${this.qMgr}`);
          this.logger.error(this.formatErr(err));
          reject(err);
        } else {
          this.logger.log(`Connected to queue manager ${this.qMgr}`);
          this.connectionHandle = conn;
          this.connObj.hConn = conn;
          let topicString = this.topic;
          let sd = new mq.MQSD();
          sd.ObjectString = topicString;
          sd.Options = MQC.MQSO_CREATE | MQC.MQSO_NON_DURABLE | MQC.MQSO_MANAGED;
          mq.Sub(this.connObj.hConn, null, sd, (err, sub, hObjSubscription) => {
            if (err) {
              this.formatErr(err);
            } else {
              this.logger.log(`MQSUB to ${topicString} successful`, 'MQ Sub');
              if (sub) {
                this.getMessages(sub);
              } else {
                this.logger.log('_hObj is undefined or null');
              }
              resolve(sub);
            }
          });
          //   resolve(conn);
        }
      });
    });
  }
   public getObservable() {
    return this.subject.asObservable();
  }
  public getExitCode(): number {
    return this.exitCode;
  }
  public getMsgId(): string {
    return this.msgId;
  }
}

Im open to any kind of suggestions and sorry if it looks messy, thanks in advance.

ibmmqmet commented 6 months ago

I don';t know anythnig about NestJS, or if there are specific requirements in there. I'd suggest taking a trace of the app (MQIJS_TRACE=true) so get a better idea of the flow. As a standalone program, this seems to work.

chughts commented 6 months ago

I suspect that when your code is nested inside the Nest.JS class that the environment variables are not visible. In lieu of setting

process.env['MQIJS_NOUSECTL'] = 'true';

before the ibmmq module is loaded, you may need to set

mq.setTuningParameters({useCtl:false});

in the constructor, or maybe even closer to the get.

chughts commented 6 months ago

Just took a closer look at your code. I think you need to rearrange your imports to set the environment variable before loading ibmmq. ie.

process.env['MQIJS_NOUSECTL'] = 'true';

import * as mq from 'ibmmq';
import { MQC, MQCD, MQCNO, MQCSP, MQMD, MQGMO, MQSD, MQQueueManager } from 'ibmmq';
...