noxdafox / rabbitmq-message-deduplication

RabbitMQ Plugin for filtering message duplicates
Mozilla Public License 2.0

Can't make it work in C# project #33

Closed: fish68 closed this issue 5 years ago

fish68 commented 5 years ago

I'm a RabbitMQ newbie. For a new project I'd like to use the deduplication plugin, as it fits my needs perfectly. I'm using an ASP.NET Core 3.0 worker process written in C#.

I've tried a very simple example: 2 publishers, each sending 10 messages numbered 1 to 10, and one consumer that receives the messages and acknowledges them.

I'm getting quite strange and unpredictable results:

If I run the 3 workers (2 publishers and one consumer) inside the same process, the deduplication plugin seems to work fine and only 10 unique messages are inserted into the queue, but the consumer reads only the first 2 and acknowledges only one of them.

If I run the publishers and the consumer in two different processes, the consumer gets all 10 messages, but after the ack the messages remain in the queue, and if I run the consumer process again they get reprocessed.

My publisher code:

        int cnt = 1;
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            Dictionary<string, object> dd = new Dictionary<string, object>();
            dd["x-message-deduplication"] = true;
            channel.QueueDeclare(queue: qname,
                                 durable: true,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: dd);

            while (!stoppingToken.IsCancellationRequested)
            {
                var message = GetMessage(cnt);
                var body = Encoding.UTF8.GetBytes(message);
                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;
                Dictionary<string, object> d = new Dictionary<string, object>();
                d["x-deduplication-header"] = cnt;
                properties.Headers = d;

                channel.BasicPublish(exchange: "",
                                     routingKey: qname,
                                     basicProperties: properties,
                                     body: body);
                Console.WriteLine(" [x] Sent {0}", message);

                logDB(cnt, "Sender"+Wname);
                cnt++;
                if (cnt > 10)
                    break;
                await Task.Delay(1000, stoppingToken);
            }
        }

My consumer code:

        while (!stoppingToken.IsCancellationRequested)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                Dictionary<string, object> dd = new Dictionary<string, object>();
                dd["x-message-deduplication"] = true;
                channel.QueueDeclare(queue: qname,
                                     durable: true,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: dd);

                _logger.LogInformation("{0} Waiting for messages.", Cname);

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    _logger.LogInformation("{0} Received {1}", Cname, message);

                    string[] parts = message.Split('-');
                    int cntmsg = int.Parse(parts[1]);

                    logDB(cntmsg, Cname);

                    Thread.Sleep((cntmsg % 5) * 1000);

                    _logger.LogInformation("{0} Received {1} done", Cname, message);

                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: true);
                };
                channel.BasicConsume(queue: qname,
                                     autoAck: false,
                                     consumer: consumer);

                _logger.LogInformation("{0} After BasicConsume", Cname);

                while (true)
                    await Task.Delay(1000, stoppingToken);

            }
        }

I have also tried deduplication at the exchange level. I create the exchange this way:

                Dictionary<string, object> dd = new Dictionary<string, object>();
                dd["x-cache-size"] = 1000;
                dd["x-cache-ttl"] = 100000;
                dd["x-cache-persistence"] = "memory";
                channel.ExchangeDeclare("testex", "x-message-deduplication", false, true, dd);

                channel.QueueDeclare(queue: qname,
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);
                channel.QueueBind(qname, "testex", "tasks", null); // bind to the exchange declared above

But on ExchangeDeclare I get this exception:

RabbitMQ.Client.Exceptions.OperationInterruptedException: 'The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text="PRECONDITION_FAILED - Missing or invalid argument, 'x-cache-size' must be an integer greater than 0", classId=40, methodId=10, cause='

Could you please give me some hints on what I'm doing wrong?

Roberto

noxdafox commented 5 years ago

Greetings,

I am not familiar with C# and its RMQ client library.

If you try the above code without deduplication enabled, how does it behave? Any difference?

What version of RabbitMQ are you using?

The exchange error is because you did not specify the x-cache-size parameter when declaring the exchange. It is mandatory, as documented in the README.

fish68 commented 5 years ago

Hi, thank you for your answer. I'm using RabbitMQ version 3.7.9.

If I use it without deduplication it works: obviously the messages are sent twice to the consumer, but all of them are acknowledged.

About the exchange... I did actually specify that parameter:

                Dictionary<string, object> dd = new Dictionary<string, object>();
                dd["x-cache-size"] = 1000;   // <-- x-cache-size is set here
                dd["x-cache-ttl"] = 100000;
                dd["x-cache-persistence"] = "memory";
                channel.ExchangeDeclare("testex", "x-message-deduplication", false, true, dd);

Is there any way to enable debug logging in the deduplication plugin that could give me some hints about what happens? Thank you

noxdafox commented 5 years ago

As I am not familiar with C#, I need you to help me out a bit. Could you please bundle the above code into something I can easily execute from my workstation? I am using Linux.

Ideally something I can run with a single entry point (or a few) while checking on the broker what happens.

fish68 commented 5 years ago

Oh, thank you very much... I'm using .NET Core, so it should be easy to create Linux-compatible console applications. I will try to create them and upload them here.

Could you please tell me which Linux distro you are using, so I can target the right platform in Visual Studio?

noxdafox commented 5 years ago

Debian.

fish68 commented 5 years ago

Hi, I have created two executables for Linux. The syntax is:

./Publisher RabbitMQhost user password [nodedup | exchange]

Calling it with only host, user and password creates a queue named TestQ with the deduplication plugin enabled. Each time a key is pressed, 2 messages are posted to the queue (2 messages with id=1, 2 with id=2, and so on).

Calling it with the nodedup option creates a queue named TestQ2 without deduplication; every time a key is pressed, the same pair of messages is sent.

Calling it with the exchange option creates an exchange named TestX bound to queue TestQ2; every time a key is pressed, the same pair of messages is sent. I get an error during exchange creation complaining that x-cache-size is missing, even though I'm passing that parameter when creating it.

./Consumer RabbitMQhost user password [nodedup]

The parameters are as above:

nodedup will connect to TestQ2.

Let me recap what happens:

If I don't use deduplication, everything works: all published message pairs are immediately received and acknowledged by the Consumer.

If I use deduplication on the queue, each message is posted only once to the queue, but the Consumer doesn't receive them. If I stop the Consumer and start it again, it re-reads all the messages in the queue, but they are not acknowledged and stay in the queue.

If I use the exchange, I get an error on creation in the Publisher.

source.zip contains the C# source files. Thank you!

Consumer.zip Publisher.zip

source.zip

noxdafox commented 5 years ago

Thank you very much!

I was already able to reproduce the queue issue, and it shows that this issue and #34 are related (or duplicates). I can now investigate and fix it. I will then check the exchange issue.

fish68 commented 5 years ago

Wow! Thank you very much for providing all this support!

noxdafox commented 5 years ago

The issue is quite simple.

My assumption was that message headers were always serialized as strings, whereas you are using integers. This is visible, for example, in the crash log reported in issue #34, where @Civilis317 is using integers as well.

I will write some regression tests and make a fix in the coming days. Thanks for reporting and for helping reproduce this issue.
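Until a release with the fix is available, the workaround the reporter tries next is to send the deduplication header as a string instead of an integer. A minimal sketch, assuming the header dictionary is assigned to `IBasicProperties.Headers` exactly as in the publisher code above; `DedupHeaders` and `ForMessageId` are hypothetical names, not part of RabbitMQ.Client or the plugin:

```csharp
using System.Collections.Generic;
using System.Globalization;

// Hypothetical helper, not part of RabbitMQ.Client or the plugin.
// Builds message headers whose deduplication key is a string, a workaround
// for plugin versions that only handled string header values.
public static class DedupHeaders
{
    public const string HeaderName = "x-deduplication-header";

    public static Dictionary<string, object> ForMessageId(int id)
    {
        return new Dictionary<string, object>
        {
            // InvariantCulture keeps the key text identical on every machine ("1", "2", ...).
            [HeaderName] = id.ToString(CultureInfo.InvariantCulture)
        };
    }
}
```

In the publisher loop this would replace the three header lines with `properties.Headers = DedupHeaders.ForMessageId(cnt);`. Per the maintainer's later note, release 0.4.1 handles integer headers, so this is only needed on older plugin versions.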

fish68 commented 5 years ago

Oh... now I feel a bit stupid for not having thought of trying a string... I will try it while you work on the fix.

Thank you again for the very fast support.

noxdafox commented 5 years ago

Integers are valid headers. Therefore you should be able to use them. This is clearly a bug.

fish68 commented 5 years ago

Hi! I have tried with a string data type and it works, but I noticed that the Consumer occasionally receives some messages twice.

[Screenshot: dedup]

noxdafox commented 5 years ago

Release 0.4.1 now handles non-string message headers including integers and null values.

The exchange constraint check has been relaxed as well. It was failing because the C# library declares the value as signedint instead of long. Now all possible integer values are accepted.
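On plugin versions before this relaxation, one possible client-side workaround is to box the numeric exchange arguments as `long` instead of `int`. This assumes the .NET client encodes a boxed `long` as the AMQP long field type and a boxed `int` as signedint, which is what the explanation above implies; it is an inference, not something confirmed in this thread. `DedupExchangeArgs` and `Build` are hypothetical names:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of RabbitMQ.Client or the plugin.
// Builds the arguments for an "x-message-deduplication" exchange with the
// numeric values typed as long (64-bit) rather than int (32-bit).
public static class DedupExchangeArgs
{
    public static Dictionary<string, object> Build(long cacheSize, long cacheTtlMs, string persistence = "memory")
    {
        // The plugin rejects the declaration unless x-cache-size is an integer greater than 0.
        if (cacheSize <= 0)
            throw new ArgumentOutOfRangeException(nameof(cacheSize), "x-cache-size must be greater than 0");

        return new Dictionary<string, object>
        {
            ["x-cache-size"] = cacheSize,         // mandatory
            ["x-cache-ttl"] = cacheTtlMs,         // cache entry lifetime in milliseconds
            ["x-cache-persistence"] = persistence // "memory", as used in this thread
        };
    }
}
```

With the values from the thread, the declaration would become `channel.ExchangeDeclare("testex", "x-message-deduplication", false, true, DedupExchangeArgs.Build(1000, 100000));`. Writing `1000L` directly in the original dictionary should have the same effect.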

The duplicate messages you sometimes see in your test are expected. Queue-level deduplication drops a message if another one with the same deduplication header is already in the queue. Once a message is acknowledged, a new copy can make it through. Since your test acknowledges each message right after retrieving it from the queue, you may consume the first copy before the second one is published. In that case the messages are not treated as duplicates, because the queue is empty when the second one is published.
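The rule described above can be reproduced without a broker using a small in-memory model. This is a sketch of the rule as stated here, not the plugin's code: `DedupQueueModel` is a hypothetical class, and it assumes that a delivered but not yet acknowledged message still counts as being in the queue, which is what the advice about delaying the ack implies.

```csharp
using System.Collections.Generic;

// Hypothetical model of queue-level deduplication, not the plugin's implementation.
// A key is "present" while its message is ready or delivered-but-unacked.
public sealed class DedupQueueModel
{
    private readonly Queue<string> _ready = new Queue<string>();
    private readonly HashSet<string> _present = new HashSet<string>();

    // Returns true if the message is enqueued, false if it is dropped as a duplicate.
    public bool Publish(string dedupKey)
    {
        if (!_present.Add(dedupKey))
            return false; // a message with this key is still in the queue
        _ready.Enqueue(dedupKey);
        return true;
    }

    // Delivers the next ready message; its key stays present until it is acked.
    public string Deliver() => _ready.Dequeue();

    // Acknowledging removes the message, so a new copy with the same key can get in.
    public void Ack(string dedupKey) => _present.Remove(dedupKey);
}
```

With a fast consumer (publish "1", deliver, ack, publish "1" again) both `Publish` calls return true, which is the duplicate seen in the test. If the second copy is published while the first is still unacknowledged, `Publish` returns false and that copy is dropped.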

Try adding a small delay (0.2 seconds) between retrieving a message and acknowledging it, and you won't see this behaviour anymore.

Queue-level deduplication is trickier to understand; for most cases I recommend using exchange-level deduplication.
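By contrast, exchange-level deduplication filters on a cache of recently seen keys, so whether a copy gets through does not depend on when the consumer acknowledges. A rough sketch under stated assumptions: `DedupExchangeModel` is a hypothetical class, it models only the TTL (`x-cache-ttl`) and not the `x-cache-size` limit, and it assumes a dropped duplicate does not extend the lifetime of the cached key.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of exchange-level deduplication, not the plugin's implementation.
// Each key is remembered for `ttl` after it is first routed; the cache size limit is not modelled.
public sealed class DedupExchangeModel
{
    private readonly TimeSpan _ttl;
    private readonly Dictionary<string, DateTime> _expiresAt = new Dictionary<string, DateTime>();

    public DedupExchangeModel(TimeSpan ttl) => _ttl = ttl;

    // Returns true if the message would be routed, false if it is filtered as a duplicate.
    public bool Publish(string dedupKey, DateTime now)
    {
        if (_expiresAt.TryGetValue(dedupKey, out DateTime expiry) && now < expiry)
            return false; // key still cached: drop, regardless of consumer acks
        _expiresAt[dedupKey] = now + _ttl;
        return true;
    }
}
```

With the `x-cache-ttl` of 100000 ms used earlier in the thread, a second copy of message "1" published within 100 seconds of the first would be dropped even if the first copy had already been consumed and acknowledged.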

fish68 commented 5 years ago

Thank you, I will download and try the new version. I will also try exchange deduplication, as you advised.