golevelup / nestjs

A collection of badass modules and utilities to help you level up your NestJS applications 🚀
MIT License

Having trouble consuming messages via subscription (nestjs-rabbitmq) #294

Closed collinwu closed 3 years ago

collinwu commented 3 years ago

Hello, I'm having trouble using the @golevelup/nestjs-rabbitmq package, and I'm not sure what I'm missing.

I've been following the docs to wire up the message queue in a NestJS application. I'm publishing messages via the RabbitMQ admin client, but none of them reach the subscription handler.

Screenshots below:

Messaging Service / Module

Screen Shot 2021-07-30 at 12 00 05 PM Screen Shot 2021-07-30 at 12 06 49 PM

App bootstrap

Screen Shot 2021-07-30 at 11 58 09 AM

Publishing a message, but subscription handler is not invoking

Screen Shot 2021-07-30 at 12 13 13 PM

stvnwrgs commented 3 years ago

I'm facing a similar issue. This is really strange because it worked a while ago and it is still working inside another module. I also recreated the rabbitmq instance multiple times.

import { Inject, Injectable, Logger } from '@nestjs/common';

import { Nack, RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
import { UserEventService } from './user-event-service';
import * as exchanges from '../../rabbitqueue/exchanges';
import { IEmitterService } from '../../emitter/emitters/IEmitterService';
import { IKVCache } from '../../cache/kvCache.interface';
import { IUserQueueEvent } from './IUserQueueEvent';
import { IRabbitQueueMessage } from '../../rabbitqueue/IRabbitQueueMessage';

@Injectable()
export class UserService {
  private logger: Logger;

  constructor(
    // @Inject('RedisCacheService') private userCache: IKVCache,
    ...
    @Inject('UserEventService')
    private userEventService: UserEventService,
    @Inject('EmitterService') private emitterService: IEmitterService,
  ) {
    this.logger = new Logger(UserService.name);
  }

  @RabbitSubscribe({
    exchange: exchanges.exchangeName,
    routingKey: exchanges.EVENTS.GH_USER_FOUND,
    queue: 'gh.user.identity',
  })
  public async listenToGithubUsers(msg: IRabbitQueueMessage<IUserQueueEvent>) {
    console.log('Give logzzzz');
    this.logger.log(`Getting identity info message`);
    try {
      // check if already in cache
      this.logger.log(`Getting identity info for user ${msg.data.username}`);
      // const cacheKey =
      //   'ghi' + this.userCache.createCacheKeyFromString(msg.data.id);
      // const inCache = await this.userCache.get(cacheKey);
      // if (inCache) {
      //   this.logger.log(
      //     `User ${msg.data.username} already scraped, skipping...`,
      //   );
      //   return;
      // }

      await this.emitUserEventsForUser(msg.data.username);
      // await this.userCache.set(cacheKey, 1);
    } catch (err) {
      this.logger.error(err);
      return new Nack(true);
    }
  }
}

import { RabbitMQModule } from '@golevelup/nestjs-rabbitmq';
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { exchanges } from './exchanges';
import { RabbitQueueService } from './rabbitqueue.service';

@Module({
  imports: [
    RabbitMQModule.forRootAsync(RabbitMQModule, {
      imports: [ConfigModule],
      useFactory: async () => {
        return {
          exchanges: exchanges,
          uri: process.env.WORKER_RABBITMQ_CONNECTION,
          connectionInitOptions: { wait: true },
          // must be 1
          prefetchCount: 1,
        };
      },
      inject: [ConfigService],
    }),
  ],
  providers: [RabbitQueueService],
  exports: [RabbitQueueService],
})
export class RabbitQueueModule {}

Running on port:  3002
[Nest] 54488   - 03.08.2021, 21:52:25   [RabbitMQModule] Initializing RabbitMQ Handlers
[Nest] 54488   - 03.08.2021, 21:52:25   [RabbitMQModule] Registering rabbitmq handlers from UserService
[Nest] 54488   - 03.08.2021, 21:52:25   [RabbitMQModule] UserService.listenToGithubUsers {subscribe} -> worker::gh.user.found::gh.user.identity
[Nest] 54488   - 03.08.2021, 21:52:25   [AmqpConnection] Successfully connected a RabbitMQ channel
[Nest] 54488   - 03.08.2021, 21:52:25   [NestApplication] Nest application successfully started +11ms


"dependencies": {
    "@apollo/client": "^3.3.11",
    "@golevelup/nestjs-rabbitmq": "^1.17.1",
    "@nestjs/common": "^7.6.13",
    "@nestjs/config": "^0.6.3",
    "@nestjs/core": "^7.6.13",
    "@nestjs/platform-express": "^7.6.13",
    "@sevenfifteen/nestjs-rabbitmq": "^1.0.5",
    "@types/lodash": "^4.14.168",
    "apollo-link-http": "^1.5.17",
    "apollo-server-cache-redis": "^1.2.3",
    "cross-fetch": "^3.0.6",
    "email-validator": "^2.0.4",
    "graphql": "^15.5.0",
    "js-convert-case": "^4.1.1",
    "lodash": "^4.17.21",
    "moment": "^2.29.1",
    "nest-winston": "^1.4.0",
    "objects-to-csv": "^1.3.6",
    "redis": "^3.0.2",
    "reflect-metadata": "^0.1.13",
    "request": "^2.88.2",
    "rimraf": "^3.0.2",
    "rxjs": "^6.6.6",
    "swagger-js-codegen": "^1.13.0",
    "swagger-typescript-codegen": "^3.2.3",
    "ts-md5": "^1.2.7",
    "winston": "^3.3.3"
  },
  "devDependencies": {
    "@nestjs/cli": "^8.1.1",
    "@nestjs/schematics": "^7.2.7",
    "@nestjs/testing": "^7.6.13",
    "@types/amqp-connection-manager": "^2.0.11",
    "@types/express": "^4.17.11",
    "@types/jest": "^26.0.20",
    "@types/moment": "^2.13.0",
    "@types/node": "^14.14.31",
    "@types/redis": "^2.8.28",
    "@types/request": "^2.48.7",
    "@types/supertest": "^2.0.10",
    "@typescript-eslint/eslint-plugin": "^4.15.2",
    "@typescript-eslint/parser": "^4.15.2",
    "eslint": "^7.20.0",
    "eslint-config-prettier": "^8.1.0",
    "eslint-plugin-prettier": "^3.3.1",
    "husky": "^7.0.1",
    "jest": "^26.6.3",
    "prettier": "^2.2.1",
    "supertest": "^6.1.3",
    "ts-jest": "^26.5.2",
    "ts-loader": "^8.0.17",
    "ts-node": "^9.1.1",
    "tsconfig-paths": "^3.9.0",
    "typescript": "^4.1.5"
  },

Node: v14.4.0

The messages are requeued immediately, for some reason, without ever reaching my subscription handler.

EDIT: @collinwu

This seems to be a problem with the rabbitmq container itself. After pinning the rabbitmq version to rabbitmq:3.8.17-management-alpine it works fine.

collinwu commented 3 years ago

@stvnwrgs - thanks for sharing!

I swapped out the 3.8.5 image for 3.8.17, but I still have the same issue: the test messages I create end up stuck in the queue without the subscription handler processing them. I also tried to acknowledge manually, but the message never reaches the handler, so there is nothing to acknowledge.

I'm using NestJS v8, but I'm not sure the issue is there. I also spent some time trying to downgrade to Nest v7, as in your example, but the dependency requirements don't resolve, so npm / yarn install fails. I imagine that's mostly due to dependencies involving @nestjs/graphql.

I'm not really sure how to debug this. Does anyone have ideas on how I can inspect the RabbitSubscribe decorator?

The author created this package to address some production use cases, and it's the library we would prefer to use, but the fact that it isn't working out of the box is a concern.

Following NestJS's RabbitMQ documentation works just fine, but we want to publish to exchanges and map queues to exchanges.

collinwu commented 3 years ago

I figured out where and what went wrong in my scenario.

I suppose a combination of factors played into this, but for anyone who might be facing a similar issue: check your NestJS pipes, guards, and interceptors.

Debugging led me to inspect: https://github.com/golevelup/nestjs/blob/master/packages/rabbitmq/src/amqp/connection.ts#L430

I was able to log a silent error telling me that the request was unauthorized. That was really the clue I needed. It led me to refer back to RabbitMQ usage with interceptors: I have a global auth guard enabled, but I didn't immediately make the connection that it might be the culprit.

Changing my global auth guard to return true for isRabbitContext(context) fixed the issue with the subscription handler not firing on messages. Now this library works as intended.
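For anyone who wants to see the shape of that fix, here is a minimal sketch of the guard logic. It is not the code from my app: `ContextLike`, `RabbitCheck`, and the header rule are hypothetical stand-ins so the snippet runs on its own, and the predicate passed in plays the role that `isRabbitContext` plays in a real guard.

```typescript
// Hypothetical stand-in for the parts of Nest's ExecutionContext used here.
// A real guard would receive ExecutionContext from @nestjs/common instead.
interface ContextLike {
  kind: "http" | "rabbit";
  authorizationHeader?: string;
}

// Decides whether a call originates from a RabbitMQ handler.
// In a real app this is where isRabbitContext(context) would be called.
type RabbitCheck = (context: ContextLike) => boolean;

// Sketch of a global auth guard's decision: let RabbitMQ deliveries through,
// and enforce the HTTP check on everything else.
function canActivate(context: ContextLike, isRabbit: RabbitCheck): boolean {
  if (isRabbit(context)) {
    // Queue messages carry no HTTP credentials, so skip the auth check.
    return true;
  }
  // Placeholder HTTP rule (an assumption): any non-empty header passes.
  return Boolean(context.authorizationHeader);
}

const isRabbit: RabbitCheck = (ctx) => ctx.kind === "rabbit";

const rabbitAllowed = canActivate({ kind: "rabbit" }, isRabbit);
const anonymousHttpAllowed = canActivate({ kind: "http" }, isRabbit);
const authedHttpAllowed = canActivate(
  { kind: "http", authorizationHeader: "Bearer token" },
  isRabbit,
);

console.log({ rabbitAllowed, anonymousHttpAllowed, authedHttpAllowed });
```

In an actual Nest app the same branch would sit at the top of the guard's `canActivate(context)` method, before any request-based checks run.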

Having this issue also led me to fill in some gaps in my knowledge of NestJS in general. The FAQ section is where I started to understand how the Request Lifecycle works in Nest, along with other important concepts such as the Hybrid Application, which is what we're aiming for in our use case.

Hope this helps another person who might have this issue in the future

WonderPanda commented 3 years ago

@collinwu Thanks for following up on this and writing up a detailed breakdown of how you solved the issue. In the future I will look into adding some kind of warning or log that indicates when requests are being rejected due to interceptors.

alisherafat01 commented 2 years ago

I had the same issue, but changing HELLO WORLD to "HELLO WORLD" was the solution! Try wrapping the message in quotation marks when sending messages to RabbitMQ.

Deepgrg commented 1 year ago

I had the same issue of RabbitSubscribe not firing my service method. In my case the method didn't fire because, by default, RabbitSubscribe and RabbitRPC do not allow non-JSON payloads, so my producer had to publish with:

Buffer.from(JSON.stringify({data: "Hello World!"}))

Hope this information helps anyone who has a similar issue.
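To make the difference concrete, here is a small self-contained sketch of what a JSON-only consumer would accept. The helper names `encodeJson` and `tryDecodeJson` are hypothetical and only illustrate plain `JSON.parse` behavior; they are not part of the library.

```typescript
import { Buffer } from "node:buffer";

// Encode a payload as JSON text inside a Buffer, as in the line above.
function encodeJson(payload: unknown): Buffer {
  return Buffer.from(JSON.stringify(payload));
}

type DecodeResult = { ok: true; value: unknown } | { ok: false };

// Try to decode a message body as JSON; report failure instead of throwing.
function tryDecodeJson(content: Buffer): DecodeResult {
  try {
    return { ok: true, value: JSON.parse(content.toString("utf8")) };
  } catch {
    return { ok: false };
  }
}

// A JSON object round-trips.
const objectBody = tryDecodeJson(encodeJson({ data: "Hello World!" }));
// A quoted string is valid JSON, which is why adding quotes helps.
const quotedBody = tryDecodeJson(Buffer.from('"HELLO WORLD"'));
// Bare text is not valid JSON, so a JSON-only consumer cannot parse it.
const bareBody = tryDecodeJson(Buffer.from("HELLO WORLD"));

console.log({ objectBody, quotedBody, bareBody });
```

This also explains the earlier comment about quoting: `"HELLO WORLD"` with the quotes is a valid JSON string, while the bare text is not.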

actuallyzefe commented 5 months ago

@RabbitSubscribe({ exchange: '<exchange>', routingKey: '<your_routing_key>', queue: '<your_queue_name>', })

If you go to this decorator's definition with Cmd + left-click, you will be able to see the .d.ts file. There is an option called allowNonJsonMessages: boolean

If you set this option to true in the @RabbitSubscribe decorator, you will be able to consume messages which are not JSON.
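As a rough mental model only: the sketch below assumes that, with the option enabled, a body that fails JSON parsing is handed over as raw text instead of being rejected. The `deserialize` function is hypothetical and I have not checked it against the library's implementation, which may differ.

```typescript
import { Buffer } from "node:buffer";

// Hypothetical model of a consumer's deserialization step.
// Assumption: with allowNonJsonMessages enabled, unparseable bodies fall
// back to their raw text; with it disabled, the parse error propagates.
function deserialize(content: Buffer, allowNonJsonMessages: boolean): unknown {
  const text = content.toString("utf8");
  try {
    return JSON.parse(text);
  } catch (err) {
    if (allowNonJsonMessages) {
      return text;
    }
    throw err;
  }
}

// Non-JSON body with the option on: the raw text comes through.
const rawText = deserialize(Buffer.from("HELLO WORLD"), true);
// JSON body with the option on: still parsed as usual.
const parsed = deserialize(Buffer.from('{"data":"Hello World!"}'), true);

// Non-JSON body with the option off: the message is rejected.
let strictRejected = false;
try {
  deserialize(Buffer.from("HELLO WORLD"), false);
} catch {
  strictRejected = true;
}

console.log({ rawText, parsed, strictRejected });
```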

Just sayin