golevelup / nestjs

A collection of badass modules and utilities to help you level up your NestJS applications 🚀
MIT License
2.16k stars 244 forks source link

rabbitmq: handlers don't work in the non-publishing app #729

Closed alexeyinn closed 3 months ago

alexeyinn commented 3 months ago

I have two NestJS apps. Both use the same configuration in app.module.ts:

   RabbitMQModule.forRoot(RabbitMQModule, {
      exchanges: [
        {
          name: 'data-service',
          type: 'direct',
          createExchangeIfNotExists: true,
        },
      ],
      uri: `amqp://${RABBIT_USER}:${RABBIT_PASSWORD}@${RABBIT_HOST}:${RABBIT_PORT}`,
    }),

The first app has a publisher:

 this.amqpConnection.publish('data-service', 'routing-key', {
      event: event,
    });

And both apps have subscribers:

@RabbitSubscribe({
    exchange: 'data-service',
    routingKey: 'routing-key',
  })
  public async pubSubHandler2(msg: object) {
    console.log('🚀 ~ AppService ~ pubSubHandler2 ~ msg:', msg);
  }

However, only the first app (the one that publishes the message) receives it. In the RabbitMQ management GUI I can see two channels (one per app). The state of the first app's channel is "idle", while the state of the second app's channel (the one that doesn't get the message) is "active", and the message it did not receive is shown as "unacked".

image

UPD:

If I publish the message from the problematic second app and try to receive it there, I still don't get the message in the second app (I just copied the module, services, etc.), but I still get it in the first one! It seems the problem is in that app. But why can't I receive the message?

I tried adding an error handler in that app:

  @RabbitSubscribe({
    exchange: 'data-service',
    routingKey: 'routing-key',
    errorHandler: (error) => {
      console.log(error);
    },
  })

And this is what I got when a message was sent to the exchange:

ConfirmChannel {
  _events: [Object: null prototype] {
    ack: [Function: bound handleConfirm],
    nack: [Function: bound handleConfirm],
    close: [ [Function (anonymous)], [Function (anonymous)] ],
    delivery: [Function: bound handleDelivery],
    cancel: [Function: bound handleCancel],
    drain: [Function (anonymous)]
  },
  _eventsCount: 6,
  _maxListeners: undefined,
  connection: Connection {
    _events: [Object: null prototype] {
      frameError: [Function: bound onSocketError],
      error: [Function: bound emit],
      close: [Function: bound emit],
      blocked: [Function: bound emit],
      unblocked: [Function: bound emit]
    },
    _eventsCount: 5,
    _maxListeners: undefined,
    stream: Socket {
      connecting: false,
      _hadError: false,
      _parent: null,
      _host: 'localhost',
      _closeAfterHandlingError: false,
      _readableState: [ReadableState],
      _events: [Object: null prototype],
      _eventsCount: 4,
      _maxListeners: undefined,
      _writableState: [WritableState],
      allowHalfOpen: false,
      _sockname: null,
      _pendingData: null,
      _pendingEncoding: '',
      server: null,
      _server: null,
      [Symbol(async_id_symbol)]: 30,
      [Symbol(kHandle)]: [TCP],
      [Symbol(lastWriteQueueSize)]: 0,
      [Symbol(timeout)]: null,
      [Symbol(kBuffer)]: null,
      [Symbol(kBufferCb)]: null,
      [Symbol(kBufferGen)]: null,
      [Symbol(kCapture)]: false,
      [Symbol(kSetNoDelay)]: false,
      [Symbol(kSetKeepAlive)]: false,
      [Symbol(kSetKeepAliveInitialDelay)]: 0,
      [Symbol(kBytesRead)]: 0,
      [Symbol(kBytesWritten)]: 0
    },
    muxer: Mux {
      newStreams: [],
      oldStreams: [],
      blocked: false,
      scheduledRead: false,
      out: [Socket]
    },
    rest: <Buffer >,
    frameMax: 4096,
    sentSinceLastCheck: true,
    recvSinceLastCheck: true,
    expectSocketClose: false,
    freeChannels: BitSet { words: [Array], wordsInUse: 1 },
    channels: [ [Object], [Object] ],
    serverProperties: {
      capabilities: [Object],
      cluster_name: 'rabbit@e25ff444f79e',
      copyright: 'Copyright (c) 2007-2023 Broadcom Inc and/or its subsidiaries',
      information: 'Licensed under the MPL 2.0. Website: https://rabbitmq.com',
      platform: 'Erlang/OTP 25.3.2.9',
      product: 'RabbitMQ',
      version: '3.12.12'
    },
    channelMax: 2047,
    heartbeat: 5,
    heartbeater: Heart {
      _events: [Object: null prototype],
      _eventsCount: 2,
      _maxListeners: undefined,
      interval: 5,
      sendTimer: Timeout {
        _idleTimeout: 2500,
        _idlePrev: [TimersList],
        _idleNext: [TimersList],
        _idleStart: 9163,
        _onTimeout: [Function: bound runHeartbeat],
        _timerArgs: undefined,
        _repeat: 2500,
        _destroyed: false,
        [Symbol(refed)]: true,
        [Symbol(kHasPrimitive)]: false,
        [Symbol(asyncId)]: 48,
        [Symbol(triggerId)]: 45
      },
      recvTimer: Timeout {
        _idleTimeout: 5000,
        _idlePrev: [Timeout],
        _idleNext: [TimersList],
        _idleStart: 9116,
        _onTimeout: [Function: bound runHeartbeat],
        _timerArgs: undefined,
        _repeat: 5000,
        _destroyed: false,
        [Symbol(refed)]: true,
        [Symbol(kHasPrimitive)]: false,
        [Symbol(asyncId)]: 49,
        [Symbol(triggerId)]: 45
      },
      [Symbol(kCapture)]: false
    },
    accept: [Function: mainAccept],
    [Symbol(kCapture)]: false
  },
  reply: null,
  pending: [],
  lwm: 2,
  unconfirmed: [],
  handleMessage: [Function: acceptDeliveryOrReturn],
  consumers: Map(3) {
    'amq.ctag-vfAQ1tgIBaEKVRFbrBc9Pg' => [Function (anonymous)],
    'amq.ctag-RuLdHphS5a1fB63fm2ZmYQ' => [Function (anonymous)],
    'amq.ctag-jJnH0ecXJtjG6Y-eP8MDNw' => [Function (anonymous)]
  },
  ch: 1,
  [Symbol(kCapture)]: false
}
check
ConfirmChannel {
  _events: [Object: null prototype] {
    ack: [Function: bound handleConfirm],
    nack: [Function: bound handleConfirm],
    close: [ [Function (anonymous)], [Function (anonymous)] ],
    delivery: [Function: bound handleDelivery],
    cancel: [Function: bound handleCancel],
    drain: [Function (anonymous)]
  },
  _eventsCount: 6,
  _maxListeners: undefined,
  connection: Connection {
    _events: [Object: null prototype] {
      frameError: [Function: bound onSocketError],
      error: [Function: bound emit],
      close: [Function: bound emit],
      blocked: [Function: bound emit],
      unblocked: [Function: bound emit]
    },
    _eventsCount: 5,
    _maxListeners: undefined,
    stream: Socket {
      connecting: false,
      _hadError: false,
      _parent: null,
      _host: 'localhost',
      _closeAfterHandlingError: false,
      _readableState: [ReadableState],
      _events: [Object: null prototype],
      _eventsCount: 4,
      _maxListeners: undefined,
      _writableState: [WritableState],
      allowHalfOpen: false,
      _sockname: null,
      _pendingData: null,
      _pendingEncoding: '',
      server: null,
      _server: null,
      [Symbol(async_id_symbol)]: 30,
      [Symbol(kHandle)]: [TCP],
      [Symbol(lastWriteQueueSize)]: 0,
      [Symbol(timeout)]: null,
      [Symbol(kBuffer)]: null,
      [Symbol(kBufferCb)]: null,
      [Symbol(kBufferGen)]: null,
      [Symbol(kCapture)]: false,
      [Symbol(kSetNoDelay)]: false,
      [Symbol(kSetKeepAlive)]: false,
      [Symbol(kSetKeepAliveInitialDelay)]: 0,
      [Symbol(kBytesRead)]: 0,
      [Symbol(kBytesWritten)]: 0
    },
    muxer: Mux {
      newStreams: [],
      oldStreams: [],
      blocked: false,
      scheduledRead: false,
      out: [Socket]
    },
    rest: <Buffer >,
    frameMax: 4096,
    sentSinceLastCheck: true,
    recvSinceLastCheck: true,
    expectSocketClose: false,
    freeChannels: BitSet { words: [Array], wordsInUse: 1 },
    channels: [ [Object], [Object] ],
    serverProperties: {
      capabilities: [Object],
      cluster_name: 'rabbit@e25ff444f79e',
      copyright: 'Copyright (c) 2007-2023 Broadcom Inc and/or its subsidiaries',
      information: 'Licensed under the MPL 2.0. Website: https://rabbitmq.com',
      platform: 'Erlang/OTP 25.3.2.9',
      product: 'RabbitMQ',
      version: '3.12.12'
    },
    channelMax: 2047,
    heartbeat: 5,
    heartbeater: Heart {
      _events: [Object: null prototype],
      _eventsCount: 2,
      _maxListeners: undefined,
      interval: 5,
      sendTimer: Timeout {
        _idleTimeout: 2500,
        _idlePrev: [TimersList],
        _idleNext: [TimersList],
        _idleStart: 76663,
        _onTimeout: [Function: bound runHeartbeat],
        _timerArgs: undefined,
        _repeat: 2500,
        _destroyed: false,
        [Symbol(refed)]: true,
        [Symbol(kHasPrimitive)]: false,
        [Symbol(asyncId)]: 48,
        [Symbol(triggerId)]: 45
      },
      recvTimer: Timeout {
        _idleTimeout: 5000,
        _idlePrev: [Timeout],
        _idleNext: [TimersList],
        _idleStart: 74116,
        _onTimeout: [Function: bound runHeartbeat],
        _timerArgs: undefined,
        _repeat: 5000,
        _destroyed: false,
        [Symbol(refed)]: true,
        [Symbol(kHasPrimitive)]: false,
        [Symbol(asyncId)]: 49,
        [Symbol(triggerId)]: 45
      },
      [Symbol(kCapture)]: false
    },
    accept: [Function: mainAccept],
    [Symbol(kCapture)]: false
  },
  reply: null,
  pending: [],
  lwm: 2,
  unconfirmed: [ [Function (anonymous)] ],
  handleMessage: [Function: acceptDeliveryOrReturn],
  consumers: Map(3) {
    'amq.ctag-vfAQ1tgIBaEKVRFbrBc9Pg' => [Function (anonymous)],
    'amq.ctag-RuLdHphS5a1fB63fm2ZmYQ' => [Function (anonymous)],
    'amq.ctag-jJnH0ecXJtjG6Y-eP8MDNw' => [Function (anonymous)]
  },
  ch: 1,
  [Symbol(kCapture)]: false
}
alexeyinn commented 3 months ago

Solution:

The global auth guard was blocking it (global guards also run for RabbitMQ handlers, not only for HTTP requests).

I just added something like this to the guard:

async canActivate(context: ExecutionContext) {
    if ((context.getType() as unknown as string) === 'rmq') {
      return true;
    }
 ...

And now everything works.