eclipse-cyclonedds / cyclonedds-cxx

Timeout when using reliable QoS. #491

Open ruoruoniao opened 5 months ago

ruoruoniao commented 5 months ago

Question

  1. I think that if both endpoints are reliable, the DataWriter should wait for the DataReader to consume a sample (that is, for on_data_available to return) before sending the next message, even without ResourceLimits. Is that right?
  2. If the resource limits are reached, the DataWriter should wait until the DataReader is enabled before sending the next message; it shouldn't send so many messages before the DataReader is enabled.

Steps to reproduce: the publisher and subscriber run on the same machine.

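// publisher.cpp
#include <chrono>
#include <iostream>
#include <thread>

#include "dds/dds.hpp"
#include "HelloWorldData.hpp"  // generated from HelloWorldData.idl
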
int main(int argc, char **argv) {
    dds::domain::DomainParticipant participant(13);
    dds::topic::Topic<HelloWorldData::Msg> topic(participant, "topic");

    dds::pub::Publisher publisher(participant);

    auto qos = publisher.default_datawriter_qos();
    qos->policy(dds::core::policy::Reliability::Reliable(dds::core::Duration::from_secs(20)));
    qos->policy(dds::core::policy::History::KeepAll());
    qos->policy(dds::core::policy::Durability::TransientLocal());
    auto limits = dds::core::policy::ResourceLimits();
    limits.max_samples(1);
    qos->policy(limits);
    dds::pub::DataWriter<HelloWorldData::Msg> writer(publisher, topic, qos);

    std::cout << "=== [Publisher] Waiting for subscriber." << std::endl;
    while (writer.publication_matched_status().current_count() == 0) {
        std::this_thread::sleep_for(std::chrono::milliseconds(20));
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(5000));

    int i = 0;
    HelloWorldData::Msg msg(0, "Hello World");
    while(true) {
        msg.userID(i++);
        writer.write(msg);
        std::cout << "=== [Publisher] Write " << i << std::endl;
    }
}

// subscribe.cpp
#include <chrono>
#include <iostream>
#include <thread>

#include "dds/dds.hpp"
#include "HelloWorldData.hpp"  // generated from HelloWorldData.idl

int main(int argc, char **argv) {
    dds::domain::DomainParticipant participant(13);
    dds::topic::Topic<HelloWorldData::Msg> topic(participant, "topic");

    dds::sub::Subscriber subscriber(participant);

    auto qos = subscriber.default_datareader_qos();
    qos->policy(dds::core::policy::Reliability::Reliable());
    qos->policy(dds::core::policy::History::KeepAll());
    qos->policy(dds::core::policy::Durability::TransientLocal());
    auto limits = dds::core::policy::ResourceLimits();
    limits.max_samples(1);
    qos->policy(limits);

    class Listener : public dds::sub::NoOpDataReaderListener<HelloWorldData::Msg> {
    public:
        void on_data_available(dds::sub::DataReader<HelloWorldData::Msg> &reader) override {
            const auto samples = reader.take();
            for (auto iter = samples.begin(); iter != samples.end(); ++iter) {
                if (!iter->info().valid()) { continue; }
                std::cout << "read " << iter->data().userID() << std::endl;
                // Simulate a slow consumer: one second per sample.
                std::this_thread::sleep_for(std::chrono::seconds(1));
            }
        }
    };

    // Keep the listener on the stack so it outlives the reader and is not leaked.
    Listener listener;
    dds::sub::DataReader<HelloWorldData::Msg> reader(subscriber, topic, qos, &listener, dds::core::status::StatusMask::all());
    while(true) { std::this_thread::sleep_for(std::chrono::seconds(1)); }
}
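
For reference, the DDS specification says that a reliable KeepAll writer whose resource limits are exhausted may block in write() for up to max_blocking_time (20 seconds in the publisher above) and then report a timeout. The sketch below models only that "block until space or time out" rule with the standard library. BoundedHistory and slow_reader_unblocks_writer are hypothetical names invented for illustration; none of this is Cyclone DDS code, and it makes no claim about when Cyclone actually frees a writer-side slot.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical stand-in for a KeepAll history bounded by max_samples.
// Assumption: a slot is freed as soon as the sample is taken.
class BoundedHistory {
public:
    explicit BoundedHistory(std::size_t max_samples) : max_samples_(max_samples) {}

    // Returns true if the sample was stored, false if max_blocking_time
    // expired while the history was still full.
    bool write(int sample, std::chrono::milliseconds max_blocking_time) {
        std::unique_lock<std::mutex> lock(mutex_);
        if (!not_full_.wait_for(lock, max_blocking_time,
                                [this] { return samples_.size() < max_samples_; })) {
            return false;
        }
        samples_.push_back(sample);
        not_empty_.notify_one();
        return true;
    }

    // Blocks until a sample is available, then removes and returns it.
    int take() {
        std::unique_lock<std::mutex> lock(mutex_);
        not_empty_.wait(lock, [this] { return !samples_.empty(); });
        const int sample = samples_.front();
        samples_.pop_front();
        not_full_.notify_one();
        return sample;
    }

private:
    std::size_t max_samples_;
    std::deque<int> samples_;
    std::mutex mutex_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
};

// Fills the single slot, then lets a slow reader free it after 50 ms.
// Returns true if the second write succeeded once the slot was freed.
bool slow_reader_unblocks_writer() {
    BoundedHistory history(1);
    if (!history.write(1, std::chrono::milliseconds(10))) { return false; }
    std::thread reader([&history] {
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        history.take();
    });
    const bool second_write_ok = history.write(2, std::chrono::seconds(5));
    reader.join();
    return second_write_ok;
}
```

In this model, a second write into a full history with no reader returns false after the blocking time, which corresponds to the timeout in the issue title. With a reader that takes the sample in time, the write is only delayed.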

Expected:

Actual:

ruoruoniao commented 5 months ago

Or is this behavior normal? I would like to confirm that before I start looking for the cause...

ruoruoniao commented 3 months ago

OK, I found that Cyclone has a C API called dds_wait_for_acks. Is there a plan to support wait_for_acknowledgments in org::eclipse::cyclonedds::pub::AnyDataWriterDelegate? Currently it is:

void
AnyDataWriterDelegate::wait_for_acknowledgments(
    const dds::core::Duration& timeout)
{
    ISOCPP_THROW_EXCEPTION(ISOCPP_UNSUPPORTED_ERROR, "Function not currently supported");
    (void)timeout;
}

eboasson commented 3 months ago

So if there is a plan to support wait_for_acknowledgments in org::eclipse::cyclonedds::pub::AnyDataWriterDelegate

Yes, it just flew under the radar (the C++ binding doesn't get as much love as the C bit, I am afraid). I imagine something as simple as:

    int ddsc_ret = dds_wait_for_acks (ddsc_entity, org::eclipse::cyclonedds::core::convertDuration(timeout));
    ISOCPP_DDSC_RESULT_CHECK_AND_THROW(ddsc_ret, "dds_wait_for_acks failed.");

should do the trick.
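
To make the intended semantics concrete, here is a minimal standard-library model of "wait until everything written so far has been acknowledged, or give up after a timeout", which is the behavior the thread expects from dds_wait_for_acks and wait_for_acknowledgments. AckTracker and late_ack_releases_writer are hypothetical names invented for this sketch, and a single reader with cumulative acknowledgments is an assumption. It is not the Cyclone DDS implementation.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>

// Hypothetical model: tracks the highest sequence number written and the
// highest one acknowledged by a single reader (assumption: cumulative acks).
class AckTracker {
public:
    // Called on the writer side; returns the new sample's sequence number.
    std::uint64_t on_write() {
        std::lock_guard<std::mutex> lock(mutex_);
        return ++last_written_;
    }

    // Called when the reader acknowledges everything up to and including seq.
    void on_ack(std::uint64_t seq) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (seq > last_acked_) { last_acked_ = seq; }
        }
        all_acked_.notify_all();
    }

    // Blocks until every written sample is acknowledged or timeout expires.
    // Returns true if all samples were acknowledged in time.
    bool wait_for_acks(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        return all_acked_.wait_for(lock, timeout,
                                   [this] { return last_acked_ >= last_written_; });
    }

private:
    std::uint64_t last_written_ = 0;
    std::uint64_t last_acked_ = 0;
    std::mutex mutex_;
    std::condition_variable all_acked_;
};

// One write, acknowledged by a second thread after 50 ms.
// Returns true if the waiting writer was released by that acknowledgment.
bool late_ack_releases_writer() {
    AckTracker tracker;
    const std::uint64_t seq = tracker.on_write();
    std::thread reader([&tracker, seq] {
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        tracker.on_ack(seq);
    });
    const bool acked = tracker.wait_for_acks(std::chrono::seconds(5));
    reader.join();
    return acked;
}
```

In the publisher from the original report, a call of this kind after each write would be one way to keep the writer from running ahead of the reader, at the cost of one round trip per sample.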