Open FantasticSaltedFish opened 11 months ago
Is this an unmodified hello world example? But then the subscriber is really expected to receive only a single sample, unless you're including all discovery data as well.
(Nearly) all the data involved would be reliable, so the typical exchange (in the absence of packet loss) for a low-rate writer sending a single message would be:
wr->rd: DATA + HB
rd->wr: ACK
if the 1st packet is lost (and nothing else is), you end up with:
wr->rd: DATA + HB (lost)
wr->rd: HB
rd->wr: NACK
wr->rd: DATA + HB
rd->wr: ACK
and if it is the second:
wr->rd: DATA + HB
rd->wr: ACK (lost)
wr->rd: HB
rd->wr: ACK
Of course, there's a not-insignificant likelihood that one of those additional packets gets lost as well, and it quickly escalates from there. If this happens while the writer is sending a stream of data, it is also possible that messages get combined, so it is not easy to predict exactly what will happen.
While it clearly is within the expectation that (very nearly) identical packets will be sent at high packet loss rates, it isn't obvious to me why it would end up being exactly 20% and/or common to receive duplicate packets all the time.
Perhaps you can share a Wireshark capture?
I have modified the HelloWorld programs; the following is the publisher:
#include "dds/dds.h"
#include "HelloWorldData.h"
#include <stdio.h>
#include <stdlib.h>
#include <inttypes.h>  /* for PRId32 */

int main (int argc, char **argv)
{
  dds_entity_t participant;
  dds_entity_t topic;
  dds_entity_t writer;
  dds_return_t rc;
  HelloWorldData_Msg msg;
  dds_qos_t *qos;
  uint32_t status = 0;
  (void)argc;
  (void)argv;

  qos = dds_create_qos();
  //dds_qset_reliability(qos, DDS_RELIABILITY_RELIABLE, DDS_SECS(10));
  dds_qset_history(qos, DDS_HISTORY_KEEP_LAST, 5);
  //dds_qset_durability(qos, DDS_DURABILITY_TRANSIENT_LOCAL);
  //dds_qset_liveliness(qos, DDS_LIVELINESS_AUTOMATIC, DDS_SECS(3));
  //dds_qset_durability_service(qos, 0, DDS_HISTORY_KEEP_LAST, 5, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED);

  /* Create a Participant. */
  participant = dds_create_participant (DDS_DOMAIN_DEFAULT, qos, NULL);
  if (participant < 0)
    DDS_FATAL("dds_create_participant: %s\n", dds_strretcode(-participant));

  /* Create a Topic. */
  topic = dds_create_topic (participant, &HelloWorldData_Msg_desc, "HelloWorldData_Msg", qos, NULL);
  if (topic < 0)
    DDS_FATAL("dds_create_topic: %s\n", dds_strretcode(-topic));

  /* Create a Writer. */
  writer = dds_create_writer (participant, topic, qos, NULL);
  if (writer < 0)
    DDS_FATAL("dds_create_writer: %s\n", dds_strretcode(-writer));
  dds_delete_qos(qos);

  printf("=== [Publisher] Waiting for a reader to be discovered ...\n");
  fflush (stdout);
  rc = dds_set_status_mask(writer, DDS_PUBLICATION_MATCHED_STATUS);
  if (rc != DDS_RETCODE_OK)
    DDS_FATAL("dds_set_status_mask: %s\n", dds_strretcode(-rc));
  while (!(status & DDS_PUBLICATION_MATCHED_STATUS))
  {
    rc = dds_get_status_changes (writer, &status);
    if (rc != DDS_RETCODE_OK)
      DDS_FATAL("dds_get_status_changes: %s\n", dds_strretcode(-rc));
    /* Polling sleep. */
    dds_sleepfor (DDS_MSECS (20));
  }

  /* Write 100 messages, one per second. */
  for (int i = 0; i < 100; i++)
  {
    msg.userID = i;
    msg.message = "A";
    //printf("=== [Publisher] Writing : ");
    printf("PublishMessage (%"PRId32", %s)\n", msg.userID, msg.message);
    fflush(stdout);
    rc = dds_write(writer, &msg);
    if (rc != DDS_RETCODE_OK)
      DDS_FATAL("dds_write: %s\n", dds_strretcode(-rc));
    dds_sleepfor(DDS_SECS(1));
  }

  /* Deleting the participant will delete all its children recursively as well. */
  rc = dds_delete (participant);
  if (rc != DDS_RETCODE_OK)
    DDS_FATAL("dds_delete: %s\n", dds_strretcode(-rc));
  return EXIT_SUCCESS;
}
and here is the subscriber:
#include "dds/dds.h"
#include "HelloWorldData.h"
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <inttypes.h>  /* for PRId32 */

/* An array of one message (aka sample in dds terms) will be used. */
#define MAX_SAMPLES 1

int main (int argc, char **argv)
{
  dds_entity_t participant;
  dds_entity_t topic;
  dds_entity_t reader;
  HelloWorldData_Msg *msg;
  void *samples[MAX_SAMPLES];
  dds_sample_info_t infos[MAX_SAMPLES];
  dds_return_t rc;
  dds_qos_t *qos;
  (void)argc;
  (void)argv;

  qos = dds_create_qos();
  //dds_qset_reliability(qos, DDS_RELIABILITY_RELIABLE, DDS_SECS(10));
  dds_qset_history(qos, DDS_HISTORY_KEEP_LAST, 100);
  //dds_qset_durability(qos, DDS_DURABILITY_TRANSIENT_LOCAL);
  //dds_qset_liveliness(qos, DDS_LIVELINESS_AUTOMATIC, DDS_SECS(3));
  //dds_qset_durability_service(qos, 0, DDS_HISTORY_KEEP_LAST, 5, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED);

  /* Create a Participant. */
  participant = dds_create_participant (DDS_DOMAIN_DEFAULT, qos, NULL);
  if (participant < 0)
    DDS_FATAL("dds_create_participant: %s\n", dds_strretcode(-participant));

  /* Create a Topic. */
  topic = dds_create_topic (participant, &HelloWorldData_Msg_desc, "HelloWorldData_Msg", qos, NULL);
  if (topic < 0)
    DDS_FATAL("dds_create_topic: %s\n", dds_strretcode(-topic));

  /* Create a Reader. */
  reader = dds_create_reader (participant, topic, qos, NULL);
  if (reader < 0)
    DDS_FATAL("dds_create_reader: %s\n", dds_strretcode(-reader));
  dds_delete_qos(qos);

  printf ("\n=== [Subscriber] Waiting for a sample ...\n");
  fflush (stdout);

  /* Initialize sample buffer, by pointing the void pointer within
   * the buffer array to a valid sample memory location. */
  samples[0] = HelloWorldData_Msg__alloc ();

  /* Poll for data.  Note: with the "break" below commented out,
   * this loop never exits. */
  int iNum = 0;
  while (true)
  {
    /* Do the actual take; the return value is the number of samples. */
    //rc = dds_read (reader, samples, infos, MAX_SAMPLES, MAX_SAMPLES);
    rc = dds_take (reader, samples, infos, MAX_SAMPLES, MAX_SAMPLES);
    if (rc < 0)
      DDS_FATAL("dds_take: %s\n", dds_strretcode(-rc));
    /* Check if we took some data and it is valid. */
    if ((rc > 0) && (infos[0].valid_data))
    {
      /* Print Message. */
      msg = (HelloWorldData_Msg *) samples[0];
      iNum++;
      //printf ("=== [Subscriber] Received : ");
      printf ("SubscribeMessage (%"PRId32", %s, %"PRId32")\n", msg->userID, msg->message, iNum);
      fflush (stdout);
      //break;
    }
    else
    {
      /* Polling sleep. */
      dds_sleepfor (DDS_MSECS (1));
    }
  }

  /* Free the data location. */
  HelloWorldData_Msg_free (samples[0], DDS_FREE_ALL);

  /* Deleting the participant will delete all its children recursively as well. */
  rc = dds_delete (participant);
  if (rc != DDS_RETCODE_OK)
    DDS_FATAL("dds_delete: %s\n", dds_strretcode(-rc));
  return EXIT_SUCCESS;
}
1、tc netem setting on the network interface: qdisc netem 8001: root refcnt 2 limit 1000 loss 10%;
2、publisher log: === [Publisher] Waiting for a reader to be discovered ... PublishMessage (0, A) PublishMessage (1, A) PublishMessage (2, A) PublishMessage (3, A) PublishMessage (4, A) PublishMessage (5, A) PublishMessage (6, A) PublishMessage (7, A) PublishMessage (8, A) PublishMessage (9, A) PublishMessage (10, A) PublishMessage (11, A) PublishMessage (12, A) PublishMessage (13, A) PublishMessage (14, A) PublishMessage (15, A) PublishMessage (16, A) PublishMessage (17, A) PublishMessage (18, A) PublishMessage (19, A) PublishMessage (20, A) PublishMessage (21, A) PublishMessage (22, A) PublishMessage (23, A) PublishMessage (24, A) PublishMessage (25, A) PublishMessage (26, A) PublishMessage (27, A) PublishMessage (28, A) PublishMessage (29, A) PublishMessage (30, A) PublishMessage (31, A) PublishMessage (32, A) PublishMessage (33, A) PublishMessage (34, A) PublishMessage (35, A) PublishMessage (36, A) PublishMessage (37, A) PublishMessage (38, A) PublishMessage (39, A) PublishMessage (40, A) PublishMessage (41, A) PublishMessage (42, A) PublishMessage (43, A) PublishMessage (44, A) PublishMessage (45, A) PublishMessage (46, A) PublishMessage (47, A) PublishMessage (48, A) PublishMessage (49, A) PublishMessage (50, A) PublishMessage (51, A) PublishMessage (52, A) PublishMessage (53, A) PublishMessage (54, A) PublishMessage (55, A) PublishMessage (56, A) PublishMessage (57, A) PublishMessage (58, A) PublishMessage (59, A) PublishMessage (60, A) PublishMessage (61, A) PublishMessage (62, A) PublishMessage (63, A) PublishMessage (64, A) PublishMessage (65, A) PublishMessage (66, A) PublishMessage (67, A) PublishMessage (68, A) PublishMessage (69, A) PublishMessage (70, A) PublishMessage (71, A) PublishMessage (72, A) PublishMessage (73, A) PublishMessage (74, A) PublishMessage (75, A) PublishMessage (76, A) PublishMessage (77, A) PublishMessage (78, A) PublishMessage (79, A) PublishMessage (80, A) PublishMessage (81, A) PublishMessage (82, A) PublishMessage (83, A) 
PublishMessage (84, A) PublishMessage (85, A) PublishMessage (86, A) PublishMessage (87, A) PublishMessage (88, A) PublishMessage (89, A) PublishMessage (90, A) PublishMessage (91, A) PublishMessage (92, A) PublishMessage (93, A) PublishMessage (94, A) PublishMessage (95, A) PublishMessage (96, A) PublishMessage (97, A) PublishMessage (98, A) PublishMessage (99, A)
3、subscriber log: === [Subscriber] Waiting for a sample ... SubscribeMessage (0, A, 1) SubscribeMessage (1, A, 2) SubscribeMessage (2, A, 3) SubscribeMessage (3, A, 4) SubscribeMessage (4, A, 5) SubscribeMessage (5, A, 6) SubscribeMessage (6, A, 7) SubscribeMessage (14, A, 8) SubscribeMessage (15, A, 9) SubscribeMessage (16, A, 10) SubscribeMessage (17, A, 11) SubscribeMessage (18, A, 12) SubscribeMessage (21, A, 13) SubscribeMessage (22, A, 14) SubscribeMessage (23, A, 15) SubscribeMessage (26, A, 16) SubscribeMessage (27, A, 17) SubscribeMessage (30, A, 18) SubscribeMessage (31, A, 19) SubscribeMessage (32, A, 20) SubscribeMessage (33, A, 21) SubscribeMessage (34, A, 22) SubscribeMessage (35, A, 23) SubscribeMessage (36, A, 24) SubscribeMessage (37, A, 25) SubscribeMessage (38, A, 26) SubscribeMessage (39, A, 27) SubscribeMessage (40, A, 28) SubscribeMessage (41, A, 29) SubscribeMessage (42, A, 30) SubscribeMessage (43, A, 31) SubscribeMessage (46, A, 32) SubscribeMessage (47, A, 33) SubscribeMessage (48, A, 34) SubscribeMessage (49, A, 35) SubscribeMessage (50, A, 36) SubscribeMessage (51, A, 37) SubscribeMessage (52, A, 38) SubscribeMessage (53, A, 39) SubscribeMessage (54, A, 40) SubscribeMessage (55, A, 41) SubscribeMessage (56, A, 42) SubscribeMessage (57, A, 43) SubscribeMessage (58, A, 44) SubscribeMessage (59, A, 45) SubscribeMessage (60, A, 46) SubscribeMessage (61, A, 47) SubscribeMessage (62, A, 48) SubscribeMessage (63, A, 49) SubscribeMessage (64, A, 50) SubscribeMessage (67, A, 51) SubscribeMessage (70, A, 52) SubscribeMessage (71, A, 53) SubscribeMessage (72, A, 54) SubscribeMessage (73, A, 55) SubscribeMessage (74, A, 56) SubscribeMessage (75, A, 57) SubscribeMessage (76, A, 58) SubscribeMessage (79, A, 59) SubscribeMessage (82, A, 60) SubscribeMessage (83, A, 61) SubscribeMessage (84, A, 62) SubscribeMessage (85, A, 63) SubscribeMessage (86, A, 64) SubscribeMessage (87, A, 65) SubscribeMessage (88, A, 66) SubscribeMessage (89, A, 67) 
SubscribeMessage (92, A, 68) SubscribeMessage (93, A, 69) SubscribeMessage (94, A, 70) SubscribeMessage (95, A, 71) SubscribeMessage (96, A, 72) SubscribeMessage (97, A, 73) SubscribeMessage (98, A, 74) SubscribeMessage (99, A, 75)
It can be seen that the loss rate of the received data is about twice the rate configured on the network interface with tc.
With the publisher and subscriber programs provided above, Wireshark can be used to capture and analyze the sent and received data.
The following is the tc command:
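The netem qdisc quoted in the numbered list above (`qdisc netem 8001: root refcnt 2 limit 1000 loss 10%`) can be reproduced with commands along these lines; `eth0` is a placeholder for the actual interface, not taken from the report:

```shell
# add 10% independent random loss on eth0 (placeholder interface name)
sudo tc qdisc add dev eth0 root netem loss 10%

# verify; output should resemble:
#   qdisc netem 8001: root refcnt 2 limit 1000 loss 10%
tc qdisc show dev eth0

# remove the qdisc again when done
sudo tc qdisc del dev eth0 root netem
```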
Here is a captured image of publisher:
Here is a captured image of subscriber:
I haven't had a chance yet to try it with two Linux boxes, and I think I really do need two, while avoiding sshing into them over the interface used in the test, because other, non-DDS traffic will affect the observed loss. Looking at the info you provided:
- The HEARTBEAT submessages are useless in this scenario, but they are generated because the writer defaults to "reliable".
- Each dds_write call ends up generating a single datagram and ethernet frame (this is in line with the Wireshark screenshot).
- You use the userID field as a sequence number, but it is also the key field. This means each sample is of a unique instance, and the history depth is irrelevant on both the publisher and the subscriber.
- Samples could in principle be dropped by the reader before you get to them with dds_take. With the data being polled every approx. 1 ms, on the face of it this seems unlikely, and the output also confirms that it is not happening (strictly monotonically increasing values for userID).

Looking at the Wireshark capture on the subscriber, what I find really odd is that the time stamps suggest most data samples are being received twice. That has nothing to do with Cyclone, since Wireshark sits below it in the stack, so it looks like packet duplication is occurring somewhere in your network stack. Cyclone handles duplicates just fine, but I find it odd, and it makes me wonder whether it has something to do with the high rate of dropped packets. In Wireshark you can easily take a look at the sequence numbers of the samples; that would probably be a sensible thing to do.
The other thing that feels off[^1] is the loss distribution on the subscriber side. The netem documentation says that with loss 10% "each packet loss is independent", but that doesn't specify the distribution. I find it surprisingly bursty ...
Finally, from the manpage of netem I couldn't figure out whether it drops on egress, ingress, or both. This:
In addition, in the same network environment, the udp socket sending and receiving program written by myself has a packet loss rate of 10%, which meets expectations.
suggests it is not dropping on both sides, but from what I know of Cyclone's implementation and your source code, it really can't be dropping anything internally unless the machines involved are ridiculously overloaded.
[^1]: No proper statistical test performed, and eyeballing statistics is asking for trouble.
Yes, in Wireshark I can see that each received data packet is duplicated in the lossy environment; it's very odd. Perhaps it is precisely for this reason that the packet loss rate is twice that of the network setting. In addition, netem implements random packet loss at the sending end of the network interface.
Hi friends:
The Linux tc tool sets the random packet loss rate of the sending network interface to 10%. In the experiment it is found that the loss rate seen by the subscriber program of the Cyclone DDS HelloWorld example reaches 20%. Wireshark analysis shows that the publisher does indeed send with a 10% packet loss rate, but Wireshark often shows two identical data packets being received, resulting in a subscriber packet loss rate of 20%. In addition, in the same network environment, a UDP socket send/receive program I wrote myself shows a packet loss rate of 10%, which meets expectations.