amqp / rhea

A reactive messaging library based on the AMQP protocol
Apache License 2.0
279 stars 79 forks source link

Receiver receives first message from AWS MQ and duplicate first message for subsequent messages sent to the queue. #212

Open rmakantDangi123 opened 5 years ago

rmakantDangi123 commented 5 years ago

Hi Team, @grs I am using AWS MQ queue to send and receive messages. The code work as expected for a small number of messages, however, if messages pushed in large numbers concurrently in the queue. The received messages are duplicates of the first message when received from the queue. This issue happened when messages push concurrently to the queue. The code for sending and receiving the messages is below:

  1. Code for creating receiver connection and open connection, this code executes once. const conn_attentionSpan = client.connect({ container_id:recieverconn_as, channel_max: 10, transport: 'tls', host: CONFIG.activemq_host, hostname: CONFIG.activemq_host, username: CONFIG.activemq_broker_username, password: CONFIG.activemq_broker_password, port: CONFIG.activemq_amqp_port, reconnect: true, reconnect_limit: 100
    });

conn_attentionSpan.open_receiver({ source: { address: CONFIG.attention_span_queuename, durable:2, expiry_policy:'never' } });

  1. Code for creating sender connection, open sender, send and receive messages: The below code called for every request comes to the rest service exposed: module.exports.pushDataToQueue = function(contentBatch,queueName) { let vidyardEventBody = contentBatch; return new Promise((resolve, reject) => { var count=0; const connection = client.connect({ container_id:senderconnName+count, channel_max: 100, transport: 'tls', host: CONFIG.activemq_host, hostname: CONFIG.activemq_host, username: CONFIG.activemq_broker_username, password: CONFIG.activemq_broker_password, port: CONFIG.activemq_amqp_port, reconnect: true, reconnect_limit: 100
    }); console.debug('sender connection id : '+connection.options.container_id); var sender = connection.open_sender(${queueName}); sender.once('sendable', function (context) { context.sender.send({message_id:count, body:vidyardEventBody}) count++; connection.close(); }); conn_attentionSpan.once('message', (context) => { if (context.message.body === 'detach') { // detaching leaves the subscription active, so messages sent // while detached are kept until we attach again context.receiver.detach(); context.connection.close(); } else if (context.message.body === 'close') { // closing cancels the subscription context.receiver.close(); context.connection.close(); } else { console.debug("message received : ", context.message.body); }
    });
    }); }; Can you please help me regarding this issue. I am a beginner to "rhea", and for node too. Your help will be highly appreciated.
grs commented 5 years ago

I don't really understand what you are trying to do. Setting the message event handler does not trigger receiving, it just lets you handle the received message.Your count variable looks to be local to the sender. Is this some sort ofrequest-response? i.e. the messages receive is in some way triggered by the message sent and you want to wait for it before considering the handling of the data to be completed?

rmakantDangi123 commented 5 years ago

Thank you so much @grs for your quick response, This is api which sends some json data to the aws mq queue and sends a response back once data received from the queue. If I got it correctly, once data put into the queue by the sender, the receiver should get the data from the queue as this is always listening to the queue. And Every message should be received as it was pushed by the sender. What you think about my receiver code, do i need to improve it to pull events from the queue? As per my understanding, message receive should not be trigger by the message sent. Once data pushed to the queue by the sender, it should trigger automatically as it is a subscriber. if needed please correct me. what you suggest on above?

rmakantDangi123 commented 5 years ago

Hi @grs , I have followed the given subscriber example for the receiver code. can you please share any receiver example for retrieving messages from awsmq queues?

Your help is much appreciated.

grs commented 5 years ago

I think you need to correlate the incoming messages with the requests made. You can look at the client.js/server.js for an example of that.

rmakantDangi123 commented 5 years ago

Hi @grs, My code works fine if messages sent sequentially. I mean the receiver de-queue messages perfectly without duplicating. So my question is, why this is duplicating for concurrent messages, not for sequential?

grs commented 5 years ago

I don't believe it is duplicating messages. I think you are not correlating responses with requests. When running sequentially this won't matter. With concurrent requests it will.