amqp / rhea-promise

A promisified layer over rhea AMQP client
Apache License 2.0

Strange 40ms delay when receiving a message in AWS MQ #12

Open jfromaniello opened 5 years ago

jfromaniello commented 5 years ago

Describe the bug

Strange behavior in AWS MQ (ActiveMQ). Messages get delayed by 40ms.

To be honest I am not sure at this point if it is an issue with the library or with AWS MQ.

To Reproduce

Steps to reproduce the behavior:

worker.js

#!/usr/bin/env node
const { Connection } = require('rhea-promise');

(async function() {
  const connection = new Connection({
    host: process.env.AMQP_HOST,
    port: process.env.AMQP_PORT,
    transport: process.env.AMQP_TRANSPORT,
    username: process.env.AMQP_USERNAME,
    password: process.env.AMQP_PASSWORD
  });

  await connection.open();

  const receiver = await connection.createReceiver({
    source: {
      address: 'request-queue'
    }
  });

  const sender = await connection.createSender({
    target: {}
  });

  receiver.on("message", (context) => {
    const { reply_to, correlation_id } = context.message;
    const body = JSON.parse(context.message.body);
    console.log(`sent ${Date.now() - body.sentDate}ms ago`);
    setTimeout(() => {
      sender.send({
        body: '',
        correlation_id,
        to: reply_to,
        durable: false,
      });
    }, 100);
  });
})();

client.js

const { Connection } = require('rhea-promise');

(async function() {
  const connection = new Connection({
    host: process.env.AMQP_HOST,
    port: process.env.AMQP_PORT,
    transport: process.env.AMQP_TRANSPORT,
    username: process.env.AMQP_USERNAME,
    password: process.env.AMQP_PASSWORD
  });

  await connection.open();

  const receiver = await connection.createReceiver({
    source: {
      dynamic: true
    }
  });

  const sender = await connection.createSender({
    target: {
      address: 'request-queue'
    }
  });

  receiver.on("message", (context) => {
    const { correlation_id } = context.message;
    console.log(`got the response for ${correlation_id}`);
  });

  let correlation_id = 0;
  const send = () => {
    return sender.send({
      durable: false,
      correlation_id: correlation_id++,
      reply_to: receiver.address,
      body: JSON.stringify({ sentDate: Date.now() })
    });
  };

  await Promise.all([
    send(),
    send(),
    send(),
  ]);

  await Promise.all([
    send(),
    send(),
    send(),
  ]);

  await Promise.all([
    send(),
    send(),
    send(),
  ]);

})();

Expected behavior

I expect the worker to report 1ms to 3ms at most. Instead I get this:

sent 2ms ago
sent 3ms ago
sent 39ms ago
sent 40ms ago
sent 40ms ago
sent 40ms ago
sent 39ms ago
sent 39ms ago
sent 40ms ago

Package-version: 0.1.6
node.js version: v8.11.4
OS name and version: Ubuntu 14.04

Additional context

These are my observations so far:

Sometimes it works well but running it again it shows the 40ms delay.

I can't reproduce this by installing ActiveMQ locally, but the configuration and version of the broker matches.

The test was run in the same region and availability zone as the broker, so network latency should not be a factor.

Handling credits manually (for example, starting with 2 credits and adding a credit inside the setTimeout) produces the same pattern.

If I use two connections (in two containers) on the worker side, instead of using the same connection for the receiver and the sender, it works as expected. I am not sure why this happens. The messages are small and there is no latency, so I think it should work with only one connection.

If I remove the sender part from the worker, I don't get the delay either.
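
For reference, the manual credit handling described in the observations above could look roughly like the sketch below. It is not the exact code used in the test. It assumes the receiver was created with `credit_window: 0` so that no credit is issued automatically, and that it exposes `addCredit(n)`. The helper name `handleWithManualCredit` and its options are made up for illustration.

```javascript
// Sketch only: manual flow control for the worker's receiver.
// Assumption: `receiver` was created with `credit_window: 0` and exposes
// `addCredit(n)`; `sender` is the anonymous sender from worker.js.
function handleWithManualCredit(receiver, sender, { initialCredit = 2, delayMs = 100 } = {}) {
  // Allow the broker to deliver the first few messages.
  receiver.addCredit(initialCredit);

  receiver.on('message', (context) => {
    const { reply_to, correlation_id } = context.message;
    setTimeout(() => {
      sender.send({ body: '', correlation_id, to: reply_to, durable: false });
      // Hand one credit back once the reply has been queued for sending.
      receiver.addCredit(1);
    }, delayMs);
  });
}
```

As noted above, this variant showed the same delay pattern, so it is useful mainly for ruling out automatic credit replenishment as the cause.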

amarzavery commented 5 years ago

The sender needs credit before it can send messages. sender.sendable() returns a boolean indicating whether it can send messages at that moment. Logging that value might give more insight into the reason behind the delay.

@grs - What are your thoughts on this issue?
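
A minimal sketch of the logging suggested in this comment, for illustration only. `sendWithDiagnostics` is a hypothetical helper, not part of rhea-promise. It relies only on `sender.sendable()` and `sender.send()`, and takes the logger as a parameter so the output can go anywhere.

```javascript
// Sketch only: log whether the sender reports itself sendable before each send,
// and how long the send() call itself takes to return.
function sendWithDiagnostics(sender, message, log = console.log) {
  const sendable = sender.sendable();
  const startedAt = Date.now();
  log(`sendable=${sendable} correlation_id=${message.correlation_id}`);

  const delivery = sender.send(message);
  log(`send() returned after ${Date.now() - startedAt}ms`);

  return { sendable, delivery };
}
```

In worker.js this would replace the direct `sender.send({...})` call inside the setTimeout callback.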

jfromaniello commented 5 years ago

Yes, I printed sender.sendable() and it is always true, even when I get the delay. It looks more like a problem with the worker's connection. The strange bit is that if I use two connections instead of one, it works as expected.
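
The two-connection setup mentioned here could be sketched as follows. This is an assumption about how the workaround might be wired up in a single process. The original test used two containers, and the thread does not confirm that two connections in one process behave the same way. `openWorkerLinks` is a hypothetical helper, and the `Connection` class is passed in as a parameter rather than imported.

```javascript
// Sketch only: give the worker's receiver and sender separate connections.
// `Connection` is expected to be the class exported by rhea-promise and
// `options` the same connection options used in worker.js.
async function openWorkerLinks(Connection, options) {
  const receiveConnection = new Connection(options);
  const sendConnection = new Connection(options);
  await Promise.all([receiveConnection.open(), sendConnection.open()]);

  // Requests arrive on one connection...
  const receiver = await receiveConnection.createReceiver({
    source: { address: 'request-queue' }
  });
  // ...and replies leave on the other, through an anonymous sender.
  const sender = await sendConnection.createSender({ target: {} });

  return { receiver, sender, receiveConnection, sendConnection };
}
```

It would be called as `openWorkerLinks(require('rhea-promise').Connection, { host, port, transport, username, password })`, with the message handler from worker.js attached to the returned `receiver`.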