eProsima / Fast-DDS


Question about DDSHelloWorldExampleTCP when exchanging the client and server [13703] #2288

Closed: lurenlym closed this issue 2 years ago

lurenlym commented 2 years ago

Environment

version: master branch cloned on 2021-10-20 (commit 895197a9)
os: Ubuntu 18.04
arch: x86

Question

The DDSHelloWorldExampleTCP demo transmits 900 kB of data every 100 ms without problems. But when I exchanged the server and client roles in the code, something went wrong: the transmission became significantly slower than before.
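For clarity, the role swap boils down to which side opens the TCP listening port (the server) and which side only points an initial peer locator at it (the client). Below is a condensed sketch using the same calls as the full init code that follows; the `server_pqos`/`client_pqos`/`server_desc`/`client_desc` names and the `wan_ip`/`server_ip`/`port` values are placeholders.

    // TCP "server" role: opens a physical listening port and (optionally) announces a WAN address.
    auto server_desc = std::make_shared<TCPv4TransportDescriptor>();
    server_desc->set_WAN_address(wan_ip);   // only needed across a WAN/NAT
    server_desc->add_listener_port(port);
    server_pqos.transport().use_builtin_transports = false;
    server_pqos.transport().user_transports.push_back(server_desc);

    // TCP "client" role: no listener port, just an initial peer pointing at the server.
    auto client_desc = std::make_shared<TCPv4TransportDescriptor>();
    Locator initial_peer;
    initial_peer.kind = LOCATOR_KIND_TCPv4;
    IPLocator::setIPv4(initial_peer, server_ip);
    initial_peer.port = port;
    client_pqos.wire_protocol().builtin.initialPeersList.push_back(initial_peer);
    client_pqos.transport().use_builtin_transports = false;
    client_pqos.transport().user_transports.push_back(client_desc);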

The main modified code of Publisher init:

    DomainParticipantQos pqos;
    std::shared_ptr<TCPv4TransportDescriptor> descriptor = std::make_shared<TCPv4TransportDescriptor>();
    pqos.wire_protocol().builtin.discovery_config.leaseDuration = eprosima::fastrtps::c_TimeInfinite;
    pqos.wire_protocol().builtin.discovery_config.leaseDuration_announcementperiod = eprosima::fastrtps::Duration_t(5, 0);
    int32_t kind = LOCATOR_KIND_TCPv4;
    Locator initial_peer_locator;
    initial_peer_locator.kind = kind;
    for (std::string ip : whitelist)
    {
        descriptor->interfaceWhiteList.push_back(ip);
        std::cout << "Whitelisted " << ip << std::endl;
    }
    if (!wan_ip.empty())
    {
        IPLocator::setIPv4(initial_peer_locator, wan_ip);
        std::cout << wan_ip << ":" << port << std::endl;
    }
    else
    {
        IPLocator::setIPv4(initial_peer_locator, "127.0.0.1");
    }
    initial_peer_locator.port = port;
    pqos.wire_protocol().builtin.initialPeersList.push_back(initial_peer_locator); // Publisher's meta channel
    pqos.name("client");
    pqos.transport().use_builtin_transports = false;
    pqos.transport().user_transports.push_back(descriptor);
    participant_ = DomainParticipantFactory::get_instance()->create_participant(0, pqos);

    if (participant_ == nullptr)
    {
        return false;
    }    
    //REGISTER THE TYPE
    type_.register_type(participant_);
    //CREATE THE PUBLISHER
    publisher_ = participant_->create_publisher(PUBLISHER_QOS_DEFAULT);

    if (publisher_ == nullptr)
    {
        return false;
    }

    //CREATE THE TOPIC
    topic_ = participant_->create_topic("HelloWorldTopicTCP", "HelloWorld", TOPIC_QOS_DEFAULT);

    if (topic_ == nullptr)
    {
        return false;
    }

    //CREATE THE DATAWRITER
    DataWriterQos wqos;
    wqos.history().kind = KEEP_LAST_HISTORY_QOS;
    wqos.history().depth = 1;
    wqos.resource_limits().max_samples = 50;
    wqos.resource_limits().allocated_samples = 20;
    wqos.reliable_writer_qos().times.heartbeatPeriod.seconds = 2;
    wqos.reliable_writer_qos().times.heartbeatPeriod.nanosec = 200 * 1000 * 1000;
    wqos.reliability().kind = RELIABLE_RELIABILITY_QOS;
    writer_ = publisher_->create_datawriter(topic_, wqos, &listener_);

    if (writer_ == nullptr)
    {
        return false;
    }
    return true;

The main modified code of Subscriber init:

    //CREATE THE PARTICIPANT
    DomainParticipantQos pqos;
    std::shared_ptr<TCPv4TransportDescriptor> descriptor =
    std::make_shared<TCPv4TransportDescriptor>();
    pqos.wire_protocol().builtin.discovery_config.leaseDuration = eprosima::fastrtps::c_TimeInfinite;
    pqos.wire_protocol().builtin.discovery_config.leaseDuration_announcementperiod = eprosima::fastrtps::Duration_t(5, 0);
    int32_t kind = LOCATOR_KIND_TCPv4;
    Locator initial_peer_locator;
    initial_peer_locator.kind = kind;
    for (std::string ip : whitelist)
    {
        descriptor->interfaceWhiteList.push_back(ip);
        std::cout << "Whitelisted " << ip << std::endl;
    }
    if (!wan_ip.empty())
    {
        IPLocator::setIPv4(initial_peer_locator, wan_ip);
        std::cout << wan_ip << ":" << port << std::endl;
    }
    else
    {
        IPLocator::setIPv4(initial_peer_locator, "127.0.0.1");
    }
    initial_peer_locator.port = port;
    pqos.wire_protocol().builtin.initialPeersList.push_back(initial_peer_locator); // Publisher's meta channel
    pqos.name("client");

    if (use_tls)
    {
        using TLSVerifyMode = TCPTransportDescriptor::TLSConfig::TLSVerifyMode;
        using TLSOptions = TCPTransportDescriptor::TLSConfig::TLSOptions;
        descriptor->apply_security = true;
        descriptor->tls_config.verify_file = "cacert.pem";
        descriptor->tls_config.verify_mode = TLSVerifyMode::VERIFY_PEER;
        descriptor->tls_config.add_option(TLSOptions::DEFAULT_WORKAROUNDS);
    }
    pqos.transport().use_builtin_transports = false;
    pqos.transport().user_transports.push_back(descriptor);
    participant_ = DomainParticipantFactory::get_instance()->create_participant(0, pqos);

    if (participant_ == nullptr)
    {
        return false;
    }    
    //REGISTER THE TYPE
    type_.register_type(participant_);

    // //CREATE THE SUBSCRIBER
    subscriber_ = participant_->create_subscriber(SUBSCRIBER_QOS_DEFAULT);

    if (subscriber_ == nullptr)
    {
        return false;
    }

    //CREATE THE TOPIC
    topic_ = participant_->create_topic("HelloWorldTopicTCP", "HelloWorld", TOPIC_QOS_DEFAULT);

    if (topic_ == nullptr)
    {
        return false;
    }

    //CREATE THE DATAREADER
    DataReaderQos rqos;
    rqos.history().kind = eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS;
    rqos.history().depth = 1;
    rqos.resource_limits().max_samples = 50;
    rqos.resource_limits().allocated_samples = 20;
    rqos.reliability().kind = eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS;
    rqos.durability().kind = eprosima::fastdds::dds::TRANSIENT_LOCAL_DURABILITY_QOS;
    reader_ = subscriber_->create_datareader(topic_, rqos, &listener_);

    if (reader_ == nullptr)
    {
        return false;
    }

    return true;

Publisher send loop:

while (!stop_)
{
    if (publish(false))
    {
        //logError(HW, "SENT " <<  hello_.index());
        std::cout << "[RTCP] Message: " << hello_.message().size() << " with index: "
                  <<std::fixed<< std::setprecision(8)<<hello_.index() << " SENT" << std::endl;
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
}

Test design and CLI

Computer A (IP 10.10.10.1) runs the publisher; computer B (IP 10.3.1.102) runs the subscriber.

In A:

./DDSHelloWorldExampleTCP publisher -a 10.10.10.1 -p 7775

In B:

./DDSHelloWorldExampleTCP subscriber -a 10.3.1.102 -p 7775

Observe bandwidth usage

Publisher as server and Subscriber as client (original code): [bandwidth usage screenshot]

Publisher as client and Subscriber as server (modified code): [bandwidth usage screenshot]

It seems that when the publisher acts as the TCP client, it transmits three to four times as much data. I then tried to debug the source code to find the most time-consuming part of the program, and found that the following code shows very different time consumption between the two setups.

src/cpp/rtps/participant/RTPSParticipantImpl.h

auto track_begin = std::chrono::system_clock::now();
if (lock.try_lock_until(max_blocking_time_point))
{
    ret_code = true;

    for (auto& send_resource : send_resource_list_)
    {
        LocatorIteratorT locators_begin = destination_locators_begin;
        LocatorIteratorT locators_end = destination_locators_end;
        send_resource->send(msg->buffer, msg->length, &locators_begin, &locators_end,
                max_blocking_time_point);

    }
    lock.unlock();

    // notify statistics module
    on_rtps_send(
        sender_guid,
        destination_locators_begin,
        destination_locators_end,
        msg->length);
    // checkout if sender is a discovery endpoint
    on_discovery_packet(
        sender_guid,
        destination_locators_begin,
        destination_locators_end);
}
auto track_end = std::chrono::system_clock::now();
double time = std::chrono::duration_cast<std::chrono::microseconds>(track_end - track_begin).count();
std::cout << "data size: " << msg->length << "\t" << "send data: " << time << "us" << std::endl;

The 900 kB sample is divided into 16 segments (due to the 64 kB limit). The results show that the code above takes about 200 us to 1000 us per segment when the publisher acts as the server, once the client has matched. But it also seems that this code flushes twice for one frame.

[RTCP] Message: 900000 with index: 1634903909.41644001 SENT
data size: 65500        send data: 294.00000000us
data size: 65500        send data: 284.00000000us
data size: 65500        send data: 271.00000000us
data size: 65500        send data: 270.00000000us
data size: 65500        send data: 270.00000000us
data size: 65500        send data: 456.00000000us
data size: 65500        send data: 1112.00000000us
data size: 65500        send data: 4852.00000000us
data size: 65500        send data: 276.00000000us
data size: 50140        send data: 613.00000000us
data size: 65500        send data: 568.00000000us
data size: 65500        send data: 589.00000000us
data size: 65500        send data: 524.00000000us
data size: 65500        send data: 478.00000000us
data size: 65500        send data: 534.00000000us
data size: 65500        send data: 517.00000000us
data size: 65500        send data: 493.00000000us
data size: 65500        send data: 533.00000000us
data size: 65500        send data: 498.00000000us
data size: 65500        send data: 2943.00000000us
data size: 65500        send data: 281.00000000us
data size: 65500        send data: 328.00000000us
data size: 65500        send data: 5936.00000000us
data size: 50140        send data: 279.00000000us
[RTCP] Message: 900000 with index: 1634903909.53169107 SENT
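As a rough cross-check of the fragment counts in the log above, here is a standalone sketch (not Fast-DDS code; the 65500-byte limit is simply the largest "data size" printed in the log):

    #include <cmath>
    #include <iostream>

    int main()
    {
        const double payload_bytes = 900000;  // hello_.message().size()
        const double max_rtps_bytes = 65500;  // largest "data size" printed in the log
        // ceil(900000 / 65500) = 14 buffers per copy of the sample, which is in the same
        // ballpark as the runs of thirteen 65500-byte frames plus one ~50 kB tail seen above.
        std::cout << std::ceil(payload_bytes / max_rtps_bytes) << " buffers per sample" << std::endl;
        return 0;
    }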

I observed the call stack and found that publish(false) normally ends up calling
https://github.com/eProsima/Fast-DDS/blob/4a27fbbbeb08ad83ffa10422eeaa201017382905/src/cpp/rtps/writer/StatefulWriter.cpp#L2017 and
https://github.com/eProsima/Fast-DDS/blob/7e617577da511d89ccdbb42293c795254d07b964/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp#L1146, where the variable send_resource_list_ has size 3.

When I exchange the server and client:

[RTCP] Message: 900000 with index: 1634905204.91387200 SENT
data size: 65500        send data: 106164.00000000us
data size: 65500        send data: 108359.00000000us
data size: 65500        send data: 106838.00000000us
data size: 65500        send data: 108011.00000000us
data size: 65500        send data: 111677.00000000us
data size: 65500        send data: 116774.00000000us
data size: 65500        send data: 107150.00000000us
data size: 348  send data: 82146.00000000us
data size: 348  send data: 170.00000000us
data size: 65500        send data: 109527.00000000us
data size: 65500        send data: 108574.00000000us
data size: 65500        send data: 107530.00000000us
data size: 65500        send data: 108564.00000000us
data size: 65500        send data: 108043.00000000us
data size: 65500        send data: 105379.00000000us
data size: 50140        send data: 81191.00000000us
[RTCP] Message: 900000 with index: 1634905206.51240897 SENT

We can see that transmitting each data segment, including the heartbeat frames, takes hundreds of times longer. One of the biggest factors is that send_resource_list_ has size 36. This seems to be caused by repeatedly adding the same address to this list when establishing the TCP connection. I found that while the server and client are not yet matched, send_resource_list_ normally has size 1, but once the server is up, entries are added to send_resource_list_ abnormally.
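A one-line way to confirm the send_resource_list_ growth is to print its size next to the timing code already added in RTPSParticipantImpl.h (a debug sketch at the same instrumentation point, not part of the example):

    // Inside the instrumented block of src/cpp/rtps/participant/RTPSParticipantImpl.h,
    // right before the send loop:
    std::cout << "send_resource_list_ size: " << send_resource_list_.size() << std::endl;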

Q1: Is this a normal phenomenon in test 1 (publisher as server)? It looks like I should change the relevant flow-control parameters.

Q2: How can the transmission be accelerated when the publisher acts as the client? It seems that the createSenderResources function is called multiple times; is that a bug? In a normal peer-to-peer connection, how large should this variable be, and what factors does it depend on?
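Regarding the flow-control parameters mentioned in Q1: a minimal sketch of how a named flow controller would be configured, assuming a Fast DDS build that ships the FlowControllerDescriptor API behind the FlowControllerImpl.hpp linked above. The controller name, the limits, and the header path are placeholders to verify against your version.

    #include <fastdds/rtps/flowcontrol/FlowControllerDescriptor.hpp>

    // Participant side: declare a named flow controller with placeholder limits.
    auto flow_control = std::make_shared<eprosima::fastdds::rtps::FlowControllerDescriptor>();
    flow_control->name = "example_flow_controller";
    flow_control->scheduler = eprosima::fastdds::rtps::FlowControllerSchedulerPolicy::FIFO;
    flow_control->max_bytes_per_period = 900000;  // placeholder: one full sample per period
    flow_control->period_ms = 100;                // placeholder: matches the 100 ms publish rate
    pqos.flow_controllers().push_back(flow_control);

    // Writer side: publish asynchronously through that controller.
    wqos.publish_mode().kind = eprosima::fastdds::dds::ASYNCHRONOUS_PUBLISH_MODE;
    wqos.publish_mode().flow_controller_name = "example_flow_controller";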

juanlofer-eprosima commented 2 years ago

Thanks @lurenlym for such an in-depth report!

Reviewing your modifications, I noticed that some lines of code were not transferred between publisher and subscriber when the TCP roles were exchanged. Would you please test the following configuration and see if it makes any difference? That would be very valuable for us before we look further into this issue.

The main modified code of Publisher init:

    DomainParticipantQos pqos;
    std::shared_ptr<TCPv4TransportDescriptor> descriptor = std::make_shared<TCPv4TransportDescriptor>();
    pqos.wire_protocol().builtin.discovery_config.leaseDuration = eprosima::fastrtps::c_TimeInfinite;
    pqos.wire_protocol().builtin.discovery_config.leaseDuration_announcementperiod = eprosima::fastrtps::Duration_t(5, 0);
    int32_t kind = LOCATOR_KIND_TCPv4;
    Locator initial_peer_locator;
    initial_peer_locator.kind = kind;
    for (std::string ip : whitelist)
    {
        descriptor->interfaceWhiteList.push_back(ip);
        std::cout << "Whitelisted " << ip << std::endl;
    }
    if (!wan_ip.empty())
    {
        IPLocator::setIPv4(initial_peer_locator, wan_ip);
        std::cout << wan_ip << ":" << port << std::endl;
    }
    else
    {
        IPLocator::setIPv4(initial_peer_locator, "127.0.0.1");
    }
    initial_peer_locator.port = port;
    pqos.wire_protocol().builtin.initialPeersList.push_back(initial_peer_locator); // Subscriber's meta channel
    pqos.name("client");
    pqos.transport().use_builtin_transports = false;
    pqos.transport().user_transports.push_back(descriptor);
    participant_ = DomainParticipantFactory::get_instance()->create_participant(0, pqos);

    if (participant_ == nullptr)
    {
        return false;
    }    
    //REGISTER THE TYPE
    type_.register_type(participant_);
    //CREATE THE PUBLISHER
    publisher_ = participant_->create_publisher(PUBLISHER_QOS_DEFAULT);

    if (publisher_ == nullptr)
    {
        return false;
    }

    //CREATE THE TOPIC
    topic_ = participant_->create_topic("HelloWorldTopicTCP", "HelloWorld", TOPIC_QOS_DEFAULT);

    if (topic_ == nullptr)
    {
        return false;
    }

    //CREATE THE DATAWRITER
    DataWriterQos wqos;
    wqos.history().kind = KEEP_LAST_HISTORY_QOS;
    wqos.history().depth = 1;
    wqos.resource_limits().max_samples = 50;
    wqos.resource_limits().allocated_samples = 20;
    wqos.reliable_writer_qos().times.heartbeatPeriod.seconds = 2;
    wqos.reliable_writer_qos().times.heartbeatPeriod.nanosec = 200 * 1000 * 1000;
    wqos.reliability().kind = RELIABLE_RELIABILITY_QOS;
    writer_ = publisher_->create_datawriter(topic_, wqos, &listener_);

    if (writer_ == nullptr)
    {
        return false;
    }
    return true;

The main modified code of Subscriber init:

    //CREATE THE PARTICIPANT
    DomainParticipantQos pqos;
    std::shared_ptr<TCPv4TransportDescriptor> descriptor =
    std::make_shared<TCPv4TransportDescriptor>();
    pqos.wire_protocol().builtin.discovery_config.leaseDuration = eprosima::fastrtps::c_TimeInfinite;
    pqos.wire_protocol().builtin.discovery_config.leaseDuration_announcementperiod = eprosima::fastrtps::Duration_t(5, 0);
    for (std::string ip : whitelist)
    {
        descriptor->interfaceWhiteList.push_back(ip);
        std::cout << "Whitelisted " << ip << std::endl;
    }
    if (!wan_ip.empty())
    {
        descriptor->set_WAN_address(wan_ip);
        std::cout << wan_ip << ":" << port << std::endl;
    }
    descriptor->add_listener_port(port);
    pqos.name("server");

    if (use_tls)
    {
        using TLSVerifyMode = TCPTransportDescriptor::TLSConfig::TLSVerifyMode;
        using TLSOptions = TCPTransportDescriptor::TLSConfig::TLSOptions;
        descriptor->apply_security = true;
        descriptor->tls_config.verify_file = "cacert.pem";
        descriptor->tls_config.verify_mode = TLSVerifyMode::VERIFY_PEER;
        descriptor->tls_config.add_option(TLSOptions::DEFAULT_WORKAROUNDS);
    }
    pqos.transport().use_builtin_transports = false;
    pqos.transport().user_transports.push_back(descriptor);
    participant_ = DomainParticipantFactory::get_instance()->create_participant(0, pqos);

    if (participant_ == nullptr)
    {
        return false;
    }    
    //REGISTER THE TYPE
    type_.register_type(participant_);

    // //CREATE THE SUBSCRIBER
    subscriber_ = participant_->create_subscriber(SUBSCRIBER_QOS_DEFAULT);

    if (subscriber_ == nullptr)
    {
        return false;
    }

    //CREATE THE TOPIC
    topic_ = participant_->create_topic("HelloWorldTopicTCP", "HelloWorld", TOPIC_QOS_DEFAULT);

    if (topic_ == nullptr)
    {
        return false;
    }

    //CREATE THE DATAREADER
    DataReaderQos rqos;
    rqos.history().kind = eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS;
    rqos.history().depth = 1;
    rqos.resource_limits().max_samples = 50;
    rqos.resource_limits().allocated_samples = 20;
    rqos.reliability().kind = eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS;
    rqos.durability().kind = eprosima::fastdds::dds::TRANSIENT_LOCAL_DURABILITY_QOS;
    reader_ = subscriber_->create_datareader(topic_, rqos, &listener_);

    if (reader_ == nullptr)
    {
        return false;
    }

    return true;

lurenlym commented 2 years ago

https://github.com/eProsima/Fast-DDS/issues/2288#issuecomment-954491317 @juanlofer-eprosima Sorry, I pasted the wrong subscriber init code in the issue; apologies for the confusion. The results of my tests were actually obtained after the two roles had been swapped correctly. To make debugging easier, I added a variable that controls whether the program acts as a client or as a server. The complete code is as follows: Publisher.cpp

/**
 * @file HelloWorldPublisher.cpp
 *
 */

#include "HelloWorldPublisher.h"
#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
#include <fastdds/rtps/transport/TCPv4TransportDescriptor.h>
#include <fastrtps/utils/IPLocator.h>
#include "/usr/include/x86_64-linux-gnu/sys/time.h"
#include <thread>
#include <string>
#include <iomanip>
using namespace eprosima::fastdds::dds;
using namespace eprosima::fastdds::rtps;
using namespace eprosima::fastrtps::rtps;
HelloWorldPublisher::HelloWorldPublisher()
    : participant_(nullptr)
    , publisher_(nullptr)
    , topic_(nullptr)
    , writer_(nullptr)
    , type_(new HelloWorldPubSubType())
{
}

bool HelloWorldPublisher::init(
        const std::string& wan_ip,
        unsigned short port,
        bool use_tls,
        bool server,
        const std::vector<std::string>& whitelist)
{
    stop_ = false;
    hello_.index(0);
    std::string s(900000,'a');
    std::vector<char> d(s.begin(), s.end());
    hello_.message(d);
    //CREATE THE PARTICIPANT
    DomainParticipantQos pqos;
    std::shared_ptr<TCPv4TransportDescriptor> descriptor = std::make_shared<TCPv4TransportDescriptor>();
    pqos.wire_protocol().builtin.discovery_config.leaseDuration = eprosima::fastrtps::c_TimeInfinite;
    pqos.wire_protocol().builtin.discovery_config.leaseDuration_announcementperiod = eprosima::fastrtps::Duration_t(5, 0);
    if(server)
    {
        pqos.name("server");
        for (std::string ip : whitelist)
        {
            descriptor->interfaceWhiteList.push_back(ip);
            std::cout << "Whitelisted " << ip << std::endl;
        }
        if (use_tls)
        {
            using TLSOptions = TCPTransportDescriptor::TLSConfig::TLSOptions;
            descriptor->apply_security = true;
            descriptor->tls_config.password = "test";
            descriptor->tls_config.cert_chain_file = "servercert.pem";
            descriptor->tls_config.private_key_file = "serverkey.pem";
            descriptor->tls_config.tmp_dh_file = "dh2048.pem";
            descriptor->tls_config.add_option(TLSOptions::DEFAULT_WORKAROUNDS);
            descriptor->tls_config.add_option(TLSOptions::SINGLE_DH_USE);
            descriptor->tls_config.add_option(TLSOptions::NO_SSLV2);
        }

        descriptor->sendBufferSize = 2000000;
        descriptor->receiveBufferSize = 2000000;

        if (!wan_ip.empty())
        {
            descriptor->set_WAN_address(wan_ip);
            std::cout << wan_ip << ":" << port << std::endl;
        }
        descriptor->add_listener_port(port);

    }
    else{
        int32_t kind = LOCATOR_KIND_TCPv4;

        Locator initial_peer_locator;
        initial_peer_locator.kind = kind;

        for (std::string ip : whitelist)
        {
            descriptor->interfaceWhiteList.push_back(ip);
            std::cout << "Whitelisted " << ip << std::endl;
        }

        if (!wan_ip.empty())
        {
            IPLocator::setIPv4(initial_peer_locator, wan_ip);
            std::cout << wan_ip << ":" << port << std::endl;
        }
        else
        {
            IPLocator::setIPv4(initial_peer_locator, "127.0.0.1");
        }
        initial_peer_locator.port = port;
        pqos.wire_protocol().builtin.initialPeersList.push_back(initial_peer_locator); // Publisher's meta channel

        pqos.name("client");

        if (use_tls)
        {
            using TLSVerifyMode = TCPTransportDescriptor::TLSConfig::TLSVerifyMode;
            using TLSOptions = TCPTransportDescriptor::TLSConfig::TLSOptions;
            descriptor->apply_security = true;
            descriptor->tls_config.verify_file = "cacert.pem";
            descriptor->tls_config.verify_mode = TLSVerifyMode::VERIFY_PEER;
            descriptor->tls_config.add_option(TLSOptions::DEFAULT_WORKAROUNDS);
        }
    }
    pqos.transport().use_builtin_transports = false;
    pqos.transport().user_transports.push_back(descriptor);
    participant_ = DomainParticipantFactory::get_instance()->create_participant(0, pqos);

    if (participant_ == nullptr)
    {
        return false;
    }    
    //REGISTER THE TYPE
    type_.register_type(participant_);
    //CREATE THE PUBLISHER
    publisher_ = participant_->create_publisher(PUBLISHER_QOS_DEFAULT);

    if (publisher_ == nullptr)
    {
        return false;
    }

    //CREATE THE TOPIC
    topic_ = participant_->create_topic("HelloWorldTopicTCP", "HelloWorld", TOPIC_QOS_DEFAULT);

    if (topic_ == nullptr)
    {
        return false;
    }

    //CREATE THE DATAWRITER
    DataWriterQos wqos;
    wqos.history().kind = KEEP_LAST_HISTORY_QOS;
    wqos.history().depth = 1;
    wqos.resource_limits().max_samples = 50;
    wqos.resource_limits().allocated_samples = 20;
    wqos.reliable_writer_qos().times.heartbeatPeriod.seconds = 2;
    wqos.reliable_writer_qos().times.heartbeatPeriod.nanosec = 200 * 1000 * 1000;
    wqos.reliability().kind = RELIABLE_RELIABILITY_QOS;
    writer_ = publisher_->create_datawriter(topic_, wqos, &listener_);

    if (writer_ == nullptr)
    {
        return false;
    }

    return true;
}

HelloWorldPublisher::~HelloWorldPublisher()
{
    if (writer_ != nullptr)
    {
        publisher_->delete_datawriter(writer_);
    }
    if (publisher_ != nullptr)
    {
        participant_->delete_publisher(publisher_);
    }
    if (topic_ != nullptr)
    {
        participant_->delete_topic(topic_);
    }
    DomainParticipantFactory::get_instance()->delete_participant(participant_);
}

void HelloWorldPublisher::PubListener::on_publication_matched(
        eprosima::fastdds::dds::DataWriter*,
        const eprosima::fastdds::dds::PublicationMatchedStatus& info)
{
    if (info.current_count_change == 1)
    {
        matched_ = info.total_count;
        first_connected_ = true;
        std::cout << "[RTCP] Publisher matched" << std::endl;
    }
    else if (info.current_count_change == -1)
    {
        matched_ = info.total_count;
        std::cout << "[RTCP] Publisher unmatched" << std::endl;
    }
    else
    {
        std::cout << info.current_count_change
                  << " is not a valid value for PublicationMatchedStatus current count change" << std::endl;
    }
}

void HelloWorldPublisher::runThread(
        uint32_t samples,
        long sleep_ms)
{
    if (samples == 0)
    {
        while (!stop_)
        {
            if (publish(false))
            {
                //logError(HW, "SENT " <<  hello_.index());
                std::cout << "[RTCP] Message: " << hello_.message().size() << " with index: "
                          <<std::fixed<< std::setprecision(8)<<hello_.index() << " SENT" << std::endl;
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
        }
    }
    else
    {
        for (uint32_t i = 0; i < samples; ++i)
        {
            if (!publish())
            {
                --i;
            }
            else
            {
                std::cout << "[RTCP] Message: " << hello_.message().size() << " with index: "
                          <<std::fixed<< std::setprecision(8)<<hello_.index() << " SENT" << std::endl;
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
        }
    }
}

void HelloWorldPublisher::run(
        uint32_t samples,
        long sleep_ms)
{
    std::thread thread(&HelloWorldPublisher::runThread, this, samples, sleep_ms);
    if (samples == 0)
    {
        std::cout << "Publisher running. Please press enter to stop_ the Publisher at any time." << std::endl;
        std::cin.ignore();
        stop_ = true;
    }
    else
    {
        std::cout << "Publisher running " << samples << " samples." << std::endl;
    }
    thread.join();
}

bool HelloWorldPublisher::publish(
        bool waitForListener)
{
    if (listener_.first_connected_ || !waitForListener || listener_.matched_ > 0)
    {

        struct timeval tv;    
        gettimeofday(&tv, NULL);    // gettimeofday() is declared in <sys/time.h>
        double timestamp = tv.tv_sec+double(tv.tv_usec)/1000000.0;
        // hello_.message(d);
        hello_.index(timestamp);
        writer_->write((void*)&hello_);
        return true;
    }
    return false;
}

Subscriber.cpp

#include "HelloWorldSubscriber.h"
#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
#include <fastdds/dds/subscriber/qos/DataReaderQos.hpp>
#include <fastdds/rtps/transport/TCPv4TransportDescriptor.h>
#include <fastrtps/utils/IPLocator.h>

using namespace eprosima::fastdds::dds;
using namespace eprosima::fastdds::rtps;

using IPLocator = eprosima::fastrtps::rtps::IPLocator;

HelloWorldSubscriber::HelloWorldSubscriber()
    : participant_(nullptr)
    , subscriber_(nullptr)
    , topic_(nullptr)
    , reader_(nullptr)
    , type_(new HelloWorldPubSubType())
{
}

bool HelloWorldSubscriber::init(
        const std::string& wan_ip,
        unsigned short port,
        bool use_tls,
        bool server,
        const std::vector<std::string>& whitelist)
{

    //CREATE THE PARTICIPANT
    DomainParticipantQos pqos;
    std::shared_ptr<TCPv4TransportDescriptor> descriptor = std::make_shared<TCPv4TransportDescriptor>();
    pqos.wire_protocol().builtin.discovery_config.leaseDuration = eprosima::fastrtps::c_TimeInfinite;
    pqos.wire_protocol().builtin.discovery_config.leaseDuration_announcementperiod = eprosima::fastrtps::Duration_t(5, 0);
    if(server)
    {
        pqos.name("server");
        for (std::string ip : whitelist)
        {
            descriptor->interfaceWhiteList.push_back(ip);
            std::cout << "Whitelisted " << ip << std::endl;
        }
        if (use_tls)
        {
            using TLSOptions = TCPTransportDescriptor::TLSConfig::TLSOptions;
            descriptor->apply_security = true;
            descriptor->tls_config.password = "test";
            descriptor->tls_config.cert_chain_file = "servercert.pem";
            descriptor->tls_config.private_key_file = "serverkey.pem";
            descriptor->tls_config.tmp_dh_file = "dh2048.pem";
            descriptor->tls_config.add_option(TLSOptions::DEFAULT_WORKAROUNDS);
            descriptor->tls_config.add_option(TLSOptions::SINGLE_DH_USE);
            descriptor->tls_config.add_option(TLSOptions::NO_SSLV2);
        }

        descriptor->sendBufferSize = 2000000;
        descriptor->receiveBufferSize = 2000000;

        if (!wan_ip.empty())
        {
            descriptor->set_WAN_address(wan_ip);
            std::cout << wan_ip << ":" << port << std::endl;
        }
        descriptor->add_listener_port(port);

    }
    else{
        int32_t kind = LOCATOR_KIND_TCPv4;

        Locator initial_peer_locator;
        initial_peer_locator.kind = kind;

        for (std::string ip : whitelist)
        {
            descriptor->interfaceWhiteList.push_back(ip);
            std::cout << "Whitelisted " << ip << std::endl;
        }

        if (!wan_ip.empty())
        {
            IPLocator::setIPv4(initial_peer_locator, wan_ip);
            std::cout << wan_ip << ":" << port << std::endl;
        }
        else
        {
            IPLocator::setIPv4(initial_peer_locator, "127.0.0.1");
        }
        initial_peer_locator.port = port;
        pqos.wire_protocol().builtin.initialPeersList.push_back(initial_peer_locator); // Publisher's meta channel

        pqos.name("client");

        if (use_tls)
        {
            using TLSVerifyMode = TCPTransportDescriptor::TLSConfig::TLSVerifyMode;
            using TLSOptions = TCPTransportDescriptor::TLSConfig::TLSOptions;
            descriptor->apply_security = true;
            descriptor->tls_config.verify_file = "cacert.pem";
            descriptor->tls_config.verify_mode = TLSVerifyMode::VERIFY_PEER;
            descriptor->tls_config.add_option(TLSOptions::DEFAULT_WORKAROUNDS);
        }
    }
    pqos.transport().use_builtin_transports = false;
    pqos.transport().user_transports.push_back(descriptor);
    participant_ = DomainParticipantFactory::get_instance()->create_participant(0, pqos);

    if (participant_ == nullptr)
    {
        return false;
    }    
    //REGISTER THE TYPE
    type_.register_type(participant_);

    // //CREATE THE SUBSCRIBER
    subscriber_ = participant_->create_subscriber(SUBSCRIBER_QOS_DEFAULT);

    if (subscriber_ == nullptr)
    {
        return false;
    }

    //CREATE THE TOPIC
    topic_ = participant_->create_topic("HelloWorldTopicTCP", "HelloWorld", TOPIC_QOS_DEFAULT);

    if (topic_ == nullptr)
    {
        return false;
    }

    //CREATE THE DATAREADER
    DataReaderQos rqos;
    rqos.history().kind = eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS;
    rqos.history().depth = 1;
    rqos.resource_limits().max_samples = 50;
    rqos.resource_limits().allocated_samples = 20;
    rqos.reliability().kind = eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS;
    rqos.durability().kind = eprosima::fastdds::dds::TRANSIENT_LOCAL_DURABILITY_QOS;
    reader_ = subscriber_->create_datareader(topic_, rqos, &listener_);

    if (reader_ == nullptr)
    {
        return false;
    }

    return true;
}

HelloWorldSubscriber::~HelloWorldSubscriber()
{
    if (reader_ != nullptr)
    {
        subscriber_->delete_datareader(reader_);
    }
    if (topic_ != nullptr)
    {
        participant_->delete_topic(topic_);
    }
    if (subscriber_ != nullptr)
    {
        participant_->delete_subscriber(subscriber_);
    }
    DomainParticipantFactory::get_instance()->delete_participant(participant_);
}

void HelloWorldSubscriber::SubListener::on_subscription_matched(
        DataReader*,
        const SubscriptionMatchedStatus& info)
{
    if (info.current_count_change == 1)
    {
        matched_ = info.total_count;
        std::cout << "[RTCP] Subscriber matched" << std::endl;
    }
    else if (info.current_count_change == -1)
    {
        matched_ = info.total_count;
        std::cout << "[RTCP] Subscriber unmatched" << std::endl;
    }
    else
    {
        std::cout << info.current_count_change
                  << " is not a valid value for SubscriptionMatchedStatus current count change" << std::endl;
    }
}

void HelloWorldSubscriber::SubListener::on_data_available(
        DataReader* reader)
{
    SampleInfo info;
    if (reader->take_next_sample(&hello_, &info) == ReturnCode_t::RETCODE_OK)
    {
        if (info.valid_data)
        {
            samples_++;
            // Print your structure data here.
             std::cout << "[RTCP] Message: " << hello_.message().size() << " with index: "
                          <<std::fixed<< std::setprecision(8)<<hello_.index() << " RECEIVED" << std::endl;
        }
    }
}

void HelloWorldSubscriber::run()
{
    std::cout << "[RTCP] Subscriber running. Please press enter to stop the Subscriber" << std::endl;
    std::cin.ignore();
}

void HelloWorldSubscriber::run(
        uint32_t number)
{
    std::cout << "[RTCP] Subscriber running until " << number << "samples have been received" << std::endl;
    while (number < this->listener_.samples_)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }
}

Supplementary materials: HelloWorld.idl

struct HelloWorld
{
    double index;
    sequence<char,1000000> message;
};
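For reference, the two IDL fields map onto setters of the Fast DDS-Gen generated C++ type, exactly as already used in Publisher.cpp above (a fragment, assuming the generated HelloWorld header):

    #include <vector>
    #include "HelloWorld.h"  // generated from HelloWorld.idl by Fast DDS-Gen

    HelloWorld hello;
    hello.index(0.0);                               // double index
    hello.message(std::vector<char>(900000, 'a'));  // sequence<char,1000000> message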

Please help me check whether there are any basic configuration errors. If there are none, the problem should be real. Under the same network settings I ran a rough test with the UDP transport, and this behavior did not appear.
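The exact UDP configuration used for that rough comparison is not given in the report; a minimal sketch of an equivalent UDP transport setup (buffer sizes copied from the TCP server branch above) would look like this:

    #include <fastdds/rtps/transport/UDPv4TransportDescriptor.h>

    // Replace the TCP descriptor with a UDPv4 one; everything else in the example stays the same.
    auto udp_descriptor = std::make_shared<eprosima::fastdds::rtps::UDPv4TransportDescriptor>();
    udp_descriptor->sendBufferSize = 2000000;
    udp_descriptor->receiveBufferSize = 2000000;
    pqos.transport().use_builtin_transports = false;
    pqos.transport().user_transports.push_back(udp_descriptor);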

juanlofer-eprosima commented 2 years ago

Thanks for the clarification @lurenlym, your configuration looks fine to me now.

We are currently working on fixing a couple of TCP issues that might be related to this one. Again, thank you for informing us about this behavior, we will let you know about any updates on this matter as soon as there are any.

JLBuenoLopez commented 2 years ago

@lurenlym,

Would you mind checking against the latest Fast DDS release? #2470 fixed several TCP issues and it might have solved yours as well. These changes were also backported to all supported Fast DDS releases.

JLBuenoLopez commented 2 years ago

I am going to close this issue due to inactivity. A possible fix was merged a couple of months ago and no feedback has been received from the user. @lurenlym, please feel free to reopen if the issue is still happening after applying the changes in #2470.