golevelup / nestjs

A collection of badass modules and utilities to help you level up your NestJS applications 🚀
MIT License

RabbitMQ Message Distribution Across Multiple NestJS Microservices #863

Closed mwolf1989 closed 3 weeks ago

mwolf1989 commented 3 weeks ago

We're implementing a microservices architecture using NestJS and RabbitMQ for inter-service communication. The current goal is to distribute messages from one service to multiple other services using RabbitMQ.

In my API gateway module it works:

image
@Module({
  imports: [
    RabbitMQModule.forRoot(RabbitMQModule, {
      exchanges: [
        {
          name: 'events',
          type: 'fanout',
        },
      ],
      uri: 'amqp://guest:guest@rabbitmq:5672', // Update with your RabbitMQ URI
      connectionInitOptions: { wait: true },
    }),
  ],
  controllers: [IntegrationEventsController],
  providers: [IntegrationEventsService],
  exports: [],
})

I can emit an event:

  @Post('test')
  testEvent() {
    this.logger.log('Emitting test event');
    // Generate a random integer between 1 and 3
    const randomNumber = Math.floor(Math.random() * 3) + 1;
    this.amqpConnection.publish('events', '', {
      message: 'test',
      randomNumber,
    });
    return this.integrationEventsService.testEvent(randomNumber);
  }

And also subscribe:

  @RabbitSubscribe({
    exchange: 'events',
    routingKey: '',
    queue: 'api-gateway-events',
  })
  public async handleMessage(message: any) {
    this.logger.log('Received message: ' + JSON.stringify(message));
    // Add your business logic here to handle the received message
  }

If I add a second service with:

image

and do the same with:

@Module({
  imports: [
    DatabaseModule,
    DatabaseModule.forFeature([Genisys, TenantEventPref]),
    LoggerModule,
    ConfigModule.forRoot({
      isGlobal: true,
      validationSchema: Joi.object({
        RABBITMQ_URI: Joi.string().required(),
        POSTGRES_HOST: Joi.string().required(),
        POSTGRES_PORT: Joi.number().required(),
        POSTGRES_USER: Joi.string().required(),
        POSTGRES_PASSWORD: Joi.string().required(),
        POSTGRES_DB: Joi.string().required(),
        DATABASE_SYNCHRONIZE: Joi.boolean().required(),
      }),
    }),
    RabbitMQModule.forRoot(RabbitMQModule, {
      exchanges: [
        {
          name: 'events',
          type: 'fanout',
        },
      ],
      uri: 'amqp://guest:guest@rabbitmq:5672', // Update with your RabbitMQ URI
      connectionInitOptions: { wait: true },
    }),
  ],
  controllers: [GenesysEventsController],
  providers: [GenesysEventsService],
})
export class GenesysEventsModule {}

and this as the subscriber:

import { AmqpConnection, RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
import { Injectable, Logger } from '@nestjs/common';

@Injectable()
export class GenesysEventsService {
  private readonly logger = new Logger(GenesysEventsService.name);
  constructor(private readonly amqpConnection: AmqpConnection) {}
  getHello(): string {
    return 'Hello World!';
  }
  @RabbitSubscribe({
    exchange: 'events',
    routingKey: '',
    queue: 'genesys-events',
  })
  public async handleMessage(message: any) {
    this.logger.log(
      'Received message in Genesys service: ' + JSON.stringify(message),
    );
    // Add your message handling logic here
  }
}
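
For reference, the delivery this setup expects can be modeled in memory: a fanout exchange ignores the routing key and copies each message to every bound queue, so two services with distinct queue names should each get their own copy. The sketch below does not use RabbitMQ or the golevelup API; `FanoutExchange`, `bindQueue`, `publish`, and `messagesIn` are hypothetical names used only for illustration.

```typescript
// In-memory model of fanout semantics; NOT the RabbitMQ client API.
type Message = { message: string; randomNumber: number };

class FanoutExchange {
  // Queue name -> messages delivered to that queue.
  private readonly queues: Record<string, Message[]> = {};

  // Bind a queue; binding the same name twice reuses the existing queue.
  bindQueue(name: string): void {
    if (!(name in this.queues)) {
      this.queues[name] = [];
    }
  }

  // A fanout exchange ignores the routing key and copies the message
  // to every queue that is currently bound.
  publish(_routingKey: string, msg: Message): void {
    Object.keys(this.queues).forEach((name) => {
      this.queues[name].push({ message: msg.message, randomNumber: msg.randomNumber });
    });
  }

  // Messages delivered to a queue so far (empty if the queue was never bound).
  messagesIn(name: string): Message[] {
    return this.queues[name] || [];
  }
}

const events = new FanoutExchange();
events.bindQueue('api-gateway-events');
events.bindQueue('genesys-events');
events.publish('', { message: 'test', randomNumber: 2 });

console.log(events.messagesIn('api-gateway-events').length);
console.log(events.messagesIn('genesys-events').length);
console.log(events.messagesIn('never-bound').length);
```

In this model a queue that was never bound receives nothing, which matches the symptom of a service whose subscription was never registered with the broker.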

I don't see any log output.

I noticed this in the log for the API gateway:

api-gateway-1         | [Nest] 137  - 10/21/2024, 4:00:14 PM     LOG [RabbitMQModule] IntegrationEventsService.handleMessage {subscribe} -> events::::api-gateway-events

But there is no such line for the other service. What am I doing wrong there?

NestJS is v10 and "@golevelup/nestjs-rabbitmq" is "^5.5.0".

Thanks in advance

Marcel

mwolf1989 commented 3 weeks ago

Also, I can only see the API gateway binding in RabbitMQ:

image

mwolf1989 commented 3 weeks ago

OK, it starts working when I bootstrap with this:

async function bootstrap() {
  const app = await NestFactory.create(GenesysEventsModule);
  app.useLogger(app.get(Logger));
  await app.startAllMicroservices();
  app.listen(3000).then(() => {
    console.log('Genesys Events Service is running on port 3000');
  });
}
bootstrap();

but not with:

async function bootstrap() {
  const app = await NestFactory.create(GenesysEventsModule);
  app.useLogger(app.get(Logger));
  await app.startAllMicroservices();
}
bootstrap();

Is there a way to avoid this?

underfisk commented 3 weeks ago

@mwolf1989 You'll have to call listen so that our module is invoked. You can bind to a random port and block public access to it, but you should usually expose a /healthz endpoint anyway, so HTTP would be needed.

I'm not sure why you're using startAllMicroservices. It doesn't apply to the golevelup module; it is specific to the official NestJS microservices.

We have an example over here
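
A minimal sketch of a bootstrap along these lines, assuming the service only needs HTTP so that `listen` runs. The module import path and the `PORT` environment variable are assumptions and do not come from this thread; the `startAllMicroservices()` call is dropped because it does not apply to the golevelup module.

```typescript
import { NestFactory } from '@nestjs/core';
// Hypothetical path; adjust to wherever the module actually lives.
import { GenesysEventsModule } from './genesys-events.module';

async function bootstrap(): Promise<void> {
  const app = await NestFactory.create(GenesysEventsModule);

  // Assumed env var. Calling listen() is what lets the RabbitMQ
  // subscriptions get registered; 3000 is the port used earlier
  // in this thread.
  const port = Number(process.env.PORT ?? 3000);
  await app.listen(port);

  console.log(`Genesys Events Service is running on port ${port}`);
}

bootstrap();
```

The port can then be kept off any public network and used only for a health check such as /healthz.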