amqp / rhea

A reactive messaging library based on the AMQP protocol
Apache License 2.0

Receiving Duplicate Message Notifications #414

Closed kuldeep-kushwaha closed 3 months ago

kuldeep-kushwaha commented 3 months ago

I am utilizing the rhea npm library to construct a receiver service integrated with Solace as the broker, using the AMQP 1.0 protocol. The purpose of the receiver is to capture all messages posted in a specific queue.

The code works and successfully retrieves all messages sent to the queue. However, if the receiver is disconnected while listening for messages and reconnects afterward, it occasionally retrieves duplicate events. Please refer to the attachment (AMQP) for more details.

On the left side, there is a sender setup responsible for dispatching messages to a queue. On the right side, there is a receiver configured to monitor the messages within the queue. Upon sending a message from the sender, the receiver duly receives it. However, after reconnection, the receiver occasionally re-receives the same message.

Despite the receiver acknowledging each message using context.delivery.accept(context), duplicate notifications persist. I seek advice on how to mitigate the occurrence of duplicate message notifications.

grs commented 3 months ago

Acknowledgements are asynchronous, so when your first receiver exits, the most recent acknowledgements may not have been processed by the server. You can verify that in your test using tracing on the broker side, if they offer it. (If the broker does see the acknowledgement and still redelivers the message, that would be a broker issue. I think it unlikely in this case, though.) The guarantee for acknowledged messages is at-least-once.

To ensure exactly-once processing, you need to maintain some state on the processing side regarding what has been processed. If a disconnected receiver is reattached and you receive some messages with a non-zero delivery_count, you can check against that state whether they have been processed before. If you have a single sender and therefore a continually incrementing sequence, that is easy: all you need to do is record the highest sequence processed. Some brokers may be able to add a sequence number to the messages; you could ask Solace what they would recommend.
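The single-sender case described above could be sketched roughly as follows. This is only an illustration: the application property name `seq` and the helper `createSequenceFilter` are hypothetical, and it assumes the sender stamps every message with a strictly increasing number.

```javascript
// Minimal sketch of "record the highest sequence processed".
// Assumption: the single sender sets application_properties.seq to a
// strictly increasing number. The property name "seq" is hypothetical.
function createSequenceFilter(initialHighest = -1) {
  let highest = initialHighest;

  // Read the (hypothetical) sequence number from a message object.
  const seqOf = (message) => {
    const props = message.application_properties || {};
    return props.seq;
  };

  return {
    // True if the message has not been processed before.
    // Messages without a numeric sequence cannot be checked, so they pass.
    isNew(message) {
      const seq = seqOf(message);
      if (typeof seq !== 'number') return true;
      return seq > highest;
    },
    // Call after the message has been fully processed.
    markProcessed(message) {
      const seq = seqOf(message);
      if (typeof seq === 'number' && seq > highest) highest = seq;
    },
    // Current high-water mark, e.g. for persisting across restarts.
    highest: () => highest,
  };
}
```

In a `message` handler you would call `isNew(context.message)` before processing, `markProcessed(context.message)` afterwards, and accept the delivery in either case. For this to survive a restart of the receiver process, the high-water mark would have to be stored somewhere durable rather than in memory.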

AMQP also allows you to separate the accepting of a message from its settlement. This gives a three-way acknowledgement that can be used to help maintain more complex state on the processing side. You can set the rcv_settle_mode to 1 when opening the receiver, and then handle the settled event, which will be called when the broker has received the accept. At that point you can free up any state associated with that delivery. We don't have an example of this at present, unfortunately. You would also need to ensure that your broker supports that rcv_settle_mode.
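As a rough, untested sketch of that idea (not an official rhea example): the function below expects a rhea connection, or any object with the same `open_receiver()` shape. The helper name `openTwoPhaseReceiver`, the `pending` map, and the use of `context.delivery.id` as a key are assumptions made for illustration.

```javascript
// Sketch: accept first, release per-delivery state only once settled.
// `connection` is assumed to be a rhea connection; `handleMessage` is a
// hypothetical application callback.
function openTwoPhaseReceiver(connection, address, handleMessage) {
  // State held for deliveries that are accepted but not yet settled.
  // Assumption: each delivery exposes an `id` that can be used as a key.
  const pending = new Map();

  const receiver = connection.open_receiver({
    source: address,
    autoaccept: false,   // accept explicitly, after processing
    rcv_settle_mode: 1,  // the mode suggested above; broker must support it
  });

  receiver.on('message', (context) => {
    handleMessage(context.message);
    pending.set(context.delivery.id, context.message);
    context.delivery.accept();
  });

  // Called once the broker has seen the accept and settled the delivery;
  // the state for that delivery is no longer in doubt.
  receiver.on('settled', (context) => {
    pending.delete(context.delivery.id);
  });

  return { receiver, pending };
}
```

Anything still in `pending` when the connection drops is in doubt: the accept may or may not have reached the broker, so those are the messages to check against if they are redelivered.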

Depending on your application, it may also be possible to do some application-specific check for the small number of in-doubt messages that are retransmitted when the receiver is re-attached. For example, if you were inserting into a database, you could check, for messages where the delivery_count is non-zero, that the row does not already exist.
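A minimal sketch of such a check, with an in-memory `Map` standing in for the database table. The helper name `createIdempotentInserter` is hypothetical, and it assumes the sender sets a unique `message_id` on each message that can serve as the row key.

```javascript
// Sketch: only pay for the "does the row exist?" lookup on redeliveries.
// `store` stands in for a database table (anything with has()/set()).
// Assumption: every message carries a unique message_id set by the sender.
function createIdempotentInserter(store) {
  return function insert(message) {
    const key = message.message_id;
    const redelivered = (message.delivery_count || 0) > 0;

    // A non-zero delivery_count means the message may have been processed
    // before the disconnect, so check before inserting again.
    if (redelivered && store.has(key)) {
      return false; // duplicate: already inserted
    }

    store.set(key, message.body);
    return true; // inserted
  };
}
```

The function returns `true` when it inserted a row and `false` when it skipped a duplicate; in both cases the delivery can then be accepted. With a real database, a unique constraint on the key column would give the same protection even if the existence check and the insert race with each other.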