noxdafox / rabbitmq-message-deduplication

RabbitMQ Plugin for filtering message duplicates
Mozilla Public License 2.0

Question: What happens upon duplication detection? #67

Closed: VagyokC4 closed this issue 3 years ago

VagyokC4 commented 3 years ago

Does it return an error, or does it return as if it was accepted?

noxdafox commented 3 years ago

By default, RabbitMQ does not notify the sender whether the message was routed/published.

You can enable publisher confirmation to know whether the message was published to the target queue.
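
For illustration, here is a minimal pika sketch of enabling publisher confirms (the queue name is hypothetical); once confirms are on, basic_publish raises if the broker cannot confirm the message:

```python
import pika

# Minimal sketch: enabling publisher confirms on a channel (pika).
connection = pika.BlockingConnection(
    pika.URLParameters('amqp://guest:guest@localhost:5672/'))
channel = connection.channel()
channel.confirm_delivery()  # from now on, basic_publish waits for the broker's ack/nack

channel.queue_declare(queue='example_queue', durable=True)  # hypothetical queue

try:
    channel.basic_publish(exchange='', routing_key='example_queue', body='payload')
    print('Message publish was confirmed')
except pika.exceptions.NackError:
    print('Message could not be confirmed')

connection.close()
```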

VagyokC4 commented 3 years ago

> By default, RabbitMQ does not notify the sender whether the message was routed/published.
>
> You can enable publisher confirmation to know whether the message was published to the target queue.

How would I tell it to accept only the first request, and report all other requests back as accepted?

I am trying to solve a race condition where I have multiple services reporting the same thing. I want the first one to be accepted and run as normal; the others should also report as accepted (as if they were the first request) so that the pipeline runs as normal. In this case we have triple redundancy, yet I only want to process the first report.
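
For illustration, here is the intent sketched with direct pika usage (the exchange name and event id are hypothetical): all three redundant services stamp the same x-deduplication-header, so only the first copy should be routed:

```python
import pika

# Sketch: three redundant services publish the same event with an identical
# x-deduplication-header, so the dedup exchange should route only the first copy.
event_id = 'order-12345'  # hypothetical identifier shared by all three services

connection = pika.BlockingConnection(
    pika.URLParameters('amqp://guest:guest@localhost:5672/'))
channel = connection.channel()

channel.basic_publish(
    exchange='events_dedup',  # assumed to be an x-message-deduplication exchange
    routing_key='',
    body='event payload',
    properties=pika.BasicProperties(
        headers={'x-deduplication-header': event_id}))

connection.close()
```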

BTW: I'm using Mass-Transit to interface with RabbitMQ.

noxdafox commented 3 years ago

As long as you don't enable publisher confirmation, RabbitMQ will not provide any information regarding what happened to the message. So what you are looking for is already the default behaviour.

I am not familiar with Mass-Transit.

VagyokC4 commented 3 years ago

> As long as you don't enable publisher confirmation, RabbitMQ will not provide any information regarding what happened to the message. So what you are looking for is already the default behaviour.
>
> I am not familiar with Mass-Transit.

I think that is my issue. Mass-Transit, I believe, turns on publisher confirmation because it wants to ensure the message was published correctly. When I add the deduplication headers to the message, I start receiving failures on the bus.

However my observations are a little strange: it looks as if the deduplication is letting some duplicates through when requests come in at virtually the same time. As I said earlier, I have three services (for redundancy) publishing messages as things happen in real time. Sometimes I see two messages show up and the third throws an error; it all depends on when each service publishes its message. But it does look like if two services publish at virtually the same time, the queue will get multiple messages with the same deduplication header. (Maybe this logic needs to be tightened up.)

That being said, it would be nice to be able to put an attribute on the queue specifically telling the deduplicator to return success even when it detects a duplicate (making this configuration independent of the way the queue is set up for publisher confirmation), similar to the way Azure Service Bus deduplication works.

Thoughts?

noxdafox commented 3 years ago

> However my observations are a little strange: it looks as if the deduplication is letting some duplicates through when requests come in at virtually the same time. As I said earlier, I have three services (for redundancy) publishing messages as things happen in real time. Sometimes I see two messages show up and the third throws an error; it all depends on when each service publishes its message. But it does look like if two services publish at virtually the same time, the queue will get multiple messages with the same deduplication header. (Maybe this logic needs to be tightened up.)

What is your set-up? Are you using exchange-level or queue-level de-duplication? How many RabbitMQ nodes?

Have you actually verified that you end up with duplicates on the consumer side?

> That being said, it would be nice to be able to put an attribute on the queue specifically telling the deduplicator to return success even when it detects a duplicate (making this configuration independent of the way the queue is set up for publisher confirmation), similar to the way Azure Service Bus deduplication works.

This would break the internal contracts within the broker. It is not an issue of the plugin but rather an issue of MassTransit, which enforces specific RabbitMQ features.

I would rather suggest checking whether there's a way to disable the publisher confirmation configuration: https://github.com/MassTransit/MassTransit/blob/15733c236d8f5b79977d8a787a3d2b23ff495b8a/src/Transports/MassTransit.RabbitMqTransport/RabbitMqHostSettings.cs#L119

VagyokC4 commented 3 years ago

> What is your set-up? Are you using exchange-level or queue-level de-duplication? How many RabbitMQ nodes?

In production we have a three-node RabbitMQ cluster running. I was using both exchange- and queue-level deduplication. Mass-Transit (almost) always sends via exchanges, which route to the queues.

> Have you actually verified that you end up with duplicates on the consumer side?

Yes, during testing I can watch the messages leave the service and arrive at the consumer, and multiple requests are being processed by the consumer. This is where I noticed exceptions being thrown, and multiple requests hitting the consumer.

> I would rather suggest checking whether there's a way to disable the publisher confirmation configuration:

I'm leery of this option, as I depend on the publisher confirmation. If for some reason we fail to publish correctly, I need to be able to replay the request, so I only mark the request as complete once I publish without error. Turning this off would introduce the possibility of the pipeline breaking down, since we could mark a request as complete even though it was not published successfully.

Ideally the plug-in would (silently) deduplicate, nothing more, nothing less, by emulating the behavior of Azure Service Bus deduplication and leaving errors for when there is an actual error. I.e. a message comes in with a deduplication header; if that header is still cached (within its TTL), drop the message and return success. Otherwise publish as normal, and if for some reason the message was not published, throw an error.

So we are just testing with this plugin for now. Currently I am using Redis for my deduplicator and it's working just fine. I wanted to use this plug-in and do it at the RabbitMQ layer (if possible). However, we have not seen positive results (yet) with our workflow.
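
For reference, the Redis approach can be sketched as follows (hypothetical key naming; redis-py): the first writer claims the key within the TTL window, and every later reporter treats the event as already accepted, which matches the semantics described above:

```python
import redis

r = redis.Redis()

def try_claim(event_id: str, ttl_seconds: int = 300) -> bool:
    # SET key value NX EX ttl: only the first caller within the TTL window succeeds.
    return bool(r.set(f'dedup:{event_id}', 1, nx=True, ex=ttl_seconds))

if try_claim('order-12345'):  # hypothetical event id
    ...  # first reporter: publish and process as normal
else:
    ...  # duplicate: report success without re-processing
```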

noxdafox commented 3 years ago

I'll try to clarify things a bit here. As I said, I'm not familiar with MassTransit and I don't know how it interacts with RabbitMQ.

One thing to keep in mind is that the plugin does not "silently" de-duplicate messages. At the exchange level, it tells the broker that no route should be followed for a duplicate message. At the queue level, it notifies the broker that the message is a duplicate. It's the broker which decides what to reply to the client.

That said, RabbitMQ has two features which I suspect are at play in your scenario.

Publisher confirms

From the documentation:

> For unroutable messages, the broker will issue a confirm once the exchange verifies a message won't route to any queue (returns an empty list of queues). If the message is also published as mandatory, the basic.return is sent to the client before basic.ack. The same is true for negative acknowledgements (basic.nack).

What this means in practice is that if a message appears to be non-routable but has been handled correctly, an acknowledgement will be sent back to the client. The "routability" of a message is handled at the exchange level, which means that exchange-level de-duplication works with publisher confirms. Here's an example (in Python) proving it.

```python
import pika
from hashlib import md5

RABBITMQ_URL = 'amqp://guest:guest@localhost:5672/'

parameters = pika.URLParameters(RABBITMQ_URL)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

# Enable publisher confirm
channel.confirm_delivery()

# Declare exchange, queue and bind them together
channel.exchange_declare(
    exchange='test_exchange_deduplication',
    exchange_type='x-message-deduplication',
    arguments={'x-cache-size': '5'})
channel.queue_declare(queue='test_queue', durable=True)
channel.queue_bind('test_queue', 'test_exchange_deduplication')

# The message will be deduplicated based on its content
# The MD5 digest of the content is used as deduplication header
message_body = 'hello world'
message_deduplication_header = md5(message_body.encode()).hexdigest()

# Only one message shall be routed to the queue
for _ in range(10):
    try:
        channel.basic_publish(
            exchange='test_exchange_deduplication',
            routing_key='',
            body=message_body,
            properties=pika.BasicProperties(
                headers={'x-deduplication-header': message_deduplication_header}))
        print('Message publish was confirmed')
    except pika.exceptions.UnroutableError:
        print('Message could not be confirmed')

channel.close()
connection.close()
```

```
$ python publish_confirm.py
Message publish was confirmed
Message publish was confirmed
Message publish was confirmed
...
```

> For routable messages, the basic.ack is sent when a message has been accepted by all the queues. For persistent messages routed to durable queues, this means persisting to disk. For mirrored queues, this means that all mirrors have accepted the message.

When a message is instead routable but is not published to one or more of the target queues, a negative acknowledgement is sent to the client. This means that queue-level de-duplication will negatively acknowledge all duplicated messages. It is not an issue of the plugin; it's a semantic issue of the broker.

```python
import pika
from hashlib import md5

RABBITMQ_URL = 'amqp://guest:guest@localhost:5672/'

parameters = pika.URLParameters(RABBITMQ_URL)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

# Enable publisher confirm
channel.confirm_delivery()

# Declare exchange, queue and bind them together
channel.exchange_declare(exchange='test_exchange',
                         exchange_type='fanout')
channel.queue_declare(queue='test_queue_deduplication',
                      durable=True,
                      arguments={'x-message-deduplication': True})
channel.queue_bind('test_queue_deduplication', 'test_exchange')

# The message will be deduplicated based on its content
# The MD5 digest of the content is used as deduplication header
message_body = 'hello world'
message_deduplication_header = md5(message_body.encode()).hexdigest()

# Only one message shall be routed to the queue
for _ in range(10):
    try:
        channel.basic_publish(
            exchange='test_exchange',
            routing_key='',
            body=message_body,
            properties=pika.BasicProperties(
                headers={'x-deduplication-header': message_deduplication_header}))
        print('Message publish was confirmed')
    except pika.exceptions.NackError:
        print('Message could not be confirmed')

channel.close()
connection.close()
```

```
$ python publish_confirm.py
Message publish was confirmed    # First message goes through
Message could not be confirmed   # All other messages are duplicates
Message could not be confirmed
...
```

Mandatory messages

Mandatory messages require the broker to report whether a message was routed to the bound queues. From the documentation:

> In some circumstances it can be important for producers to ensure that their messages are being routed to queues (although not always - in the case of a pub-sub system producers will just publish and if no consumers are interested it is correct for messages to be dropped).

In other words, requesting a message to be mandatory over a de-duplication exchange and expecting it to be positively acknowledged is a contradiction in terms. This might be what happened when you tested exchange-level de-duplication: you are both requesting publisher confirmation AND setting the messages as mandatory. In that case the issue is a service misconfiguration.

```python
import pika
from hashlib import md5

RABBITMQ_URL = 'amqp://guest:guest@localhost:5672/'

parameters = pika.URLParameters(RABBITMQ_URL)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

# Enable publisher confirm
channel.confirm_delivery()

# Declare exchange, queue and bind them together
channel.exchange_declare(
    exchange='test_exchange_deduplication',
    exchange_type='x-message-deduplication',
    arguments={'x-cache-size': '5'})
channel.queue_declare(queue='test_queue', durable=True)
channel.queue_bind('test_queue', 'test_exchange_deduplication')

# The message will be deduplicated based on its content
# The MD5 digest of the content is used as deduplication header
message_body = 'hello world'
message_deduplication_header = md5(message_body.encode()).hexdigest()

# Only one message shall be routed to the queue
for _ in range(10):
    try:
        channel.basic_publish(
            exchange='test_exchange_deduplication',
            routing_key='',
            body=message_body,
            mandatory=True,                                           # Setting the message as mandatory
            properties=pika.BasicProperties(
                headers={'x-deduplication-header': message_deduplication_header}))
        print('Message publish was confirmed')
    except pika.exceptions.UnroutableError:
        print('Message could not be confirmed')

channel.close()
connection.close()
```

```
$ python publish_confirm.py
Message publish was confirmed    # First message goes through
Message could not be confirmed   # All other messages are duplicates
Message could not be confirmed
...
```

tl;dr: Publisher confirms work with exchange-level de-duplication but not with queue-level de-duplication. It's a semantic issue of the broker, not an issue of the plugin; the plugin itself has nothing to do with this.

Enabling both publisher confirms AND mandatory messages over a de-duplication exchange will result in duplicated messages being returned as unroutable (and thus not confirmed), as requested.

noxdafox commented 3 years ago

Closing this issue. Please re-open it if further discussion is required.