eclipse-cyclonedds / cyclonedds

Eclipse Cyclone DDS project
https://projects.eclipse.org/projects/iot.cyclonedds
Other
885 stars 362 forks source link

The message in the message queue will not be transmitted #1618

Closed caojiebao closed 1 year ago

caojiebao commented 1 year ago

https://github.com/eclipse-cyclonedds/cyclonedds/blob/64b062eddaa764d52ebe77503e8f100d6972dfe6/src/core/ddsi/src/ddsi_xmsg.c#L1328

caojiebao commented 1 year ago

1679107327857

caojiebao commented 1 year ago

The msg by helloword demo in best-effort mode will not be sent if without the second msg

eboasson commented 1 year ago

Ouch, that's bad! Sorry it got snowed under a bit ...

The msg by helloword demo in best-effort mode will not be sent if without the second msg

A second message or stopping the stack: it only becomes obvious when I add a sleep after the write but before it deletes the participant because the shutdown also forces the packet out. But yes.

The obvious fix is:

diff --git a/src/core/ddsi/src/ddsi_xmsg.c b/src/core/ddsi/src/ddsi_xmsg.c
index 815f8483a..d6e74eeca 100644
--- a/src/core/ddsi/src/ddsi_xmsg.c
+++ b/src/core/ddsi/src/ddsi_xmsg.c
@@ -1312,7 +1312,7 @@ void ddsi_xpack_send (struct ddsi_xpack *xp, bool immediately)
     ddsi_xpack_reinit (xp);
     xp1->sendq_next = NULL;
     ddsrt_mutex_lock (&gv->sendq_lock);
-    if (immediately || gv->sendq_length > SENDQ_LW)
+    if (immediately || gv->sendq_length == 0)
       ddsrt_cond_broadcast (&gv->sendq_cond);
     if (gv->sendq_length >= SENDQ_MAX)
     {

Looking at the code, however, I wonder about those SENDQ_LW and SENDQ_HW. The latter is not used anywhere, and the former is only used to unblock the ddsi_xpack_send if it is blocked on a full queue and I wonder if doesn't make more sense to unblock a bit before the queue is empty again. For example:

diff --git a/src/core/ddsi/src/ddsi_xmsg.c b/src/core/ddsi/src/ddsi_xmsg.c
index 815f8483a..6f035146b 100644
--- a/src/core/ddsi/src/ddsi_xmsg.c
+++ b/src/core/ddsi/src/ddsi_xmsg.c
@@ -1226,8 +1226,7 @@ static void ddsi_xpack_send_real (struct ddsi_xpack *xp)
 }

 #define SENDQ_MAX 200
-#define SENDQ_HW 10
-#define SENDQ_LW 0
+#define SENDQ_LW 100

 static uint32_t ddsi_xpack_sendq_thread (void *vgv)
 {
@@ -1312,7 +1311,7 @@ void ddsi_xpack_send (struct ddsi_xpack *xp, bool immediately)
     ddsi_xpack_reinit (xp);
     xp1->sendq_next = NULL;
     ddsrt_mutex_lock (&gv->sendq_lock);
-    if (immediately || gv->sendq_length > SENDQ_LW)
+    if (immediately || gv->sendq_length >= 0)
       ddsrt_cond_broadcast (&gv->sendq_cond);
     if (gv->sendq_length >= SENDQ_MAX)
     {

Arguably these values should be configurable, but if I make them configurable they still need to have a default.