noxdafox / rabbitmq-message-deduplication

RabbitMQ Plugin for filtering message duplicates
Mozilla Public License 2.0

Queue level deduplication seems broken #96

aygalinc opened this issue 1 year ago (status: Open)

aygalinc commented 1 year ago

Hello,

On RabbitMQ 3.9.26, we use a straightforward setup to test message deduplication with pika:

import pika
from time import sleep

# Credentials are assumed to match the consumer example below (guest/guest).
credentials = pika.PlainCredentials('guest', 'guest')

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost',
    5672,
    '/',
    credentials,
    channel_max=3,
    client_properties={
        'connection_name': 'producer_dedup'
    },
    ))
channel = connection.channel()
args = {
    'x-message-deduplication': True
}
channel.queue_declare(queue='refresh_all', arguments=args)

args = {}
# channel.queue_declare(queue='hello6', arguments=args)
# channel.queue_bind(exchange='refresh_all', queue='refresh_all')
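# Every publish below carries the same x-deduplication-header value, so while
# one copy is pending in the queue the remaining publishes should be dropped
# as duplicates.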
for i in range(10000):
    channel.basic_publish(exchange='',
                        routing_key='refresh_all',
                        body='Hello World!',
                        properties=pika.BasicProperties(headers={
                            "x-deduplication-header": "hello"
                        }))
    print(f" [x] Sent 'Hello World {i} !'")
    sleep(3)
connection.close()

And we have a basic message consumer. We observe that the consumer receives only one message. We expect that, once the consumer has finished consuming a message, a new message with the same deduplication header can be enqueued again.
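To make the symptom easier to observe, one hypothetical check (not part of the original report) is to passively re-declare the queue after each publish, using the producer's existing channel, and read its depth; if deduplication is discarding the later messages, the count stays at one:

# Passive declare does not modify the queue; it only returns its current state.
result = channel.queue_declare(queue='refresh_all', passive=True)
print(f" [i] 'refresh_all' currently holds {result.method.message_count} message(s)")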

noxdafox commented 1 year ago

Hello,

this is not providing the full picture. How is the consumer reading the messages? Is the consumer, for example, acknowledging the message once consumed?

Please provide a working example which reproduces the issue.

aygalinc commented 1 year ago

The consumer side :

import pika, sys, os

def main():
    credentials = pika.PlainCredentials('guest', 'guest')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost',
        5672,
        '/',
        credentials,
        client_properties={
            'connection_name': 'sub_dedup',
        },
        ))    
    channel = connection.channel()

    args = {
        'x-message-deduplication': True
    }
    channel.queue_declare(queue='refresh_all', arguments=args)
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)

    channel.basic_consume(queue='refresh_all', on_message_callback=callback, auto_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

noxdafox commented 1 year ago

There is indeed something odd with the above code.

When I read the message using the Elixir/Erlang client, I get the expected behaviour. The same applies if I fetch the message via the RabbitMQ Management WebUI. But with the Python pika client, the message seems to be deduplicated forever even after it is gone from the queue.

I will investigate this further.

noxdafox commented 1 year ago

I now have a better understanding of the problem.

The issue occurs when a consumer is attached to an empty queue with auto_ack enabled. In that scenario, the internal queue discard callback is called instead of the expected publish_delivered [1].

The problem is that the discard callback does not receive the message itself as a parameter, only its ID. That is not enough to retrieve the x-deduplication-header and remove it from the cache.

Hence the observed behaviour: the first message goes through, and all subsequent ones are deemed duplicates because the original header is never removed from the cache.

This issue does not occur if auto_ack is disabled.

[1] https://github.com/rabbitmq/rabbitmq-server/blob/main/deps/rabbit/src/rabbit_amqqueue_process.erl#L684
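
Until the plugin can clean the cache in that code path, a workaround is to consume with manual acknowledgements. A minimal sketch (not from this thread) of the consumer above with auto_ack disabled and an explicit basic_ack:

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    # Acknowledging explicitly avoids the auto_ack path where the queue only
    # hands the plugin a message ID, so the deduplication header can be
    # cleared from the cache.
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='refresh_all', on_message_callback=callback, auto_ack=False)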

michaelklishin commented 1 year ago

The above internal API change contributed by @noxdafox will ship starting with RabbitMQ 3.12.0.

noxdafox commented 1 year ago

Thanks @michaelklishin, I will make a release of the plugin once RMQ 3.12.0 is out.

michaelklishin commented 1 year ago

https://github.com/rabbitmq/rabbitmq-server/pull/7802 was reverted because it is not rolling upgrade-safe without a feature flag.

michaelklishin commented 1 year ago

Using a feature flag on the hot path has risks. Perhaps the PR should be adjusted so that all modules support both message IDs and message records, and backported to 3.11.x.

Then, at some point, we can pass the entire message record along. For now, the plugin would have to require manual acknowledgements (which is almost always a good idea).