team-supercharge / nest-amqp

Nest AMQP 1.0 module - NPM package: https://www.npmjs.com/package/@team-supercharge/nest-amqp
https://supercharge.io
MIT License
26 stars 13 forks source link

Can't use asynchronous queue module options #73

Open amangeot opened 9 months ago

amangeot commented 9 months ago

Hello @raschan, I'm trying to test url connection for handling failover urls before setting up connection uri in queue module options.

To do that I'm trying to use a useFactory with QueueModule@3.5.1 defined as useFactory?: (...args: any[]) => Promise<QueueModuleOptions> | QueueModuleOptions;.

Looking at logs, it shows the factory being called twice: once synchronously, once asynchronously. For my use case, the synchronous call makes all connection tests fail, hence the app won't start.

Any chance you can remove the extra synchronous call?

Thanks again,


Here is some code if you'd like to reproduce using a Nest's ConfigService with a .env and a single broker instance.

When canConnect returns false by default, the app won't start. When canConnect returns true by default, the async config makes the app start, the sync config fails, and it triggers a reconnection. You can see that behaviour within logs.

import { InternalServerErrorException, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { QueueModule, QueueModuleOptions } from '@team-supercharge/nest-amqp';
import { ConnectionDetails, ConnectionOptions } from 'rhea';
import { Connection } from 'rhea-promise';

export async function getQueueModuleFactory(
  configService: ConfigService<Record<string, any>, true>,
): Promise<QueueModuleOptions> {
  // Prepare logger and log levels.
  const logger = new Logger(QueueModule.name);
  if (!logger?.localInstance?.setLogLevels) throw new InternalServerErrorException('Logger not available');
  logger.localInstance.setLogLevels(['verbose', 'error', 'warn', 'log', 'debug']);

  const connectionOptions: ConnectionOptions = {
    password: configService.get<string>('MESSAGE_BROKER_PASSWORD'),
    rejectUnauthorized: configService.get<boolean>('MESSAGE_BROKER_REJECT_UNAUTHORIZED'),
    username: configService.get<string>('MESSAGE_BROKER_USERNAME'),
  };

  const urls = configService.get<string>('MESSAGE_BROKER_URLS').split(',');

  logger.debug(`Getting queue module options for: ${urls}`);

  // Retrieve first valid url.
  let url: string | null = null;
  for (const testUrl of urls) {
    logger.debug(`About to check connection: ${testUrl}`);

    const can = await canConnect(testUrl, { connectionOptions, logger });
    if (can) {
      url = testUrl;
      break;
    }
  }

  logger.debug(`url: ${url}`);

  // Don't start the app if we can't connect to broker.
  if (!url) throw new InternalServerErrorException('Could not connect to message broker.');

  // Register reconnection attempts.
  // NOTE: Reconnection attempts are not reset after a successful reconnection.
  let attempts = 0;

  return {
    connectionUri: url,
    connectionOptions: {
      ...connectionOptions,
      // Switch between urls.
      // NOTE: "connection_details" (rhea) takes over "connectionUri" when reconnecting.
      connection_details: function (): ConnectionDetails {
        attempts++;
        url = urls[attempts % urls.length];
        const { protocol, hostname: host, port } = new URL(url);

        logger.verbose(`Lost connection, retrying on: ${protocol}//${host}:${port}. (${attempts})`);

        return {
          port: Number.parseInt(port),
          host,
          transport: getTransport(protocol),
          options: connectionOptions,
        };
      },
      reconnect: true,
    },
    logger,
    throwExceptionOnConnectionError: false,
  };
}

export function getTransport(protocol: string) {
  switch (protocol) {
    case 'amqp:':
      return 'tcp';
    case 'amqps:':
      return 'ssl';
    case 'amqp+ssl:':
      return 'ssl';
    case 'amqp+tls:':
      return 'tls';
    default:
      return 'tcp';
  }
}

export async function canConnect(
  url: string,
  { connectionOptions, logger }: { connectionOptions: ConnectionOptions; logger?: Logger },
): Promise<boolean> {
  logger?.debug(`Checking connection: ${url}`);

  const { protocol, hostname: host, port } = new URL(url);

  const connection = new Connection({
    ...connectionOptions,
    host: host,
    port: Number.parseInt(port),
    transport: getTransport(protocol),
    reconnect: false,
  });

  try {
    await connection.open();

    logger?.debug(`Tried opening: ${url}`);
  } catch (error) {
    if (logger) logger.verbose(`Could not connect to ${protocol}//${host}:${port}. Error: ${error instanceof Error ? error.message : JSON.stringify(error)}.`); // prettier-ignore

    return false;
  }

  if (connection.isOpen()) {
    logger?.debug(`Opened: ${url}`);

    await connection.close();

    return true;
  }

  logger?.debug(`Did not open: ${url}`);
  return false; // NOTE: The app starts when returning true by default.
}

Used as:

QueueModule.forRootAsync({
      useFactory: getQueueModuleFactory,
      inject: [ConfigService],
})