eclipse-cyclonedds / cyclonedds

Eclipse Cyclone DDS project
https://projects.eclipse.org/projects/iot.cyclonedds

Two processes with the same language can communicate, but two processes with two different languages cannot communicate #2070

Closed: yanzhang920817 closed this issue 3 months ago

yanzhang920817 commented 3 months ago

I have implemented subscribers and publishers in both C and Python. If the subscriber and publisher are written in the same language they communicate normally, but if not, they don't. In the example below, the publisher is in C and the subscriber is in Python.

publisher.c:

#include "common.h"
#include "gamepad_handler.h"
#include "dds/dds.h"
#include "dds_msg_struct.h"

int main(void)
{
    Gamepad gamepad;
    OpenGamepad(&gamepad);

    pthread_t thread_id;
#if 1
    int result = pthread_create(&thread_id, NULL, ReadGamepad, &gamepad);
    if (result != 0) {
        fprintf(stderr, "Error creating thread: %d\n", result);
        return EXIT_FAILURE;
    }
#endif

    // create DDS publisher
    dds_entity_t participant;
    dds_entity_t topic;
    dds_entity_t writer;
    dds_return_t rc;
    DDS_MSG_STRUCT_Msg msg;
    uint32_t status = 0;

    /* Create a Participant. */
    participant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL);
    if (participant < 0)
        DDS_FATAL("dds_create_participant: %s\n", dds_strretcode(-participant));

    /* Create a Topic. */
    topic = dds_create_topic (
        participant, &DDS_MSG_STRUCT_Msg_desc, "HelloWorldData_Msg", NULL, NULL);
    if (topic < 0)
        DDS_FATAL("dds_create_topic: %s\n", dds_strretcode(-topic));

    /* Create a Writer. */
    writer = dds_create_writer (participant, topic, NULL, NULL);
    if (writer < 0)
        DDS_FATAL("dds_create_writer: %s\n", dds_strretcode(-writer));

    printf("=== [Publisher]  Waiting for a reader to be discovered ...\n");
    fflush (stdout);

    rc = dds_set_status_mask(writer, DDS_PUBLICATION_MATCHED_STATUS);
    if (rc != DDS_RETCODE_OK)
        DDS_FATAL("dds_set_status_mask: %s\n", dds_strretcode(-rc));

    while (!(status & DDS_PUBLICATION_MATCHED_STATUS))
    {
        rc = dds_get_status_changes (writer, &status);
        if (rc != DDS_RETCODE_OK)
            DDS_FATAL("dds_get_status_changes: %s\n", dds_strretcode(-rc));

        /* Polling sleep. */
        dds_sleepfor (DDS_MSECS (20));
    }

    /* Create a message to write. */
    msg.userID = 1;
    // msg.message = "Hello World";

    printf ("=== [Publisher]  Writing : ");
    // printf ("Message (%" PRId32 ", %s)\n", msg.userID, msg.message);
    fflush (stdout);

    // read the gamepad into a local buffer
    char gpMsg[40] = {0};
    // char gpMsg[40] = "12345678123456789123456789123456789";
#if 0
    for (int i = 0; i < 40; i++) {
        printf("0:0x%x,", gpMsg[i]);
    }
    printf("\n");
#endif

    while (1)
    {
        DealGamepadValue(&gamepad, gpMsg);
#if 1
        for (int i = 0; i < 40; i++) {
            printf("1:0x%x,", gpMsg[i]);
            // gpMsg[i] = i;
        }
        printf("\n");
#endif
        // msg.message = (char*)gpMsg;
        // strcpy(msg.message, gpMsg);
        // msg.message = gpMsg;
        memcpy(msg.message, gpMsg, sizeof(gpMsg));
        // msg.message = "12345678012345678901234567890123456";
#if 1
        for (int i = 0; i < 40; i++) {
            printf("2:0x%x,", msg.message[i]);
        }
        printf("\n");
#endif
        SDL_Delay(10);
        // sleep(1);

        // publish
        rc = dds_write (writer, &msg);
        if (rc != DDS_RETCODE_OK)
            DDS_FATAL("dds_write: %s\n", dds_strretcode(-rc));
        else
            printf("write successfully!\n");
#if 0
        for (int i = 0; i < 40; i++) {
            printf("3:0x%x,", (unsigned char)msg.message[i]);
        }
        printf("\n");
#endif
        // exit(-1);
    }

    /* Deleting the participant will delete all its children recursively as well. */
    rc = dds_delete (participant);
    if (rc != DDS_RETCODE_OK)
        DDS_FATAL("dds_delete: %s\n", dds_strretcode(-rc));

    // wait for the gamepad thread to finish
    pthread_join(thread_id, NULL);

    CloseGamepad(&gamepad);
    printf("Thread finished.\n");
    return 0;
}

cat dds_msg_struct.idl

module DDS_MSG_STRUCT
{
  struct Msg
  {
    @key
    long userID;
    char message[64];
  };
};

subscriber.py:

from dataclasses import dataclass
from cyclonedds.domain import DomainParticipant, Domain
from cyclonedds.sub import DataReader, Subscriber
from cyclonedds.topic import Topic
from cyclonedds.core import Listener, Qos, Policy
from cyclonedds.util import duration
from cyclonedds.idl import IdlStruct
import cyclonedds.idl.types as types
import cyclonedds.idl.annotations as annotate
import time

@dataclass
@annotate.final
@annotate.autoid("sequential")
class DDS_MSG_STRUCT(IdlStruct, typename="DDS_MSG_STRUCT::Msg"):
    userID: types.int32
    message: types.array[types.int8, 64]

class MyListener(Listener):
    def on_liveliness_changed(self, reader, status):
        print(">> Liveliness event")

listener = MyListener()
qos = Qos(
    Policy.Reliability.BestEffort,
    Policy.Deadline(duration(microseconds=10)),
    Policy.Durability.Transient,
    Policy.History.KeepLast(10)
)

# Create the DomainParticipant
participant = DomainParticipant(0)

# Create the Topic
tp = Topic(participant, "HelloWorldData_Msg", DDS_MSG_STRUCT, qos=qos)

# Create the Subscriber
subscriber = Subscriber(participant)

# Create the DataReader
reader = DataReader(subscriber, tp, listener=listener)

# Read samples in a loop
print("\n=== [Subscriber] Waiting for a sample ...")

for sample in reader.take_iter(timeout=duration(seconds=2)):
    print(sample)

# Release the resources
del reader
del tp
del participant

cat cyclonedds.xml

<?xml version="1.0" encoding="UTF-8" ?>
<CycloneDDS xmlns="https://cdds.io/config" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="https://cdds.io/config https://raw.githubusercontent.com/eclipse-cyclonedds/cyclonedds/master/etc/cyclonedds.xsd">
    <Domain Id="0">
        <General>
            <Interfaces>
                <NetworkInterface name="enp2s0" priority="default" multicast="default" />
            </Interfaces>
            <AllowMulticast>true</AllowMulticast>
            <MaxMessageSize>65500B</MaxMessageSize>
            <DontRoute>true</DontRoute>
        </General>
        <Discovery>
            <EnableTopicDiscoveryEndpoints>true</EnableTopicDiscoveryEndpoints>
        </Discovery>
        <Internal>
            <Watermarks>
                <WhcHigh>500kB</WhcHigh>
            </Watermarks>
        </Internal>
        <Tracing>
            <Category>trace</Category>
            <Verbosity>finest</Verbosity>
            <OutputFile>/home/dobot/dds-cxx/log/dds.log.${CYCLONEDDS_PID}</OutputFile>
        </Tracing>
    </Domain>
</CycloneDDS>

A part of the log:

1723804436.618827 [0]        tev: thread_cputime 0.002758000
1723804436.618955 [0]        tev: write_sample 110df15:8403a879:1199fba5:200c2 #9: ST0 DCPSParticipantMessage/ParticipantMessageData:{110df15:8403a879:1199fba5:1}:1<0>
1723804436.618996 [0]        tev: resched pmd(110df15:8403a879:1199fba5:1c1): 8s
1723804436.706883 [0]        tev: non-timed queue now has 1 items
1723804436.706956 [0]        tev: xmit spdp 110df15:8403a879:1199fba5:1c1 to 0:0:0:100c7 (resched 8s)
1723804436.706984 [0]        tev: xpack_addmsg 0x7f489c000b60 0x563e12aaac60 0(data(0:0:0:0:#0/1)): niov 0 sz 0 => now niov 3 sz 408
1723804436.707088 [0]        tev: nn_xpack_send 408: 0x7f489c000b6c:20 0x563e12ab7828:36 0x563e12ab2634:352 [ udp/239.255.0.1:7400@3 ]
1723804436.707174 [0]        tev: traffic-xmit (1) 408
1723804436.707178 [0]       recv: HDR(110df15:8403a879:1199fba5 vendor 1.16) len 408 from udp/192.168.5.1:54900
1723804436.707261 [0]       recv: INFOTS(1723804372.604405433)
1723804436.707307 [0]       recv: DATA(110df15:8403a879:1199fba5:100c2 -> 0:0:0:0 #1)
1723804436.707348 [0]       recv: thread_cputime 0.000346000
1723804436.707470 [0] dq.builtin: data(builtin, vendor 1.16): 0:0:0:0 #1: ST0 /ParticipantBuiltinTopicData:{property_list={1:"__ProcessName":"main",1:"__Pid":"100103",1:"__Hostname":"dobot-J6412"}:{},protocol_version=2:1,vendorid=1:16,participant_lease_duration=10000000000,participant_guid={110df15:8403a879:1199fba5:1c1},builtin_endpoint_set=805370943,domain_id=0,default_unicast_locator={udp/192.168.5.1:33215},default_multicast_locator={udp/239.255.0.1:7401},metatraffic_unicast_locator={udp/192.168.5.1:33215},metatraffic_multicast_locator={udp/239.255.0.1:7400},adlink_participant_version_info=0:44:0:0:0:"dobot-J6412/0.10.5/Linux/Linux",cyclone_receive_buffer_size=425984}
1723804436.707516 [0] dq.builtin: SPDP ST0 110df15:8403a879:1199fba5:1c1 (local)
1723804436.707537 [0] dq.builtin: thread_cputime 0.001554000
1723804444.619098 [0]        tev: thread_cputime 0.003460000
1723804444.619289 [0]        tev: write_sample 110df15:8403a879:1199fba5:200c2 #10: ST0 DCPSParticipantMessage/ParticipantMessageData:{110df15:8403a879:1199fba5:1}:1<0>
1723804444.619332 [0]        tev: resched pmd(110df15:8403a879:1199fba5:1c1): 8s
1723804444.707047 [0]        tev: non-timed queue now has 1 items
1723804444.707159 [0]        tev: xmit spdp 110df15:8403a879:1199fba5:1c1 to 0:0:0:100c7 (resched 8s)
1723804444.707190 [0]        tev: xpack_addmsg 0x7f489c000b60 0x563e12aaac60 0(data(0:0:0:0:#0/1)): niov 0 sz 0 => now niov 3 sz 408
1723804444.707303 [0]        tev: nn_xpack_send 408: 0x7f489c000b6c:20 0x563e12ab7828:36 0x563e12ab2634:352 [ udp/239.255.0.1:7400@3 ]
1723804444.707329 [0]        tev: traffic-xmit (1) 408
1723804444.707376 [0]       recv: HDR(110df15:8403a879:1199fba5 vendor 1.16) len 408 from udp/192.168.5.1:54900
1723804444.707414 [0]       recv: INFOTS(1723804372.604405433)
1723804444.707453 [0]       recv: DATA(110df15:8403a879:1199fba5:100c2 -> 0:0:0:0 #1)
1723804444.707479 [0]       recv: thread_cputime 0.000498000
1723804444.707564 [0] dq.builtin: data(builtin, vendor 1.16): 0:0:0:0 #1: ST0 /ParticipantBuiltinTopicData:{property_list={1:"__ProcessName":"main",1:"__Pid":"100103",1:"__Hostname":"dobot-J6412"}:{},protocol_version=2:1,vendorid=1:16,participant_lease_duration=10000000000,participant_guid={110df15:8403a879:1199fba5:1c1},builtin_endpoint_set=805370943,domain_id=0,default_unicast_locator={udp/192.168.5.1:33215},default_multicast_locator={udp/239.255.0.1:7401},metatraffic_unicast_locator={udp/192.168.5.1:33215},metatraffic_multicast_locator={udp/239.255.0.1:7400},adlink_participant_version_info=0:44:0:0:0:"dobot-J6412/0.10.5/Linux/Linux",cyclone_receive_buffer_size=425984}
1723804444.707657 [0] dq.builtin: SPDP ST0 110df15:8403a879:1199fba5:1c1 (local)
1723804444.707681 [0] dq.builtin: thread_cputime 0.001554000
eboasson commented 3 months ago

Your writer is created with the default QoS (both topic and writer have NULL for the QoS parameter, so the topic doesn't override anything and the writer just gets the default):

topic = dds_create_topic (participant, &DDS_MSG_STRUCT_Msg_desc, "HelloWorldData_Msg", NULL, NULL);
writer = dds_create_writer (participant, topic, NULL, NULL);

On the Python side:

qos = Qos(
    Policy.Reliability.BestEffort,
    Policy.Deadline(duration(microseconds=10)),
    Policy.Durability.Transient,
    Policy.History.KeepLast(10)
)
participant = DomainParticipant(0)
tp = Topic(participant, "HelloWorldData_Msg", DDS_MSG_STRUCT, qos=qos)
subscriber = Subscriber(participant)
reader = DataReader(subscriber, tp, listener=listener)

The topic gets reliability = "best effort", deadline = 10µs, durability = "transient" and history = "keep last 10". The idea is that all definitions of a topic T are the same within a domain, but it doesn't actually matter most of the time, because the reader and writer can overrule it anyway.

But then, the reader inherits the settings from the QoS (you can easily see them using cyclonedds ls --qos[^1]):

Deadline(deadline='10 microseconds')
Durability.Transient
History.KeepLast(depth=10)
Reliability.BestEffort

DDS has a QoS "matching" rule in place where some QoS settings must have compatible values on the reader and the writer for them to communicate. Among those are "deadline", "durability" and "reliability". Having a "best-effort" reader is also fine, but a deadline of 10µs on the reader against a (default) deadline of ∞ on the writer won't match. Similarly, transient vs volatile is also not going to work.

A 10µs deadline is going to be a problem in practice: it would require you to update the data at 100 kHz minimum, and that's tricky on standard operating systems. Cyclone doesn't support "transient" yet (it should reject it when you try to use it, I guess, but it doesn't at the moment).

So the two are expected not to communicate.
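
A minimal sketch (not from the original exchange) of a Python reader QoS that should be compatible with the C writer's defaults; simply omitting the QoS achieves the same thing, since the DDS defaults already match:

from cyclonedds.core import Qos, Policy

# Assumption: the C writer keeps the default QoS (reliable, volatile, deadline = infinity),
# so a best-effort, volatile reader that requests no deadline will match it.
compatible_qos = Qos(
    Policy.Reliability.BestEffort,  # a requested best-effort is satisfied by an offered reliable
    Policy.Durability.Volatile,     # matches the writer's default durability
    # no Policy.Deadline entry: the requested deadline stays at infinity
)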

[^1]: You could also use the new insight tool, it even flags QoS incompatibilities.

yanzhang920817 commented 3 months ago

I have also observed that the C demo uses the default QoS. I have tried not setting a QoS at all, and also qos=Qos(), but it has no effect.

cat subsciber3.py

from dataclasses import dataclass
from cyclonedds.domain import DomainParticipant, Domain
from cyclonedds.sub import DataReader, Subscriber
from cyclonedds.topic import Topic
from cyclonedds.core import Listener, Qos, Policy
from cyclonedds.util import duration
from cyclonedds.idl import IdlStruct
import cyclonedds.idl.types as types
import cyclonedds.idl.annotations as annotate
import time

@dataclass
@annotate.final
@annotate.autoid("sequential")
class DDS_MSG_STRUCT(IdlStruct, typename="DDS_MSG_STRUCT::Msg"):
    userID: types.int32
    message: types.array[types.int8, 64]

class MyListener(Listener):
    def on_liveliness_changed(self, reader, status):
        print(">> Liveliness event")

listener = MyListener()

participant = DomainParticipant(0)

qos=Qos()
tp = Topic(participant, "HelloWorldData_Msg", DDS_MSG_STRUCT, qos)

subscriber = Subscriber(participant)

reader = DataReader(subscriber, tp, listener=listener)

print("\n=== [Subscriber] Waiting for a sample ...")

for sample in reader.take_iter(timeout=duration(seconds=2)):
    print(sample)

del reader
del tp
del participant
yanzhang920817 commented 3 months ago

dds.log.txt

yanzhang920817 commented 3 months ago

Thanks to eboasson's patient help, my problem was solved. It was caused by the message types in Python and C being inconsistent, and by cyclonedds.xml not being picked up when the Python code was run.
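
A minimal sketch of the two fixes implied above, assuming the type mismatch came from the hand-written Python class and using an illustrative path for the configuration file:

# 1. Generate the Python type from the same IDL the C side uses instead of
#    hand-writing it, so the serialized layout is guaranteed to match, e.g.:
#        idlc -l py dds_msg_struct.idl
#    and then import the generated class (e.g. `from DDS_MSG_STRUCT import Msg`).
#
# 2. Make sure the Python process actually reads cyclonedds.xml, e.g. by setting
#    CYCLONEDDS_URI before the participant is created (shown here from Python):
import os
os.environ["CYCLONEDDS_URI"] = "file:///home/dobot/dds-cxx/cyclonedds.xml"  # illustrative path

from cyclonedds.domain import DomainParticipant
participant = DomainParticipant(0)  # the configuration is picked up when the domain is created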