ssut / nestjs-sqs

A project to make SQS easier to use within NestJS, with decorator-based handling and seamless NestJS-way integration.
MIT License
214 stars 53 forks source link

Problem with dynamic module #73

Closed warubert closed 10 months ago

warubert commented 10 months ago

Hello! I am trying to use a dynamic module to consume and produce messages for 5 SQS queues. Here is my module:

import { SqsModule } from '@ssut/nestjs-sqs';
import * as AWS from 'aws-sdk';
import { SQSClient } from "@aws-sdk/client-sqs";
import { QueueHandler } from './queue-handler';
import { ConfigModule } from '../config/config.module';
import { ConfigService } from '../config/config.service';
import { forwardRef, DynamicModule, Module } from '@nestjs/common';
import { DevicesModule } from '../devices/devices.module';
import { SqsMessageProducer } from './sqs-message-producer.service';

/**
 * Module that provides and exports QueueHandler and SqsMessageProducer, and
 * offers a static helper that builds one dynamic module per SQS queue.
 *
 * NOTE(review): this is the version reported as problematic in this issue —
 * when `registerQueue` is called for several queues, only the first queue
 * works. Presumably the repeated `SqsModule.registerAsync` registrations do
 * not each take effect; the root cause is not visible here — confirm against
 * the NestJS / nestjs-sqs sources.
 */
@Module({
  imports: [ConfigModule, forwardRef(() => DevicesModule)],
  providers: [QueueHandler, SqsMessageProducer],
  exports: [QueueHandler, SqsMessageProducer],
})
export class SqsQueueModule {
  /**
   * Builds a dynamic module for a single queue.
   *
   * The returned module imports `SqsModule.registerAsync` configured with
   * exactly one consumer and one producer, both named after `queue.queueName`.
   *
   * @param queue - Object carrying the name of the queue to register.
   * @returns A dynamic module whose `module` is `SqsQueueModule`.
   */
  static registerQueue(queue: { queueName: string }): DynamicModule {
    // Runs synchronously on every call, i.e. before the async factory below.
    console.log(`Creating module for queue: ${queue.queueName}`);

    const dynamicModule = {
      module: SqsQueueModule,
      imports: [
        SqsModule.registerAsync({
          // Factory receives the ConfigService listed in `inject` below.
          useFactory: (configService: ConfigService) => {
            const queueConfig = {
              name: queue.queueName,
              consumers: [
                {
                  name: queue.queueName,
                  // Queue URL = configured base URL with the queue name appended.
                  queueUrl: configService.get('SQS_QUEUE_URL') + queue.queueName,
                  region: configService.get('SQS_AWS_REGION'),
                  // A dedicated SQSClient is constructed for the consumer.
                  sqs: new SQSClient({
                    region: configService.get('SQS_AWS_REGION'),
                    credentials: {
                      accessKeyId: configService.get('SQS_AWS_ACCESS_KEY_ID'),
                      secretAccessKey: configService.get('SQS_AWS_SECRET_ACCESS_KEY')
                    }
                  })
                },                
              ],
              producers: [
                {
                  name: queue.queueName,
                  // Same URL construction as the consumer entry above.
                  queueUrl: configService.get('SQS_QUEUE_URL') + queue.queueName,
                  region: configService.get('SQS_AWS_REGION'),
                  // A second SQSClient with the same region and credentials is
                  // constructed for the producer.
                  sqs: new SQSClient({
                    region: configService.get('SQS_AWS_REGION'),
                    credentials: {
                      accessKeyId: configService.get('SQS_AWS_ACCESS_KEY_ID'),
                      secretAccessKey: configService.get('SQS_AWS_SECRET_ACCESS_KEY')
                    }
                  })
                },   
              ],
            };

            // Logs the whole config object, including the SQSClient instances.
            // NOTE(review): this may expose credential details in logs — confirm.
            console.log(`Queue Configuration for ${queue.queueName}:`, queueConfig);

            return queueConfig;
          },
          inject: [ConfigService],
        }),
      ],
    };

    console.log(`Module created for queue: ${queue.queueName}`);

    return dynamicModule;
  }
}

and I'm registering it like this:

    SqsQueueModule.registerQueue({ queueName: DEVICE_DATA_QUEUE }),
    SqsQueueModule.registerQueue({ queueName: TESTE_QUEUE }),
    SqsQueueModule.registerQueue({ queueName: RESET_DEVICE_CALIBRATION_QUEUE }),
    SqsQueueModule.registerQueue({ queueName: UPDATE_DEVICE_CALIBRATION_QUEUE }),
    SqsQueueModule.registerQueue({ queueName: DEVICE_LOG_WIPE_QUEUE }),

What happens is that only the first queue registered this way works, and the others don't work at all (yet somehow all the console.log messages are shown for every queue as it is created). Does anyone know how I could fix that?

warubert commented 10 months ago

Solved. I was implementing the dynamic module the wrong way; here is the right way to do it:

import { SqsModule } from '@ssut/nestjs-sqs';
import { QueueHandler } from './queue-handler';
import { SQSClient } from '@aws-sdk/client-sqs';
import { ConfigModule } from '../config/config.module';
import { ConfigService } from '../config/config.service';
import { DevicesModule } from '../devices/devices.module';
import { forwardRef, DynamicModule, Module } from '@nestjs/common';
import { SqsMessageProducer } from './sqs-message-producer.service';

/**
 * Module that provides and exports {@link QueueHandler} and
 * {@link SqsMessageProducer}, plus a static helper that registers every SQS
 * queue through a single `SqsModule.registerAsync` call.
 */
@Module({
  imports: [ConfigModule, forwardRef(() => DevicesModule)],
  providers: [QueueHandler, SqsMessageProducer],
  exports: [QueueHandler, SqsMessageProducer],
})
export class SqsQueueModule {
  /**
   * Registers a consumer and a producer for each of the given queues.
   *
   * All queues share one `SQSClient`, and each queue's options object is
   * referenced from both the `consumers` and the `producers` list.
   *
   * @param options - Object whose `queueNames` lists the queues to register.
   * @returns The dynamic module produced by `SqsModule.registerAsync`.
   */
  static register({ queueNames }: { queueNames: string[] }): DynamicModule {
    // Factory invoked with the ConfigService named in `inject` below.
    const buildSqsOptions = (configService: ConfigService) => {
      // One client, reused by every consumer and producer entry.
      const sharedClient = new SQSClient({
        region: configService.get('SQS_AWS_REGION'),
        credentials: {
          accessKeyId: configService.get('SQS_AWS_ACCESS_KEY_ID'),
          secretAccessKey: configService.get('SQS_AWS_SECRET_ACCESS_KEY'),
        },
      });

      const baseUrl = configService.get('SQS_QUEUE_URL');
      const awsRegion = configService.get('SQS_AWS_REGION');

      // Queue URL = configured base URL with the queue name appended.
      const queueEntries = queueNames.map((queueName) => ({
        name: queueName,
        queueUrl: baseUrl + queueName,
        region: awsRegion,
        sqs: sharedClient,
      }));

      // Two distinct arrays that hold the same entry objects.
      return {
        consumers: [...queueEntries],
        producers: [...queueEntries],
      };
    };

    return SqsModule.registerAsync({
      useFactory: buildSqsOptions,
      inject: [ConfigService],
    });
  }
}

registering:

    SqsQueueModule.register(
      {
        queueNames: [
          DEVICE_DATA_QUEUE,
          TESTE_QUEUE,
          RESET_DEVICE_CALIBRATION_QUEUE,
          UPDATE_DEVICE_CALIBRATION_QUEUE,
          DEVICE_LOG_WIPE_QUEUE
        ]
      }),