nestjsx / nestjs-amqp

Amqp connections for nestjs :love_letter:
MIT License
190 stars 24 forks source link

Hangs on channel creation #35

Open jackabe opened 5 years ago

jackabe commented 5 years ago

Module setup connects fine with no errors:

import { AmqpModule } from "nestjs-amqp";
import RabbitService from "./rabbit.service";
import { ConfigModule, ConfigService } from "nestjs-config";
import { Module } from "@nestjs/common";
@Module({
    imports: [
      ConfigModule,
      AmqpModule.forRootAsync({
        useFactory: async (configService: ConfigService) => {
          return configService.get("rabbit");
        },
        inject: [ConfigService]
      }),
    ],
    providers: [
      RabbitService,
    ],
    exports: [RabbitService]
  })
export class RabbitModule {}

Service does not console log anything inside of 'createChannel'.

import {Injectable, Logger} from "@nestjs/common";
import {InjectAmqpConnection} from "nestjs-amqp";

@Injectable()
export default class RabbitService {
    constructor(@InjectAmqpConnection() private readonly amqp) {}

    private readonly logger = new Logger(RabbitService.name);

    async publish(message: string)  {
      await this.amqp.createChannel((err, channel) => {
          if (err != null) {
          this.logger.log(err, "Error connecting");
          }
          console.log("Adding");
          channel.assertQueue("location_validate");
          channel.sendToQueue("location_validate", "Hello test");
        });
    }
}

Logged out the connection:

Connection {
  _events: [Object: null prototype] {
    frameError: [Function: bound ],
    error: [Function: bound ],
    close: [Function: bound ],
    blocked: [Function: bound ],
    unblocked: [Function: bound ]
  },
  _eventsCount: 5,
  _maxListeners: undefined,
  stream: Socket {
    connecting: false,
    _hadError: false,
    _parent: null,
    _host: null,
    _readableState: ReadableState {
      objectMode: false,
      highWaterMark: 16384,
      buffer: BufferList { head: null, tail: null, length: 0 },
      length: 0,
      pipes: null,
      pipesCount: 0,
      flowing: false,
      ended: false,
      endEmitted: false,
      reading: true,
      sync: false,
      needReadable: true,
      emittedReadable: false,
      readableListening: true,
      resumeScheduled: false,
      paused: true,
      emitClose: false,
      autoDestroy: false,
      destroyed: false,
      defaultEncoding: 'utf8',
      awaitDrain: 0,
      readingMore: false,
      decoder: null,
      encoding: null
    },
    readable: true,
    _events: [Object: null prototype] {
      end: [Array],
      error: [Array],
      drain: [Function],
      readable: [Function: go]
    },
    _eventsCount: 4,
    _maxListeners: undefined,
    _writableState: WritableState {
      objectMode: false,
      highWaterMark: 16384,
      finalCalled: false,
      needDrain: false,
      ending: false,
      ended: false,
      finished: false,
      destroyed: false,
      decodeStrings: false,
      defaultEncoding: 'utf8',
      length: 0,
      writing: false,
      corked: 0,
      sync: false,
      bufferProcessing: false,
      onwrite: [Function: bound onwrite],
      writecb: null,
      writelen: 0,
      bufferedRequest: null,
      lastBufferedRequest: null,
      pendingcb: 0,
      prefinished: false,
      errorEmitted: false,
      emitClose: false,
      autoDestroy: false,
      bufferedRequestCount: 0,
      corkedRequestsFree: [Object]
    },
    writable: true,
    allowHalfOpen: false,
    _sockname: null,
    _pendingData: null,
    _pendingEncoding: '',
    server: null,
    _server: null,
    [Symbol(asyncId)]: 12,
    [Symbol(kHandle)]: TCP {
      reading: true,
      onconnection: null,
      [Symbol(owner)]: [Circular]
    },
    [Symbol(lastWriteQueueSize)]: 0,
    [Symbol(timeout)]: null,
    [Symbol(kBytesRead)]: 0,
    [Symbol(kBytesWritten)]: 0
  },
  muxer: Mux {
    newStreams: [],
    oldStreams: [],
    blocked: false,
    scheduledRead: false,
    out: Socket {
      connecting: false,
      _hadError: false,
      _parent: null,
      _host: null,
      _readableState: [ReadableState],
      readable: true,
      _events: [Object: null prototype],
      _eventsCount: 4,
      _maxListeners: undefined,
      _writableState: [WritableState],
      writable: true,
      allowHalfOpen: false,
      _sockname: null,
      _pendingData: null,
      _pendingEncoding: '',
      server: null,
      _server: null,
      [Symbol(asyncId)]: 12,
      [Symbol(kHandle)]: [TCP],
      [Symbol(lastWriteQueueSize)]: 0,
      [Symbol(timeout)]: null,
      [Symbol(kBytesRead)]: 0,
      [Symbol(kBytesWritten)]: 0
    }
  },
  rest: <Buffer >,
  frameMax: 4096,
  sentSinceLastCheck: true,
  recvSinceLastCheck: true,
  expectSocketClose: false,
  freeChannels: BitSet { words: [], wordsInUse: 0 },
  channels: [ { channel: [Object], buffer: [Socket] } ],
  serverProperties: {
    capabilities: {
      publisher_confirms: true,
      exchange_exchange_bindings: true,
      'basic.nack': true,
      consumer_cancel_notify: true,
      'connection.blocked': true,
      consumer_priorities: true,
      authentication_failure_close: true,
      per_consumer_qos: true,
      direct_reply_to: true
    },
    cluster_name: 'rabbit@ip-178-31-27-6.us-west-2.compute.internal',
    copyright: 'Copyright (C) 2007-2019 Pivotal Software, Inc.',
    information: 'Licensed under the MPL.  See https://www.rabbitmq.com/',
    platform: 'Erlang/OTP 22.0.2',
    product: 'RabbitMQ',
    version: '3.7.15'
  },
  channelMax: 2047,
  heartbeat: 60,
  heartbeater: Heart {
    _events: [Object: null prototype] { timeout: [Function], beat: [Function] },
    _eventsCount: 2,
    _maxListeners: undefined,
    interval: 60,
    sendTimer: Timeout {
      _idleTimeout: 30000,
      _idlePrev: [TimersList],
      _idleNext: [TimersList],
      _idleStart: 3073,
      _onTimeout: [Function: bound ],
      _timerArgs: undefined,
      _repeat: 30000,
      _destroyed: false,
      [Symbol(refed)]: true,
      [Symbol(asyncId)]: 880,
      [Symbol(triggerId)]: 877
    },
    recvTimer: Timeout {
      _idleTimeout: 60000,
      _idlePrev: [TimersList],
      _idleNext: [TimersList],
      _idleStart: 3073,
      _onTimeout: [Function: bound ],
      _timerArgs: undefined,
      _repeat: 60000,
      _destroyed: false,
      [Symbol(refed)]: true,
      [Symbol(asyncId)]: 881,
      [Symbol(triggerId)]: 877
    }
  },
  accept: [Function: mainAccept]
}

Logged out the channel:

Channel {
  _events: [Object: null prototype] {
    ack: [Function: bound ],
    nack: [Function: bound ],
    delivery: [Function: bound ],
    cancel: [Function: bound ]
  },
  _eventsCount: 4,
  _maxListeners: undefined,
  connection: Connection {
    _events: [Object: null prototype] {
      frameError: [Function: bound ],
      error: [Function: bound ],
      close: [Function: bound ],
      blocked: [Function: bound ],
      unblocked: [Function: bound ]
    },
    _eventsCount: 5,
    _maxListeners: undefined,
    stream: Socket {
      connecting: false,
      _hadError: false,
      _parent: null,
      _host: null,
      _readableState: [ReadableState],
      readable: true,
      _events: [Object: null prototype],
      _eventsCount: 4,
      _maxListeners: undefined,
      _writableState: [WritableState],
      writable: true,
      allowHalfOpen: false,
      _sockname: null,
      _pendingData: null,
      _pendingEncoding: '',
      server: null,
      _server: null,
      [Symbol(asyncId)]: 12,
      [Symbol(kHandle)]: [TCP],
      [Symbol(lastWriteQueueSize)]: 0,
      [Symbol(timeout)]: null,
      [Symbol(kBytesRead)]: 0,
      [Symbol(kBytesWritten)]: 0
    },
    muxer: Mux {
      newStreams: [],
      oldStreams: [],
      blocked: false,
      scheduledRead: false,
      out: [Socket]
    },
    rest: <Buffer >,
    frameMax: 4096,
    sentSinceLastCheck: true,
    recvSinceLastCheck: true,
    expectSocketClose: false,
    freeChannels: BitSet { words: [Array], wordsInUse: 1 },
    channels: [ [Object], [Object] ],
    serverProperties: {
      capabilities: [Object],
      cluster_name: 'rabbit@ip-178-31-27-6.us-west-2.compute.internal'
      copyright: 'Copyright (C) 2007-2019 Pivotal Software, Inc.',
      information: 'Licensed under the MPL.  See https://www.rabbitmq.com/',
      platform: 'Erlang/OTP 22.0.2',
      product: 'RabbitMQ',
      version: '3.7.15'
    },
    channelMax: 2047,
    heartbeat: 60,
    heartbeater: Heart {
      _events: [Object: null prototype],
      _eventsCount: 2,
      _maxListeners: undefined,
      interval: 60,
      sendTimer: Timeout {
        _idleTimeout: 30000,
        _idlePrev: [TimersList],
        _idleNext: [TimersList],
        _idleStart: 2916,
        _onTimeout: [Function: bound ],
        _timerArgs: undefined,
        _repeat: 30000,
        _destroyed: false,
        [Symbol(refed)]: true,
        [Symbol(asyncId)]: 813,
        [Symbol(triggerId)]: 810
      },
      recvTimer: Timeout {
        _idleTimeout: 60000,
        _idlePrev: [TimersList],
        _idleNext: [TimersList],
        _idleStart: 2916,
        _onTimeout: [Function: bound ],
        _timerArgs: undefined,
        _repeat: 60000,
        _destroyed: false,
        [Symbol(refed)]: true,
        [Symbol(asyncId)]: 814,
        [Symbol(triggerId)]: 810
      }
    },
    accept: [Function: mainAccept]
  },
  reply: null,
  pending: [],
  lwm: 1,
  unconfirmed: [],
  handleMessage: [Function: acceptDeliveryOrReturn],
  consumers: {},
  ch: 1
}
dennis-b commented 5 years ago

hi @jackabe

this works for me :

async publish({ message }) {
        console.log(message)
        try {
             const channel = await this.amqp.createChannel()
             await channel.assertQueue(QUEUE_DEV);
            channel.sendToQueue(QUEUE_DEV, Buffer.from(message));
        } catch (e) {
            console.log(e)
        }
    }