ssut / nestjs-sqs

A project to make SQS easier to use within NestJS, with decorator-based handling and seamless NestJS-way integration.
MIT License
204 stars 54 forks source link

Not able to connect to LocalStack Sqs URL #58

Closed VivekPNs closed 1 year ago

VivekPNs commented 1 year ago

Hi Folks,

I have been struggling to figure out why Im not able to connect to LocalStack sqs URL. I have setup LocalStack to run the queue in my local. I was able to send and receive the message using cli commands and also I was able to publish the message to this queue from my other non nestjs application but the ssut/nestjs-sqs throws the below error and I'm not sure why.


api/node_modules/sqs-consumer/dist/errors.js:40
    const sqsError = new SQSError(message);
                     ^
SQSError: SQS receive message failed: The address https://sqs.us-east-1.amazonaws.com/ is not valid for this endpoint.
    at toSQSError (/api/node_modules/sqs-consumer/dist/errors.js:40:22)
    at Consumer.receiveMessage (/api/node_modules/sqs-consumer/dist/consumer.js:173:43)
    at processTicksAndRejections (node:internal/process/task_queues:95:5)
agnarok commented 1 year ago

Im facing the same issue the only solution i found was to instantiate my own SQS Client

const sqsClient = new SQSClient({
     region,
     endpoint: 'LOCALSTACK_URL'
}) ;
VivekPNs commented 1 year ago

Im facing the same issue the only solution i found was to instantiate my own SQS Client

const sqsClient = new SQSClient({
     region,
     endpoint: 'LOCALSTACK_URL'
}) ;

@agnarok Could you please let me know where exactly you instantiate the client and consume it? I have own service file and I'm consuming the message like below

Module file

import { Module } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { SqsModule } from '@ssut/nestjs-sqs';

@Module({
  imports: [
    ProposalsModule,
    SqsModule.registerAsync({
      useFactory: (configService: ConfigService) => {
        return {
          consumers: [
            {
              name: configService.getOrThrow('AWS_SQS_QUEUE_NAME'),
              queueUrl: 'http://localhost:4566/000000000000/queueName',
              region: configService.getOrThrow('AWS_REGION'),
            },
          ],
          producers: [],
        };
      },
      inject: [ConfigService],
    }),
  ],
  providers: [QueueHandler],
})

Service file:


import { Injectable, Logger } from '@nestjs/common';
import { SqsConsumerEventHandler, SqsMessageHandler } from '@ssut/nestjs-sqs';
import { SQS } from 'aws-sdk';

@Injectable()
export class QueueHandler {
  constructor(private pService: PService) {}
  @SqsMessageHandler(queueName, false)
  async handleMessage(message: SQS.Message) {
    const logger = new Logger(this.handleMessage.name);
    try {
          //Some Logic
    } catch (error) {
      logger.error(error.message, error.stack);
      throw error;
    }
  }
agnarok commented 1 year ago

SQSClient is an option for the consumer so your module should be something like this :arrow_down:

import { Module } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { SqsModule } from '@ssut/nestjs-sqs';

@Module({
  imports: [
    ProposalsModule,
    SqsModule.registerAsync({
      useFactory: (configService: ConfigService) => {
        return {
          consumers: [
            {
              name: configService.getOrThrow('AWS_SQS_QUEUE_NAME'),
              queueUrl: 'http://localhost:4566/000000000000/queueName',
              sqs = new SQSClient({
                  region: 'REGION',
                  endpoint: 'LOCALSTACK_URL'
               }) ;            
             },
          ],
          producers: [],
        };
      },
      inject: [ConfigService],
    }),
  ],
  providers: [QueueHandler],
}) 
VivekPNs commented 1 year ago

@agnarok I'm really confused now. SQSClient is not part of this library so, not sure how do I instantiate it. Also is queueUrl and endpoint are same?

agnarok commented 1 year ago

@VivekPNs SqSClient is used internally in the lib, so you would need to import in your project to use it like i suggested.

Also, no endpoint and the queueUrl are not the same. The endpoint is used in the AWS SDK for pooling data

VivekPNs commented 1 year ago

@agnarok But SqSClient is not part of the @ssut/nestjs-sqs library. Could you provide some snippets to understand better?

horan-dev commented 1 year ago

Finally, it works for me this is a full code for ConsumerModule don't forget to import ConsumerModule in the app module

import { ConfigModule, ConfigService } from '@nestjs/config';
import { Module } from '@nestjs/common';
import { SqsModule } from '@ssut/nestjs-sqs';
import { ConsumerService } from './consumer.service';
import { SQSClient } from '@aws-sdk/client-sqs';

@Module({
  imports: [
    ConfigModule,
    SqsModule.registerAsync({
      imports: [ConfigModule], // Import the ConfigModule to use the ConfigService
      useFactory: async (configService: ConfigService) => {
        const accessKeyId = configService.get<string>('sqs.accessKeyId');
        const secretAccessKey = configService.get<string>(
          'sqs.secretAccessKey',
        );

        // Retrieve the required configuration values using ConfigService
        return {
          consumers: [
            {
              name: configService.get<string>('sqs.queue_name'), // name of the queue
              queueUrl: configService.get<string>('sqs.url'), // url of the queue
              region: configService.get<string>('sqs.region'), // using the same region for the producer
              batchSize: 10, // number of messages to receive at once
              terminateGracefully: true, // gracefully shutdown when SIGINT/SIGTERM is received
              sqs:new SQSClient({
                region:configService.get<string>('sqs.region'),
                credentials: {
                  accessKeyId: accessKeyId,
                  secretAccessKey:secretAccessKey
                }
             })
            },
          ],
          producers: [],
        };
      },
      inject: [ConfigService],
    }),
  ],
  controllers: [],
  providers: [ConsumerService],
  exports: [ConsumerService],
})
export class ConsumerModule {}
horan-dev commented 1 year ago

@VivekPNs

VivekPNs commented 1 year ago

Yup, I did the same way and it works. Thank you.

rbonestell commented 4 months ago

I apologize for cracking open an old issue, but I'm unable to find any more information on what I've found here. This issue and one other article both have this attribute in the consumer options: terminateGracefully: true, // gracefully shutdown when SIGINT/SIGTERM is received

Where did this come from and what did it do? I simply want to ensure that my NestJS app completes processing the message it's currently handling before terminating on a SIGINT or SIGTERM from ECS, might this help?

Cheers and thanks in advance!