eclipse-cyclonedds / cyclonedds-cxx

Other
96 stars 75 forks source link

Use shm or udp dynamically #353

Open YeahhhhLi opened 1 year ago

YeahhhhLi commented 1 year ago

The scenario where we use dds now is as follows: image

The usage of dds that I have learned so far:

  1. shm needs to be actively opened
  2. The message transmitted by shm requires a fixed length
  3. Qos policy between shm and udp is diff

So does cyclondds provide such a dynamic mechanism, or does it need to be adapted by our business side?

reicheratwork commented 1 year ago

Hi @YeahhhhLi , I don't completely understand your question, but let me try to expand on your points, and see whether I understand your assumptions.

To make use of shared memory functionality in CycloneDDS, you need to satisfy the following conditions:

  1. Have the shared memory provider (iceoryx) installed on the system
  2. Compile CycloneDDS (and CycloneDDS-CXX) with shared memory support (use the cmake flag -DENABLE_SHM=On)
  3. Assure that the datatype being exchanged is compatible with shared memory, it must be of a fixed size, and can therefore not contain any indirections to data such as sequences, strings, optionals, also it can only be of final extensibility
  4. Assure that the QoS settings for the topic are compatible with shared memory, some restrictions may be put on the reliability, durability, liveliness and history QoSPolicies
  5. Assure that the memory pool for iceoryx is configured correctly, you need to have enough chunks of the correct size (size of exchanged message +64 bytes for metadata) to assure smooth exchange
  6. Set the CycloneDDS configuration file to allow shared memory support
  7. Before starting the programs using CycloneDDS with shared memory, you need to start the iceoryx service (iox-roudi) to allow it to mediate the exchange

If all these conditions are satisfied, CycloneDDS will use shared memory for programs on the same node with shared memory support, and in all other cases, use "normal" (networked) exchange

For a more in-depth explanation, I refer you to the shared memory documentation on the CycloneDDS repository here

YeahhhhLi commented 1 year ago

Hi @YeahhhhLi , I don't completely understand your question, but let me try to expand on your points, and see whether I understand your assumptions.

To make use of shared memory functionality in CycloneDDS, you need to satisfy the following conditions:

  1. Have the shared memory provider (iceoryx) installed on the system
  2. Compile CycloneDDS (and CycloneDDS-CXX) with shared memory support (use the cmake flag -DENABLE_SHM=On)
  3. Assure that the datatype being exchanged is compatible with shared memory, it must be of a fixed size, and can therefore not contain any indirections to data such as sequences, strings, optionals, also it can only be of final extensibility
  4. Assure that the QoS settings for the topic are compatible with shared memory, some restrictions may be put on the reliability, durability, liveliness and history QoSPolicies
  5. Assure that the memory pool for iceoryx is configured correctly, you need to have enough chunks of the correct size (size of exchanged message +64 bytes for metadata) to assure smooth exchange
  6. Set the CycloneDDS configuration file to allow shared memory support
  7. Before starting the programs using CycloneDDS with shared memory, you need to start the iceoryx service (iox-roudi) to allow it to mediate the exchange

If all these conditions are satisfied, CycloneDDS will use shared memory for programs on the same node with shared memory support, and in all other cases, use "normal" (networked) exchange

For a more in-depth explanation, I refer you to the shared memory documentation on the CycloneDDS repository here

Thanks for reply.

The conditions you mentioned are all met in our system, and I also tested shm OK:

  1. datatype is fixed size.
  2. iceoryx config is correct
  3. i run iceoryx service, test cylonedds shm transport, and verify transport mode via tcpdump udp

And now in our system, what we want to do is the following:

  1. For the same topic, we have inter-process (hopefully shm) and inter-host requirements (only udp). Apart from using two types of topics to distinguish the two communication modes, is there a better solution?
  2. The actual length of the datatype of our different topics is somewhat different, some are as high as 5MB, and some may only be a few KB. When using shm, can we cover all topics with a fixed length(Like 8MB), or is there a better way?
YeahhhhLi commented 1 year ago

Hi @YeahhhhLi , I don't completely understand your question, but let me try to expand on your points, and see whether I understand your assumptions. To make use of shared memory functionality in CycloneDDS, you need to satisfy the following conditions:

  1. Have the shared memory provider (iceoryx) installed on the system
  2. Compile CycloneDDS (and CycloneDDS-CXX) with shared memory support (use the cmake flag -DENABLE_SHM=On)
  3. Assure that the datatype being exchanged is compatible with shared memory, it must be of a fixed size, and can therefore not contain any indirections to data such as sequences, strings, optionals, also it can only be of final extensibility
  4. Assure that the QoS settings for the topic are compatible with shared memory, some restrictions may be put on the reliability, durability, liveliness and history QoSPolicies
  5. Assure that the memory pool for iceoryx is configured correctly, you need to have enough chunks of the correct size (size of exchanged message +64 bytes for metadata) to assure smooth exchange
  6. Set the CycloneDDS configuration file to allow shared memory support
  7. Before starting the programs using CycloneDDS with shared memory, you need to start the iceoryx service (iox-roudi) to allow it to mediate the exchange

If all these conditions are satisfied, CycloneDDS will use shared memory for programs on the same node with shared memory support, and in all other cases, use "normal" (networked) exchange For a more in-depth explanation, I refer you to the shared memory documentation on the CycloneDDS repository here

Thanks for reply.

The conditions you mentioned are all met in our system, and I also tested shm OK:

  1. datatype is fixed size.
  2. iceoryx config is correct
  3. i run iceoryx service, test cylonedds shm transport, and verify transport mode via tcpdump udp

And now in our system, what we want to do is the following:

  1. For the same topic, we have inter-process (hopefully shm) and inter-host requirements (only udp). Apart from using two types of topics to distinguish the two communication modes, is there a better solution?
  2. The actual length of the datatype of our different topics is somewhat different, some are as high as 5MB, and some may only be a few KB. When using shm, can we cover all topics with a fixed length(Like 8MB), or is there a better way?

Regarding the second point I mentioned, I see that in the cyclonedds shmthroughput example, topic_types of different sizes(DataType_16 ~ DataType_1048576) will be allocated according to the payload size of the input parameter.

But in our system, there will be multiple topics, and the size of the transmitted data in each topic is not the same. It is obviously unreasonable to set different datatypes according to the data size when creating different topics. . So, do we support choosing the shm block size according to the actual size of the datatype during the actual use of shm transmission?

For example:

struct DDSMessage { // max size : 10M
    uint32 body_size;
    char body[10485760 - 4];
}

Cyclonedds can determine the actual data size through the body_size in the datatype, so that the user does not need to create multiple DataTypes and then make their own judgments

reicheratwork commented 1 year ago

There is still something not very clear to me, so let me try to summarize what I think your desired end goal is, and to which degree you have already reached this goal.

And now in our system, what we want to do is the following:

  1. For the same topic, we have inter-process (hopefully shm) and inter-host requirements (only udp). Apart from using two types of topics to distinguish the two communication modes, is there a better solution?

Referencing the diagram you included at your first post:

  1. The actual length of the datatype of our different topics is somewhat different, some are as high as 5MB, and some may only be a few KB. When using shm, can we cover all topics with a fixed length(Like 8MB), or is there a better way?

When using shared memory, you can just use a single datatype which is sufficient for all possible scenarios, like you proposed. However this has a number of disadvantages:

The most important question is (in my opinion): do you need to have a flexible sized datatype in your topic? Because, if you know the (maximum) size of the exchanged message at topic creation time, but not compile time, and this is the same on all processes that are communicating, you can choose which of the predefined datatypes (mirroring the approach in shmthroughput example) that the exchanged data "fits into".

YeahhhhLi commented 1 year ago

Hi. Or maybe what I'm describing is a bit confusing, I will try to explain again

If you want to have processes A1, A2 and B1 all exchanging data on the same Topic, in my opinion, you are already there. CycloneDDS will determine which form of communication to use (shm for intra-node exchange, udp for inter-node)

Yes, I want to have processes A1, A2 and B1 all exchanging data on the same Topic, but I didn't solve the problem of coexistence of shm and udp, I just realized two communication modes respectively:

For SHM mode:

  1. IDL datatype
    struct DDSMessage {
    uint32 header_size;
    char header[131072 - 4]; // 128K
    uint32 body_size;
    char body[5242880 - 4];  // 5M
    }
  2. iox config
    
    [general]
    version = 1

[[segment]]

[[segment.mempool]] size = 5374016 // 5M +128K + 64B count = 1024

3. cyclondds config
```config
<?xml version="1.0" encoding="UTF-8" ?>
<CycloneDDS xmlns="https://cdds.io/config"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="https://cdds.io/config https://raw.githubusercontent.com/eclipse-cyclonedds/cyclonedds/iceoryx/etc/cyclonedds.xsd">
    <Domain id="any">
        <SharedMemory>
            <Enable>true</Enable>
            <LogLevel>info</LogLevel>
        </SharedMemory>
    </Domain>
</CycloneDDS>
  1. pub c++

    dds::domain::DomainParticipant participant(domain::default_id());
    
    dds::topic::Topic<DDSMessage> topic(participant, "ShmThroughput");
    
    dds::pub::qos::PublisherQos pb_qos;
    pb_qos.policy(dds::core::policy::Partition("Throughput example"));
    dds::pub::Publisher publisher(participant, pb_qos);
    
    dds::pub::qos::DataWriterQos dw_qos;
    dw_qos.policy(dds::core::policy::Reliability(dds::core::policy::ReliabilityKind::RELIABLE, dds::core::Duration::from_secs(10)));
    dw_qos.policy(dds::core::policy::History(dds::core::policy::HistoryKind::KEEP_LAST, 16));
    dw_qos.policy(dds::core::policy::Deadline()); // default inifinite
    dw_qos.policy(dds::core::policy::Durability(dds::core::policy::DurabilityKind::VOLATILE));
    dw_qos.policy(dds::core::policy::Liveliness(dds::core::policy::LivelinessKind::AUTOMATIC, dds::core::Duration::from_secs(1)));
    dw_qos.policy(dds::core::policy::ResourceLimits(100, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED));
    dds::pub::DataWriter<DDSMessage> writer(publisher, topic, dw_qos);
    if (!writer.delegate()->is_loan_supported()) {
        std::cout << "Check loan supported failed, please check if the size of topic type is fixed and iox service is normal" << std::endl;
        return EXIT_FAILURE;
    }
    
    std::cout << "=== [Publisher] Waiting for subscriber." << std::endl;
    while (writer.publication_matched_status().current_count() == 0) {
        std::this_thread::sleep_for(std::chrono::milliseconds(20));
    }
    while (true) {
        DDSMessage& msg = writer.delegate()->loan_sample();
        // fill msg
        writer.write(std::move(msg));
    }

For UDP mode:

  1. idl datatype
    struct DDSMessage {
    string header;
    string body;
    };
  2. pub C++

    dds::domain::DomainParticipant participant(domain::default_id());
    /* To publish something, a topic is needed. */
    dds::topic::Topic<DDSMessage> topic(participant, "Throughput");
    /* A writer also needs a publisher. */
    dds::pub::qos::PublisherQos pb_qos;
    pb_qos.policy(dds::core::policy::Partition("Throughput example"));
    dds::pub::Publisher publisher(participant, pb_qos);
    
    dds::pub::qos::DataWriterQos dw_qos;
    dw_qos.policy(dds::core::policy::Reliability(dds::core::policy::ReliabilityKind::RELIABLE, dds::core::Duration::from_secs(10)));
    dw_qos.policy(dds::core::policy::History(dds::core::policy::HistoryKind::KEEP_ALL, 0));
    dw_qos.policy(dds::core::policy::ResourceLimits(100, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED));
    dds::pub::DataWriter<DDSMessage> writer(publisher, topic, dw_qos);
     while (writer.publication_matched_status().current_count() == 0) {
        std::this_thread::sleep_for(std::chrono::milliseconds(20));
    }
    while (true) {
        DDSMessage msg;
        // fill msg
       writer.write(msg);
    }

    From the realization of the above two modes, there are several differences:

  3. shm needs iox service and config, udp does not
  4. shm needs cyclonedds enable SharedMemory, udp does not
  5. The two communication modes require different qos configurations
  6. shm needs loan_sample from writer, udp does not

    This is how I know how to use shm and udp so far. So, back to the original question.

    Yes, I want to have processes A1, A2 and B1 all exchanging data on the same Topic

How should I achieve this goal, is there any aspect that I am missing in the use of cyclonedds-cxx?

reicheratwork commented 1 year ago

@YeahhhhLi

I quickly made this example program, to show how to "automatically fit" runtime defined numbers of points into the smallest fixed size types. I hope it is of use to you. You start both publisher and subscriber with as argument the number of points you want each message to contain (up to 64 in this case) or no arguments (in which case it defaults to 8 points per sample).

messages.idl:

module messages {
  struct LIDARPoint {
    @key UInt64 point_ID;
    double x, y, z;
  };

  struct LIDARFrame_8 {
    @key UInt64 frame_ID;
    UInt64 timestamp;
    UInt32 npoints;
    LIDARPoint points[8];
  };

  struct LIDARFrame_16 {
    @key UInt64 frame_ID;
    UInt64 timestamp;
    UInt32 npoints;
    LIDARPoint points[16];
  };

  struct LIDARFrame_32 {
    @key UInt64 frame_ID;
    UInt64 timestamp;
    UInt32 npoints;
    LIDARPoint points[32];
  };

  struct LIDARFrame_64 {
    @key UInt64 frame_ID;
    UInt64 timestamp;
    UInt32 npoints;
    LIDARPoint points[64];
  };
};

subscriber.cpp:

#include <cstdlib>
#include <iostream>
#include <chrono>
#include <thread>

#include "dds/dds.hpp"

#include "messages.hpp"

using namespace org::eclipse::cyclonedds;

template<typename T>
int read_func(size_t N)
{
  dds::domain::DomainParticipant participant(domain::default_id());
  dds::topic::Topic<T> topic(participant, "topic_name");
  dds::sub::Subscriber subscriber(participant);
  dds::sub::DataReader<T> reader(subscriber, topic);

  std::cout << "=== [Subscriber] N: " << N << "." << std::endl;

  std::cout << "=== [Subscriber] Waiting for publisher." << std::endl;
  while (reader.subscription_matched_status().current_count() == 0) {
      std::this_thread::sleep_for(std::chrono::milliseconds(20));
  }
  std::cout << "=== [Subscriber] Waiting for messages." << std::endl;

  while (reader.subscription_matched_status().current_count() > 0) {
    auto samples = reader.take();
    for (const auto & sample:samples) {
      if (sample.info().valid()) {
          const T &msg = sample.data();
          std::cout << "=== [Subscriber] Message received:" << std::endl;
          std::cout << "=== [Subscriber] Frame ID: " << msg.frame_ID() << std::endl;
          std::cout << "=== [Subscriber] Timestamp: " << msg.timestamp() << std::endl;
          for (size_t p = 0; p < msg.npoints(); p++) {
            std::cout << "=== [Subscriber] Point: " << msg.points()[p].point_ID();
            std::cout <<" (" << msg.points()[p].x() << ", " << msg.points()[p].y() << ", ";
            std::cout << msg.points()[p].z() << ")" << std::endl;
          }
      } else {
        std::cout << "=== [Subscriber] Invalid message." << std::endl;
      }
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
  }

  return 0;
}

int main(int argc, char **argv) {
    try {
        size_t N = argc > 1 ? std::stoul(argv[1]) : 8;
        if (N <= 8)
          read_func<messages::LIDARFrame_8>(N);
        else if (N <= 16)
          read_func<messages::LIDARFrame_16>(N);
        else if (N <= 32)
          read_func<messages::LIDARFrame_32>(N);
        else if (N <= 64)
          read_func<messages::LIDARFrame_64>(N);
        else
          std::cerr << N << " is not a supported number of lidar points" << std::endl;
    } catch (const dds::core::Exception& e) {
        std::cerr << "=== [Subscriber] Exception: " << e.what() << std::endl;
        return EXIT_FAILURE;
    } catch (...) {
        std::cerr << "=== [Subscriber] Exception.\n" << std::endl;
        return EXIT_FAILURE;
    }

    std::cout << "=== [Subscriber] Done." << std::endl;

    return EXIT_SUCCESS;
}

publisher.cpp:

#include <cstdlib>
#include <iostream>
#include <chrono>
#include <thread>
#include <type_traits>

#include "dds/dds.hpp"

#include "messages.hpp"

using namespace org::eclipse::cyclonedds;

template<typename T>
int write_func(size_t N)
{
  dds::domain::DomainParticipant participant(domain::default_id());
  dds::topic::Topic<T> topic(participant, "topic_name");
  dds::pub::Publisher publisher(participant);
  dds::pub::DataWriter<T> writer(publisher, topic);

  std::cout << "=== [Publisher] N: " << N << "." << std::endl;

  std::cout << "=== [Publisher] Waiting for subscriber." << std::endl;
  while (writer.publication_matched_status().current_count() == 0) {
      std::this_thread::sleep_for(std::chrono::milliseconds(20));
  }

  std::cout << "=== [Publisher] Starting writes." << std::endl;
  T msg;
  msg.npoints() = static_cast<uint32_t>(N);
  while (writer.publication_matched_status().current_count() > 0) {
    msg.frame_ID()++;
    for (size_t p = 0; p < N; p++) {
      msg.points()[p].point_ID() = p;
      msg.points()[p].x() = rand();
      msg.points()[p].y() = rand();
      msg.points()[p].z() = rand();
    }
    writer.write(msg);
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
  }

  return 0;
}

int main(int argc, char **argv) {
    try {
        size_t N = argc > 1 ? std::stoul(argv[1]) : 8;
        if (N <= 8)
          write_func<messages::LIDARFrame_8>(N);
        else if (N <= 16)
          write_func<messages::LIDARFrame_16>(N);
        else if (N <= 32)
          write_func<messages::LIDARFrame_32>(N);
        else if (N <= 64)
          write_func<messages::LIDARFrame_64>(N);
        else
          std::cerr << "=== [Publisher] " << N << " is not a supported number of lidar points" << std::endl;
    } catch (const dds::core::Exception& e) {
        std::cerr << "=== [Publisher] Exception: " << e.what() << std::endl;
        return EXIT_FAILURE;
    } catch (...) {
        std::cerr << "=== [Publisher] Exception.\n" << std::endl;
        return EXIT_FAILURE;
    }

    std::cout << "=== [Publisher] Done." << std::endl;

    return EXIT_SUCCESS;
}

You will still need to add the correct QoS settings for your use case (and SHM use).

reicheratwork commented 1 year ago

Hi @YeahhhhLi , I think there may be something else going on here. Looking at what you have written:

  1. shm needs iox service and config, udp does not
  2. shm needs cyclonedds enable SharedMemory, udp does not
  3. The two communication modes require different qos configurations
  4. shm needs loan_sample from writer, udp does not

Is the issue that your UDP processes are not communicating with the SHM processes? Because if that is the case, they might not be matching due to the differences in QoS settings (your point 3). If the QoS settings are not "compatible" (whose criteria depend on the QoSPolicies being compared), reader and writer will not match, and therefore not communicate.

YeahhhhLi commented 1 year ago

The most important question is (in my opinion): do you need to have a flexible sized datatype in your topic? Because, if you know the (maximum) size of the exchanged message at topic creation time, but not compile time, and this is the same on all processes that are communicating, you can choose which of the predefined datatypes (mirroring the approach in shmthroughput example) that the exchanged data "fits into".

In our system, the data transmitted by different topics are actually different protobufs. We will unify them into the DataType of DDS through SerializeToString/Arrary, which is the header + body mentioned above: image

YeahhhhLi commented 1 year ago

Hi @YeahhhhLi , I think there may be something else going on here. Looking at what you have written:

  1. shm needs iox service and config, udp does not
  2. shm needs cyclonedds enable SharedMemory, udp does not
  3. The two communication modes require different qos configurations
  4. shm needs loan_sample from writer, udp does not

Is the issue that your UDP processes are not communicating with the SHM processes? Because if that is the case, they might not be matching due to the differences in QoS settings (your point 3). If the QoS settings are not "compatible" (whose criteria depend on the QoSPolicies being compared), reader and writer will not match, and therefore not communicate.

yes, but udp process and shm process are not accurate. In fact, we only have one process, and it may be deployed on two hosts in the future. At that time, both inter-process communication and cross-host communication are required.

YeahhhhLi commented 1 year ago

I quickly made this example program, to show how to "automatically fit" runtime defined numbers of points into the smallest fixed size types. I hope it is of use to you. You start both publisher and subscriber with as argument the number of points you want each message to contain (up to 64 in this case) or no arguments (in which case it defaults to 8 points per sample).

Thanks First! I will look at the example you provided in detail

reicheratwork commented 1 year ago

Thanks First! I will look at the example you provided in detail

No problem @YeahhhhLi

Another thing, for some time it is no longer mandatory to have a fixed size datatype for data exchange over SHM, as variable sized datatypes will be serialized into the exchanged SHM blocks. This will incur a performance overhead and fixed size types are the fastest/cheapest for intra-node communication. If you also need to use the same process to communicate through the network on the same topic, then serialization will need to happen anyhow, and the only penalty is the deserialization step on the receiving processes on the same node.

YeahhhhLi commented 1 year ago

In fact, we only have one process, and it may be deployed on two hosts in the future. At that time, both inter-process communication and cross-host communication are required.

In this case, can only shm and udp, reader / writer be created at the same time? Like:

  dds::domain::DomainParticipant participant(domain::default_id());
  dds::pub::Publisher publisher(participant, pb_qos);

  dds::topic::Topic<ShmDDSMessage> shm_topic(participant, topic_name);
  dds::pub::DataWriter<ShmDDSMessage> shm_writer(publisher, shm_topic, shm_dw_qos);

  dds::topic::Topic<UDPDDSMessage> udp_topic(participant, topic_name);
  dds::pub::DataWriter<UDPDDSMessage>udp_writer(publisher, udp_topic,udp_dw_qos);

  // when need transport
  // shm
  {
    ShmDDSMessage& dds_message = shm_writer->deletegate().loan_sample();
    // fill message
   shm_writer->write(dds_message);
  }
  // udp
  {
    UDPDDSMessage udp_message;
   // fill message
   udp_writer->write(udp_message);
  }
reicheratwork commented 1 year ago

In this case, can only shm and udp, reader / writer be created at the same time?

A topic's name needs to be unique on the domain it is created on, so your example will not work.

Also, in the case of the different processes on the same node both writing to UDP and SHM, kind of defeats the purpose of using SHM, no?

But I don't know enough about your specific use case to say anything definitive.

YeahhhhLi commented 1 year ago

A topic's name needs to be unique on the domain it is created on, so your example will not work.

lol yes u r right, i missed this, let's add the suffix to the real topic_name

kind of defeats the purpose of using SHM, no?

There will be such a scenario, when my service publishes a message, maybe other processes are subscribing to this topic data. It is also possible that a process on another host will also subscribe to the data.

So for the former (interprocess), we hope to take advantage of shm, For the latter (interhost), we can only use udp. Is this consistent with cognition? After all, for MB-level data transmission, shm is still significantly better than udp

There is another optimization point here, whether it is possible to use the service discovery mechanism to determine whether inter-host communication is required. If it is determined that all participants are on the same host, perhaps shm is sufficient to meet the requirements (It's a feasible idea, right?

reicheratwork commented 1 year ago

A topic's name needs to be unique on the domain it is created on, so your example will not work.

lol yes u r right, i missed this, let's add the suffix to the real topic_name

I may have been wrong in making this statement that strongly, I may have to ask another colleague whether this is completely true.

But that does not take away from the fact that it is a bad idea to have topics with different datatypes but the same name.

kind of defeats the purpose of using SHM, no?

There will be such a scenario, when my service publishes a message, maybe other processes are subscribing to this topic data. It is also possible that a process on another host will also subscribe to the data.

So for the former (interprocess), we hope to take advantage of shm, For the latter (interhost), we can only use udp. Is this consistent with cognition? After all, for MB-level data transmission, shm is still significantly better than udp

There is another optimization point here, whether it is possible to use the service discovery mechanism to determine whether inter-host communication is required. If it is determined that all participants are on the same host, perhaps shm is sufficient to meet the requirements (It's a feasible idea, right?

The way in which I understand CycloneDDS works, there are levels that are gone through before communication between readers and writers is done. Discovery:

Data writing:

As long as you have a Datatype/container which is sufficiently suitable for your needs you need just a single topic to reach nodes through both SHM and UDP

Maybe using some non-fixed size datatypes would be a better solution in your case then, as this will allow you to use a single topic to exchange all possible inputs, though this does incur the performance penalty of always serializing the written data, even when just writing to SHM

YeahhhhLi commented 1 year ago

@reicheratwork Thank you for your patience

I will try to test again !

reicheratwork commented 1 year ago

@YeahhhhLi , Looking at the QoS settings in your example:

SHM:

dds::pub::qos::DataWriterQos dw_qos;
dw_qos.policy(dds::core::policy::Reliability(dds::core::policy::ReliabilityKind::RELIABLE, dds::core::Duration::from_secs(10)));
dw_qos.policy(dds::core::policy::History(dds::core::policy::HistoryKind::KEEP_LAST, 16));
dw_qos.policy(dds::core::policy::Deadline()); // default inifinite
dw_qos.policy(dds::core::policy::Durability(dds::core::policy::DurabilityKind::VOLATILE));
dw_qos.policy(dds::core::policy::Liveliness(dds::core::policy::LivelinessKind::AUTOMATIC, dds::core::Duration::from_secs(1)));
dw_qos.policy(dds::core::policy::ResourceLimits(100, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED));

UDP:

dds::pub::qos::DataWriterQos dw_qos;
dw_qos.policy(dds::core::policy::Reliability(dds::core::policy::ReliabilityKind::RELIABLE, dds::core::Duration::from_secs(10)));
dw_qos.policy(dds::core::policy::History(dds::core::policy::HistoryKind::KEEP_ALL, 0));
dw_qos.policy(dds::core::policy::ResourceLimits(100, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED));

Looking at this, setting the historykind to KEEP_ALL will preclude the use of SHM. Wouldn't it be better to have the same QoS settings for the publisher program, and enable/disable SHM use through the config file?

For reference, the following QoS settings are required to allow the use of SHM:

For both readers and writers:

Additionally for writers:

Additionally for readers:

YeahhhhLi commented 1 year ago

Maybe using some non-fixed size datatypes would be a better solution in your case then, as this will allow you to use a single topic to exchange all possible inputs, though this does incur the performance penalty of always serializing the written data, even when just writing to SHM

By the way, which version supports the shm communication of data with no fixed length? The versions I currently depend on are: 1、cyclonedds-cxx 0.9.2 2、cyclondds 0.9.2 3、iceoryx 2.0.2

I looked at the master version of cyclonedds, dds_is_loan_available still has a judgment on fix_size

reicheratwork commented 1 year ago

By the way, which version supports the shm communication of data with no fixed length? The versions I currently depend on are: 1、cyclonedds-cxx 0.9.2 2、cyclondds 0.9.2 3、iceoryx 2.0.2

The current version of master does support serialization of variable length datatypes. By looking at the dds_write_impl function (called by dds_write), which calls dds_write_impl_iox when the writer has an iceoryx publisher attached, which then uses iceoryx for local nodes with SHM enabled. Though I do not know from which version this functionality was started.

I looked at the master version of cyclonedds, dds_is_loan_available still has a judgment on fix_size

Correct, though not having a loan does not mean you cannot use shared memory, it just involves an additional copy step. You can just call dds_write write non-loaned samples, which then makes a copy/serializes into shared memory and publishes it to iceoryx :)

YeahhhhLi commented 1 year ago

looking at the dds_write_impl function (called by dds_write)

OK, I'll take a look in a while

an additional copy step.

This does introduce additional overhead, iceoryx's zero-copy feature is really cool, I don't really want to give it up.

Is there any reference information, how big is the performance difference between the fixed-length shm and the non-fixed-length shm for large data (eg 1MB)? If not, I'll test it myself again when I have time

YeahhhhLi commented 1 year ago

Looking at this, setting the historykind to KEEP_ALL will preclude the use of SHM. Wouldn't it be better to have the same QoS settings for the publisher program, and enable/disable SHM use through the config file?

Thanks for the reminder! After using the QOS configuration of SHM for testing, the test results are indeed in line with expectations: 1、On Host A, I start publisher with enabling shm; 2、On Host A, I start subsciber with enabling shm; 3、On Host B, I start subsciber with enabling shm;

The results show that the subscribers of host A and host B can receive data, the former through shm, the latter through udp!

reicheratwork commented 1 year ago

This does introduce additional overhead, iceoryx's zero-copy feature is really cool, I don't really want to give it up.

Very good attitude to have :P

Is there any reference information, how big is the performance difference between the fixed-length shm and the non-fixed-length shm for large data (eg 1MB)? If not, I'll test it myself again when I have time

It has been a while since I have been into that part of the functionality, so forgive me if I don't have the exact numbers on hand. Though looking at the differences in performance between networked and shared memory exchange, as shown in the documentation here you can expect something between the two different lines for bandwidth. The exact difference would depend on a number of factors, such as how large a part is taken up by just copying memory, and how large the part of the network transfer is.

reicheratwork commented 1 year ago

The results show that the subscribers of host A and host B can receive data, the former through shm, the latter through udp!

Very nice!

YeahhhhLi commented 1 year ago

so forgive me if I don't have the exact numbers on hand. Though looking at the differences in performance between networked and shared memory exchange, as shown in the documentation here you can expect something between the two different lines for bandwidth

OK, let me try to test

YeahhhhLi commented 1 year ago

so forgive me if I don't have the exact numbers on hand. Though looking at the differences in performance between networked and shared memory exchange, as shown in the documentation here you can expect something between the two different lines for bandwidth

OK, let me try to test

Made a simple test, the performance gap between the two is 2-3 times.

1、Message IDL 【fixed-size】

   struct Msg
  {
    uint64 send_ts_ns;
    uint32 payload_size;
    char payload[5242880 - 20];
  };

【non finxed-size】

   struct Msg
  {
    uint64 send_ts_ns;
    string payload;
  };

2、3MB message size 【fixed-size】

[Subscriber] Interval[1.01 s] Samples[804.10 counts] MsgSize[3145728.00 bytes] Speed[19298.42 Mbits/s] avg_cost[802.54 us] pct50_cost[799.00 us] pct90_cost[1435.00 us] max_cost[1963.00 us]
[Subscriber] Interval[1.01 s] Samples[810.79 counts] MsgSize[3145728.00 bytes] Speed[19458.87 Mbits/s] avg_cost[791.75 us] pct50_cost[790.00 us] pct90_cost[1425.00 us] max_cost[1691.00 us]
[Subscriber] Interval[1.01 s] Samples[803.74 counts] MsgSize[3145728.00 bytes] Speed[19289.85 Mbits/s] avg_cost[777.72 us] pct50_cost[785.00 us] pct90_cost[1397.00 us] max_cost[1734.00 us]
[Subscriber] Interval[1.01 s] Samples[803.08 counts] MsgSize[3145728.00 bytes] Speed[19273.88 Mbits/s] avg_cost[793.53 us] pct50_cost[797.00 us] pct90_cost[1414.00 us] max_cost[1697.00 us]
[Subscriber] Interval[1.01 s] Samples[805.94 counts] MsgSize[3145728.00 bytes] Speed[19342.53 Mbits/s] avg_cost[781.77 us] pct50_cost[780.00 us] pct90_cost[1404.00 us] max_cost[1928.00 us]

【non fixed-size】

[Subscriber] Interval[1.00 s] Samples[477.21 counts] MsgSize[3145728.00 bytes] Speed[11453.02 Mbits/s] avg_cost[2150.02 us] pct50_cost[2127.00 us] pct90_cost[2275.00 us] max_cost[2572.00 us]
[Subscriber] Interval[1.00 s] Samples[490.37 counts] MsgSize[3145728.00 bytes] Speed[11768.83 Mbits/s] avg_cost[2110.44 us] pct50_cost[2088.00 us] pct90_cost[2205.00 us] max_cost[2745.00 us]
[Subscriber] Interval[1.00 s] Samples[480.89 counts] MsgSize[3145728.00 bytes] Speed[11541.36 Mbits/s] avg_cost[2060.55 us] pct50_cost[2044.00 us] pct90_cost[2152.00 us] max_cost[2438.00 us]
[Subscriber] Interval[1.00 s] Samples[479.39 counts] MsgSize[3145728.00 bytes] Speed[11505.24 Mbits/s] avg_cost[2049.56 us] pct50_cost[2023.00 us] pct90_cost[2145.00 us] max_cost[2882.00 us]
[Subscriber] Interval[1.00 s] Samples[469.82 counts] MsgSize[3145728.00 bytes] Speed[11275.57 Mbits/s] avg_cost[2199.04 us] pct50_cost[2169.00 us] pct90_cost[2337.00 us] max_cost[2681.00 us]
[Subscriber] Interval[1.00 s] Samples[482.13 counts] MsgSize[3145728.00 bytes] Speed[11571.14 Mbits/s] avg_cost[2155.52 us] pct50_cost[2118.00 us] pct90_cost[2320.00 us] max_cost[2684.00 us]

3、5MB message size 【fixed-size】

[Subscriber] Interval[1.01 s] Samples[799.06 counts] MsgSize[5242880.00 bytes] Speed[31962.45 Mbits/s] avg_cost[1424.79 us] pct50_cost[1424.00 us] pct90_cost[2542.00 us] max_cost[3309.00 us]
[Subscriber] Interval[1.01 s] Samples[800.92 counts] MsgSize[5242880.00 bytes] Speed[32036.85 Mbits/s] avg_cost[1402.06 us] pct50_cost[1398.00 us] pct90_cost[2511.00 us] max_cost[3409.00 us]
[Subscriber] Interval[1.01 s] Samples[803.04 counts] MsgSize[5242880.00 bytes] Speed[32121.69 Mbits/s] avg_cost[1444.89 us] pct50_cost[1442.00 us] pct90_cost[2606.00 us] max_cost[3934.00 us]
[Subscriber] Interval[1.01 s] Samples[798.73 counts] MsgSize[5242880.00 bytes] Speed[31949.13 Mbits/s] avg_cost[1407.13 us] pct50_cost[1419.00 us] pct90_cost[2520.00 us] max_cost[3211.00 us]

【non fixed-size】

[Subscriber] Interval[1.00 s] Samples[292.14 counts] MsgSize[5242880.00 bytes] Speed[11685.62 Mbits/s] avg_cost[3498.07 us] pct50_cost[3470.00 us] pct90_cost[3600.00 us] max_cost[4276.00 us]
[Subscriber] Interval[1.00 s] Samples[291.75 counts] MsgSize[5242880.00 bytes] Speed[11670.11 Mbits/s] avg_cost[3487.08 us] pct50_cost[3469.00 us] pct90_cost[3562.00 us] max_cost[4079.00 us]
[Subscriber] Interval[1.00 s] Samples[292.46 counts] MsgSize[5242880.00 bytes] Speed[11698.24 Mbits/s] avg_cost[3490.67 us] pct50_cost[3467.00 us] pct90_cost[3570.00 us] max_cost[4132.00 us]
[Subscriber] Interval[1.00 s] Samples[293.50 counts] MsgSize[5242880.00 bytes] Speed[11739.99 Mbits/s] avg_cost[3476.33 us] pct50_cost[3458.00 us] pct90_cost[3550.00 us] max_cost[4216.00 us]
[Subscriber] Interval[1.00 s] Samples[290.75 counts] MsgSize[5242880.00 bytes] Speed[11629.97 Mbits/s] avg_cost[3517.75 us] pct50_cost[3498.00 us] pct90_cost[3613.00 us] max_cost[4045.00 us]
YeahhhhLi commented 1 year ago

it just involves an additional copy step

In addition, I also want to ask, will the additional copies here increase with the increase in the number of readers?

YeahhhhLi commented 1 year ago

There is also a question about qos configuration, how can I use cyclonedds.xml to configure the qos of data_writer and data_reader? I didn't find a related demo,

Now, my cyclonedds xml is:

<?xml version="1.0" encoding="UTF-8" ?>
<CycloneDDS xmlns="https://cdds.io/config"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="https://cdds.io/config https://raw.githubusercontent.com/eclipse-cyclonedds/cyclonedds/iceoryx/etc/cyclonedds.xsd">
    <Domain id="any">
        <SharedMemory>
            <Enable>true</Enable>
        </SharedMemory>
    </Domain>
</CycloneDDS>

And, i want to add data writer qos:

    dw_qos.policy(dds::core::policy::Reliability(dds::core::policy::ReliabilityKind::RELIABLE, dds::core::Duration::from_secs(10)));
    dw_qos.policy(dds::core::policy::History(dds::core::policy::HistoryKind::KEEP_LAST, 16));
    dw_qos.policy(dds::core::policy::Deadline()); // default inifinite
    dw_qos.policy(dds::core::policy::Durability(dds::core::policy::DurabilityKind::VOLATILE));
    dw_qos.policy(dds::core::policy::Liveliness(dds::core::policy::LivelinessKind::AUTOMATIC, dds::core::Duration::from_secs(1)));
    dw_qos.policy(dds::core::policy::ResourceLimits(MAX_SAMPLES, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED));
reicheratwork commented 1 year ago

@YeahhhhLi looking at the following (summarized your results):

2、3MB message size 【fixed-size】 ~20Gb/s 【non fixed-size】 ~11.5Gb/s 3、5MB message size 【fixed-size】 ~32Gb/s 【non fixed-size】 ~11.5Gb/s

It seems that with the non-fixed size you are running into a bottleneck of around 11-12 Gb/s, and I think it is caused by the serialization (so memcpy) step.

it just involves an additional copy step

In addition, I also want to ask, will the additional copies here increase with the increase in the number of readers?

For fixed-size datatypes (so no serialization is necessary across iceoryx) each process's reader just gets the pointer to the sample, and the metadata is the only thing being copied (maybe 32-64 bytes/sample or so). For non fixed-size datatypes (so serialization is necessary across iceoryx) each process's reader will need to do its own deserialization.

YeahhhhLi commented 1 year ago

OK thanks~

each process's reader will need to do its own deserialization

Regarding the serialization/deserialization part, is there any relevant document to learn, or which source code is better to start learning from?

reicheratwork commented 1 year ago

OK thanks~

each process's reader will need to do its own deserialization

Regarding the serialization/deserialization part, is there any relevant document to learn, or which source code is better to start learning from?

I would say, start at the DDS XTypes spec, as these are the current serialization schemes in use in CycloneDDS. And then read through section 7.4 (Data Representation) which describes the serialization in more detail.

YeahhhhLi commented 1 year ago

I would say, start at the DDS XTypes spec, as these are the current serialization schemes in use in CycloneDDS. And then read through section 7.4 (Data Representation) which describes the serialization in more detail.

Thanks bro!