Closed HeJiaWang-96 closed 3 years ago
As we have explained previously, a single message cannot be delivered to more than one consumer at the same time.
A single message published to an exchange can, however, be routed to more than one queue. Each queue then gets a complete copy. These copies are separate messages as far as RabbitMQ is concerned. They can be consumed in parallel on any number of channels.
RabbitMQ will never produce a duplicate of a message routed to a queue. A publisher can produce duplicates, however, e.g. by naively keeping track of what it has published earlier.
This is true for both regular (long-lived) and polling consumers, any acknowledgement mode and so on. While not directly related, we highly recommend against polling consumers which your example code uses. They are extremely inefficient and their metrics are much harder to reason about in practice.
Whereas with regular consumers you can easily observe redeliveries (there's a separate metric for that), which is a very important metric in this context, it's not really the case with continuous polling using basic.get
. We have previously recommended taking a traffic capture which will be the ultimate source of information as to what this RabbitMQ receives and delivers, but before you do any of that, consider changing your consumer app code to NOT USE POLLING. There are NO legitimate reasons to use polling in pretty much any newly developed system. It will be significantly easier to reason about what is going on in your system after this change.
With this approach, any message redelivery — which can happen due to automatic requeueing in case of consumer channel closure or any other reason — will immediately log a duplicate in red. Other than the publisher or multiple publisher instances producing duplicates (something that a traffic capture can prove right or wrong), my best guess is that something in your test triggers such a requeueing event. Again, it will be easy to see with a regular consumer and a traffic capture: the delivery will have its redevelired
property set to true
. With polling consumers it is significantly more difficult to keep track of (although a traffic capture would still allow you to inspect every protocol frame sent).
Redeliveries is a sad fact of life in distributed systems. Designing your consumers in a way that can take them into account is therefore a very good idea. This is not as trivial as it sounds when there may be multiple consumers (in one OS process, or multiple process instances, or any combination of the two).
So there are three immediate recommendations:
RabbitMQ does not deliver a single message in a single queue to more than one consumer, with any queue type, delivery acknowledgement mode and so on. It will redeliver a single message more than once in case it had to be requeued for any reason.
I can reproduce the red output with this consumer implementation almost immediately:
[13:12:46.371][queue_info - consume3]:62b87532-000c-4a1f-82df-9bf48f4780c5
[13:12:46.375][queue_info - consume1]:51f0b32c-001f-462b-b24c-455e427f12c4
[13:12:46.383][queue_info - consume3]:3c935efc-1d1d-4c63-84e1-340eb545b741
[13:12:46.383][queue_info - consume1]:3c935efc-1d1d-4c63-84e1-340eb545b741
[13:12:46.383][queue_info - consume3]:8446945c-e617-4f12-b833-ae15822879d2
[13:12:46.384][queue_info - consume3]:42830e1c-0a07-4f02-8bd0-b0e8703ebb0d
[13:12:46.384][queue_info - consume3]:eefc3ee8-04dc-4dc2-a1f8-8fdfe55c34bf
[13:12:46.385][queue_info - consume3]:9aa7d1ab-07d2-4f50-a9d1-d1fac3c2dac7
[13:12:46.385][queue_info - consume3]:806c6ba9-80e0-40a7-b9a4-aedd5f9a3c83
[13:12:46.385][queue_info - consume3]:7b884fa3-69eb-49f2-8e49-92d26ec1ce07
[13:12:46.391][queue_info - consume:1]:Duplicate message:3c935efc-1d1d-4c63-84e1-340eb545b741,DeliveryTag:2,err:An item with the same key has already been added. Key: 3c935efc-1d1d-4c63-84e1-340eb545b741
however, according to a traffic capture using Wireshark (and filtered for amqp
), there is only one basic.get-ok
(a basic.get
polling operation response) with the message:
0000 02 00 00 00 45 00 00 9b 00 00 40 00 40 06 00 00 ....E.....@.@...
0010 7f 00 00 01 7f 00 00 01 16 28 c6 74 61 4b 1b 3c .........(.taK.<
0020 15 49 45 a6 80 18 18 de fe 8f 00 00 01 01 08 0a .IE.............
0030 9e b5 db c7 9e b5 db c7 01 00 03 00 00 00 1d 00 ................
0040 3c 00 47 00 00 00 00 00 00 00 02 00 00 0a 71 75 <.G...........qu
0050 65 75 65 5f 69 6e 66 6f 00 00 03 e4 ce 02 00 03 eue_info........
0060 00 00 00 0e 00 3c 00 00 00 00 00 00 00 00 00 24 .....<.........$
0070 00 00 ce 03 00 03 00 00 00 24 33 63 39 33 35 65 .........$3c935e
0080 66 63 2d 31 64 31 64 2d 34 63 36 33 2d 38 34 65 fc-1d1d-4c63-84e
0090 31 2d 33 34 30 65 62 35 34 35 62 37 34 31 ce 1-340eb545b741.
To make this easier to filter in Wireshark, I changed the publisher to use a fanout exchange with a unique routing key:
namespace ProduceNews
{
class Program
{
static void Main(string[] args)
{
//RabbitMQ Version 3.8.9
//RabbitMQ.Client Version 6.2.1
ConnectionFactory connectionFactory = new ConnectionFactory()
{
UserName = "guest",
Password = "guest",
HostName = "127.0.0.1",
Port = 5672
};
Dictionary<string, object> keyValues = new Dictionary<string, object>();
using (IConnection connection = connectionFactory.CreateConnection())
{
using (IModel model = connection.CreateModel())
{
model.ExchangeDeclare(exchange: "polling.fanout", type: "fanout", durable: true, autoDelete: false);
//Post messages to a fixed queue
model.QueueDeclare("queue_info", false, false, false, null);
model.QueueBind(queue: "queue_info", exchange: "polling.fanout", routingKey: "irrelevant");
while (true)
{
for (int i = 0; i < 1000; i++)
{
string uid = Guid.NewGuid().ToString();
try
{
keyValues.Add(uid, null);
}
catch (Exception)
{
Console.WriteLine($"Duplicate message:{uid}");
throw;
}
Console.WriteLine($"Send:{uid}");
model.BasicPublish("polling.fanout", uid, false, null, Encoding.UTF8.GetBytes(uid));
}
Console.WriteLine($"complete!");
Console.ReadKey();
}
}
}
}
}
}
The output of the consumer on the 2nd run:
[13:33:15.847][queue_info - consume1]:ffecfe5a-d9ee-4b8a-88ab-e0f4e428c407
[13:33:15.847][queue_info - consume1]:52991a39-b237-45e7-89d7-552d2f7eb21f
[13:33:15.848][queue_info - consume1]:6b0c352a-9344-4799-8aa4-391985759336
[13:33:15.848][queue_info - consume1]:32474734-6d85-4d23-a5b8-bd124134fc23
[13:33:15.848][queue_info - consume1]:5e1b335b-f609-4cc7-a03b-d5deea3e8512
[13:33:15.854][queue_info - consume:4]:Duplicate message:a36ec343-61c9-400e-94da-73b4d3d67138,DeliveryTag:4,err:An item with the same key has already been added. Key: a36ec343-61c9-400e-94da-73b4d3d67138
There is still only one basic.publish
frame with this routing key and basic.get-ok
response with this routing key, as demonstrated by Wireshark's Find a Packet form. Here's the polling consumer response:
0000 02 00 00 00 45 00 00 c3 00 00 40 00 40 06 00 00 ....E.....@.@...
0010 7f 00 00 01 7f 00 00 01 16 28 c8 d8 76 81 5f 55 .........(..v._U
0020 83 d2 3f da 80 18 18 22 fe b7 00 00 01 01 08 0a ..?...."........
0030 9e c8 48 65 9e c8 48 65 01 00 01 00 00 00 45 00 ..He..He......E.
0040 3c 00 47 00 00 00 00 00 00 01 c3 00 0e 70 6f 6c <.G..........pol
0050 6c 69 6e 67 2e 66 61 6e 6f 75 74 24 61 33 36 65 ling.fanout$a36e
0060 63 33 34 33 2d 36 31 63 39 2d 34 30 30 65 2d 39 c343-61c9-400e-9
0070 34 64 61 2d 37 33 62 34 64 33 64 36 37 31 33 38 4da-73b4d3d67138
0080 00 00 02 21 ce 02 00 01 00 00 00 0e 00 3c 00 00 ...!.........<..
0090 00 00 00 00 00 00 00 24 00 00 ce 03 00 01 00 00 .......$........
00a0 00 24 61 33 36 65 63 33 34 33 2d 36 31 63 39 2d .$a36ec343-61c9-
00b0 34 30 30 65 2d 39 34 64 61 2d 37 33 62 34 64 33 400e-94da-73b4d3
00c0 64 36 37 31 33 38 ce d67138.
The value appears twice because it is both used in the payload and in the routing key.
So something is up with this consumer implementation. RabbitMQ delivers messages as expected.
Thank you for your help. I need to use the polling consumption pattern I described. The above problems lie in RabbitMQ.Client It doesn't appear in version 5.2.0.
RabbitMQ.Client In version 6.2.1, I changed it again, using two threads to consume the same queue, and the automatic confirmation mode was enabled. The situation I described also appeared.
for (int i = 1; i <= 2; i++) { Task task = Task.Run(() => { using (IConnection connection = connectionFactory.CreateConnection()) { using (IModel model = connection.CreateModel()) { while (true) { BasicGetResult getResult = model.BasicGet("queue_info", true); if (getResult == null) { Thread.Sleep(1000); break; } string uid = Encoding.UTF8.GetString(getResult.Body.ToArray()); Console.WriteLine(uid); try { keyValues.Add(uid, null); } catch (Exception ex) { Console.ForegroundColor = ConsoleColor.Red; Console.WriteLine($"Duplicate message:{uid},DeliveryTag:{getResult.DeliveryTag},err:{ex.Message}"); } } } } }); }
@HeJiaWang-96 I don't have much else to suggest. I have clear evidence from my tests that there is only one copy of each message that is returned to the polling consumer. Please take it from here: use Wireshark to observe the traffic to confirm my findings and then troubleshoot your consumer app.
I strongly recommend against using polling. It is a truly terrible approach and we are considering removing polling support from RabbitMQ clients and RabbitMQ itself at some point. It is never the right thing to do.
RabbitMQ .NET client 6.x has a fair number of public and internal changes, including changes around consumer operation dispatch. They may be highly relevant for this workload. There were more changes in master. In any case, unfortunately we do not optimize for such concurrent polling consumer workloads. All efforts go into optimizing regular consumers, and all benchmarks and data used to drive the changes is also specific to regular consumers.
Thank you very much for your help. There is no problem with the regular consumption pattern. I tried to remove polling patterns from my business.
I tested again, but the situation described by me will still appear. I wrote a test script. If multiple channels get duplicate messages from the same queue, the font will turn red, and the log will be recorded. You can execute the test script. The test script is attached. RabbitMQ: 3.8.9 RabbitMQ.Client: 6.2.1 TestRabbitmq.zip