bbc / sqs-consumer

Build Amazon Simple Queue Service (SQS) based applications without the boilerplate
https://bbc.github.io/sqs-consumer/
Other
1.74k stars 333 forks source link

handleMessage does not allow for an injected process method #403

Closed ltyiz07 closed 1 year ago

ltyiz07 commented 1 year ago

Describe the bug

Assign a call-back function at "handleMessage" at "Consumer.create(options: ConsumerOptions)" with "ConsumerOptions.handleMessage" direct method assign is not working properly.

When I call injected services's method at inside of "process" method that I set as "handleMessage", it is not called properly. Just pending everything without any error or messages. But, If I call injected service's method at lambda function assigned to "handleMessge", it works properly.

Not working below code.

import { Message, SQSClient } from '@aws-sdk/client-sqs';
import { Inject, Injectable } from '@nestjs/common';
import { Consumer } from 'sqs-consumer';
import { Producer } from 'sqs-producer';
import { IJobConsumer } from './interfaces/job-queue.consumer.interface';
import { IJobQueueModuleOptions, JOB_QUEUE_OPTIONS } from './interfaces/job-queue.options.interface';
import { IJobQueueService } from './interfaces/job-queue.service.interface';

@Injectable()
export class JobQueueService implements IJobQueueService {

    private readonly sqsClient: SQSClient;
    private producer: Producer;
    private consumer: Consumer;

    constructor(
        @Inject(JOB_QUEUE_OPTIONS) private readonly options: IJobQueueModuleOptions,
    ) {

        this.sqsClient = new SQSClient({
            region: this.options.queueConnectionConfigs.sqsRegion,
            credentials: {
                accessKeyId: this.options.queueConnectionConfigs.sqsAccessKeyId,
                secretAccessKey: this.options.queueConnectionConfigs.sqsSecretAccessKey,
            },
        });

        this.producer = Producer.create({
            queueUrl: this.options.queueConnectionConfigs.url,
            sqs: this.sqsClient,
        });

    }

    async produce(messages: any): Promise<any> {
        console.log('send message: ', messages);
        const result = await this.producer.send(messages);
        return result;
    }

    setConsumer(consumer: IJobConsumer, options?: any): void {
        this.consumer = Consumer.create({
            queueUrl: this.options.queueConnectionConfigs.url,
            sqs: this.sqsClient,
            visibilityTimeout: 30,
            heartbeatInterval: 10,
            // handleMessage: async (msg: Message): Promise<Message | void>  => {
                // return await consumer.process(msg);
            // },
            handleMessage: consumer.process,
        })
        return this.consumer.start();
    }

    getJobQueueName(): string {
        return this.options.jobQueueName;
    }
}

but below code is working as expected

import { Message, SQSClient } from '@aws-sdk/client-sqs';
import { Inject, Injectable } from '@nestjs/common';
import { Consumer } from 'sqs-consumer';
import { Producer } from 'sqs-producer';
import { IJobConsumer } from './interfaces/job-queue.consumer.interface';
import { IJobQueueModuleOptions, JOB_QUEUE_OPTIONS } from './interfaces/job-queue.options.interface';
import { IJobQueueService } from './interfaces/job-queue.service.interface';

@Injectable()
export class JobQueueService implements IJobQueueService {

    private readonly sqsClient: SQSClient;
    private producer: Producer;
    private consumer: Consumer;

    constructor(
        @Inject(JOB_QUEUE_OPTIONS) private readonly options: IJobQueueModuleOptions,
    ) {

        this.sqsClient = new SQSClient({
            region: this.options.queueConnectionConfigs.sqsRegion,
            credentials: {
                accessKeyId: this.options.queueConnectionConfigs.sqsAccessKeyId,
                secretAccessKey: this.options.queueConnectionConfigs.sqsSecretAccessKey,
            },
        });

        this.producer = Producer.create({
            queueUrl: this.options.queueConnectionConfigs.url,
            sqs: this.sqsClient,
        });

    }

    async produce(messages: any): Promise<any> {
        console.log('send message: ', messages);
        const result = await this.producer.send(messages);
        return result;
    }

    setConsumer(consumer: IJobConsumer, options?: any): void {
        this.consumer = Consumer.create({
            queueUrl: this.options.queueConnectionConfigs.url,
            sqs: this.sqsClient,
            visibilityTimeout: 30,
            heartbeatInterval: 10,
            handleMessage: async (msg: Message): Promise<Message | void>  => {
                return await consumer.process(msg);
            },
            // handleMessage: consumer.process,
        })
        return this.consumer.start();
    }

    getJobQueueName(): string {
        return this.options.jobQueueName;
    }
}

Your minimal, reproducible example

import { Message, SQSClient } from '@aws-sdk/client-sqs'; import { Inject, Injectable } from '@nestjs/common'; import { Consumer } from 'sqs-consumer'; import { Producer } from 'sqs-producer'; import { IJobConsumer } from './interfaces/job-queue.consumer.interface'; import { IJobQueueModuleOptions, JOB_QUEUE_OPTIONS } from './interfaces/job-queue.options.interface'; import { IJobQueueService } from './interfaces/job-queue.service.interface'; @Injectable() export class JobQueueService implements IJobQueueService { private readonly sqsClient: SQSClient; private producer: Producer; private consumer: Consumer; constructor( @Inject(JOB_QUEUE_OPTIONS) private readonly options: IJobQueueModuleOptions, ) { this.sqsClient = new SQSClient({ region: this.options.queueConnectionConfigs.sqsRegion, credentials: { accessKeyId: this.options.queueConnectionConfigs.sqsAccessKeyId, secretAccessKey: this.options.queueConnectionConfigs.sqsSecretAccessKey, }, }); this.producer = Producer.create({ queueUrl: this.options.queueConnectionConfigs.url, sqs: this.sqsClient, }); } async produce(messages: any): Promise { console.log('send message: ', messages); const result = await this.producer.send(messages); return result; } setConsumer(consumer: IJobConsumer, options?: any): void { this.consumer = Consumer.create({ queueUrl: this.options.queueConnectionConfigs.url, sqs: this.sqsClient, visibilityTimeout: 30, heartbeatInterval: 10, handleMessage: async (msg: Message): Promise<Message | void> => { return await consumer.process(msg); }, // handleMessage: consumer.process, }) return this.consumer.start(); } getJobQueueName(): string { return this.options.jobQueueName; } }

Steps to reproduce

  1. Import job-queue module with its options
  2. Using job-queue modules method "setConsumer" set consumer class that implements IJobConsumer
  3. At Job-queue module's "setConsumer" method, set handleMessage with consumer's process method

Expected behavior

As a user, I expect injected service's method works properly even if I assign it to "handleMessage" directly.

How often does this bug happen?

Every time

Screenshots or Videos

No response

Platform

Package version

7.1.0

AWS SDK version

3.332.0

Additional context

No response

nicholasgriffintn commented 1 year ago

This definitely isn't a bug and is more a difference between your implementation and how we expect it to be implemented, the second example is the solution we have built for.

Will remove the bug label as this is not a bug. Happy to take a PR if you'd like this capability.

nicholasgriffintn commented 1 year ago

This has been pre released to v7.2.0-canary.3, will be released to v7.2.0, once I've had a chance to validate all of the changes.

github-actions[bot] commented 1 year ago

This issue has been closed for more than 30 days. If this issue is still occuring, please open a new issue with more recent context.