amqp / rhea

A reactive messaging library based on the AMQP protocol
Apache License 2.0

Unable to send more than 10 messages to different queue at same time. #363

Open sunil-rnl opened 3 years ago

sunil-rnl commented 3 years ago

I'm using the code below to send messages to a queue. The function accepts an array of messages and the name of the queue they should be pushed to. The issue is that I get the error below when I call this method multiple times to send messages to different queues.

I'm calling this method more than 10 times to send different messages to different queues.

I'm very new to message queues, so I'm probably missing something fundamental here.

MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 accepted listeners added to [Container]. Use emitter.setMaxListeners() to increase limit

static sendToQueue(messages, queue) {
    logger.info('QueueProducer -> sendToQueue', { wrap: 'start' });
    let confirmed = 0;
    let sent = 0;
    this.connection = container.create_container().connect(options);
    this.connection.open_sender(queue);
    this.connection.container.on('sendable', (context) => {
      while (context.sender.sendable() && sent < messages.length) {
        logger.info(`sent ${queue}: ${sent} ===>> ${JSON.stringify(messages[sent])}`);
        context.sender.send({ message_id: sent, body: messages[sent] });
        sent += 1;
      }
    });
    container.on('accepted', (context) => {
      if (++confirmed === messages.length) {
        logger.info('all messages confirmed');
        context.connection.close();
      }
    });
    container.on('disconnected', (context) => {
      if (context.error) console.error('%s %j', context.error, context.error);
      sent = confirmed;
    });
    logger.info('QueueProducer -> sendToQueue', { wrap: 'end' });
  }

grs commented 3 years ago

Every time you are calling sendToQueue you are adding a listener to container (for both 'accepted' and 'disconnected' events). You probably meant to add those to this.connection.container, as you are creating a separate container for each call. That would at least avoid the error you are seeing.

However, my advice would be not to create a separate container every time you call that function, unless you have a specific need to do so.
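
To illustrate where the warning comes from, here is a minimal sketch that uses only Node's built-in EventEmitter rather than rhea itself. The emitter named shared stands in for the module-level container, and the helpers leakyCall and scopedCall are hypothetical names made up for this illustration. Node warns once more than 10 listeners (the default limit) are registered for the same event on one emitter, which matches the "11 accepted listeners" in the report.

```javascript
const { EventEmitter } = require('events');

// Stand-in for a long-lived shared object such as the module-level container.
const shared = new EventEmitter();

// Hypothetical helper: adds a handler to the shared emitter on every call,
// the way sendToQueue does with container.on('accepted', ...).
function leakyCall() {
  shared.on('accepted', () => {});
}

// Hypothetical helper: adds the handler to an object created for this call only.
function scopedCall() {
  const perCall = new EventEmitter();
  perCall.on('accepted', () => {});
  return perCall;
}

// Eleven calls put eleven listeners on the shared emitter, so Node emits
// MaxListenersExceededWarning for it.
for (let i = 0; i < 11; i += 1) leakyCall();

// Eleven scoped calls leave one listener on each of eleven separate emitters.
const scoped = [];
for (let i = 0; i < 11; i += 1) scoped.push(scopedCall());

console.log(shared.listenerCount('accepted'));    // 11
console.log(scoped[0].listenerCount('accepted')); // 1
```

Raising the limit with setMaxListeners() would only hide the warning; the listeners from earlier calls would still be attached and would still fire.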

sunil-rnl commented 3 years ago

@grs Without creating a separate container I'm not able to send messages to different queues. Using a single container and opening a sender with this.connection.open_sender(queue); is not working.

grs commented 3 years ago

What error do you get there?

sunil-rnl commented 3 years ago

@grs I've made these changes to use a single connection, but all messages are sent to only one queue. I'm not sure how to use a single connection for multiple queues. I thought this.connection.open_sender(queue); would do the job.

 await QueueProducer.sendToQueue(data1, 'queue_1');
 await QueueProducer.sendToQueue(data2, 'queue_2');
 await QueueProducer.sendToQueue(data3, 'queue_3');

When I call the sendToQueue method for different queues, the messages are not sent to their respective queues; they all go to a single queue. In this case, all messages were sent to queue_1.

class QueueProducer {
  constructor() {
    this.connection = container.connect(options);
  }

  async sendToQueue(messages, queue) {
    logger.info('QueueProducer -> sendToQueue', { wrap: 'start' });
    let confirmed = 0;
    let sent = 0;
    this.connection.open_sender(queue);
    this.connection.container.on('sendable', (context) => {
      while (context.sender.sendable() && sent < messages.length) {
        logger.info(`sent ${queue}: ${sent} ===>> ${JSON.stringify(messages[sent])}`);
        context.sender.send({ message_id: sent, body: messages[sent] });
        sent += 1;
      }
    });
    this.connection.container.on('accepted', (context) => {
      if (++confirmed === messages.length) {
        logger.info('all messages confirmed');
        context.connection.close();
      }
    });
    this.connection.container.on('disconnected', (context) => {
      if (context.error) console.error('%s %j', context.error, context.error);
      sent = confirmed;
    });
    logger.info('QueueProducer -> sendToQueue', { wrap: 'end' });
  }
}

grs commented 3 years ago

You are still setting listeners on the container (which is now shared between all connections). Instead of this.connection.container.on, just use this.connection.on. (I assume you are using a different QueueProducer object for each send, as the connection is a member of that object and you close it after each send.)
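
A likely reason everything ended up in queue_1: each call adds another 'sendable' handler to the shared container, so when the first sender becomes sendable, every registered handler runs with context.sender pointing at that same sender. If one connection should serve several queues, one option is to go a step further than the suggestion above and attach the handlers to the sender returned by open_sender. The sketch below assumes that rhea raises link events on the sender itself before passing unhandled ones up to the connection and container. The function shape, the connection parameter, and the Promise wrapper are choices made for this illustration, not something taken from the thread.

```javascript
// Sketch: send a batch to one queue over a shared connection, with the
// listeners attached to the sender rather than to the container.
// `connection` is assumed to be the object returned by container.connect(options).
function sendToQueue(connection, messages, queue) {
  // Nothing to send: do not open a link at all.
  if (messages.length === 0) return Promise.resolve(0);

  return new Promise((resolve, reject) => {
    let sent = 0;
    let confirmed = 0;
    const sender = connection.open_sender(queue);

    // Fires only for this sender, so messages cannot leak to another queue.
    sender.on('sendable', () => {
      while (sender.sendable() && sent < messages.length) {
        sender.send({ message_id: sent, body: messages[sent] });
        sent += 1;
      }
    });

    // Count confirmations for this sender only.
    sender.on('accepted', () => {
      confirmed += 1;
      if (confirmed === messages.length) {
        sender.close(); // close this link only; the connection stays open
        resolve(confirmed);
      }
    });

    sender.on('rejected', () => {
      reject(new Error(`message rejected by ${queue}`));
    });
  });
}

// Possible usage, inside an async function, with `options` as in the original post:
//   const connection = require('rhea').connect(options);
//   await sendToQueue(connection, data1, 'queue_1');
//   await sendToQueue(connection, data2, 'queue_2');
//   connection.close();
```

Because the handlers live on a per-call sender, repeated calls do not accumulate listeners on the container, and closing the connection becomes the caller's decision rather than a side effect of the first batch being confirmed.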