eclipse-cyclonedds / cyclonedds-cxx


on_subscription_matched / on_publication_matched listeners not reliable / only called once #410

Open Pro opened 1 year ago

Pro commented 1 year ago

I am still facing the issue described in #378 with version 0.10.3 (ping @trittsv)

That is, whichever process starts second does not get the corresponding on_subscription_matched or on_publication_matched callback invoked on its listener.

Note that the same code works perfectly fine with RTI Connext DDS.

I used the same code as in #378 for reproduction steps: https://github.com/eclipse-cyclonedds/cyclonedds-cxx/files/10918029/listener-not-reliable.zip

I made one small modification: the publisher and writer are re-created in a loop (see the code below), while the subscriber keeps running forever.

The interesting part is that the on_publication_matched callback is called only once (in the best case). For subsequent creations of the publisher and writer, on_publication_matched is not called at all.

For example, if you start subscriber.cpp and then publisher.cpp, you will see the following output:

=== [Publisher] Create writer.
on_publication_matched, current_count: 1
=== [Publisher] Write sample.
=== [Publisher] Waiting for sample to be accepted.
=== [Publisher] Create writer.
=== [Publisher] Write sample.
=== [Publisher] Waiting for sample to be accepted.
=== [Publisher] Create writer.
=== [Publisher] Write sample.
=== [Publisher] Waiting for sample to be accepted.
=== [Publisher] Create writer.
=== [Publisher] Write sample.
=== [Publisher] Waiting for sample to be accepted.
=== [Publisher] Create writer.
=== [Publisher] Write sample.
=== [Publisher] Waiting for sample to be accepted.
=== [Publisher] Create writer.
=== [Publisher] Write sample.
=== [Publisher] Waiting for sample to be accepted.
=== [Publisher] Create writer.
=== [Publisher] Write sample.
=== [Publisher] Waiting for sample to be accepted.
=== [Publisher] Create writer.
=== [Publisher] Write sample.
=== [Publisher] Waiting for sample to be accepted.
=== [Publisher] Create writer.
=== [Publisher] Write sample.
=== [Publisher] Waiting for sample to be accepted.
=== [Publisher] Create writer.
=== [Publisher] Write sample.
=== [Publisher] Waiting for sample to be accepted.
=== [Publisher] Done.

Process finished with exit code 0

The writer definitely has a match; otherwise it would not write the data (see the wait loop).

So why is on_publication_matched called only once, and not in every loop iteration? If I change the code to also re-create the DomainParticipant inside the loop, the on_publication_matched callback is always called. So it looks like statuses are being cached somewhere.

publisher.cpp

/*
 * Copyright(c) 2006 to 2020 ZettaScale Technology and others
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
 * v. 1.0 which is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
 */
#include <chrono>
#include <cstdlib>
#include <iostream>
#include <thread>

/* Include the C++ DDS API. */
#include "dds/dds.hpp"

/* Include data type and specific traits to be used with the C++ DDS API. */
#include "HelloWorldData.hpp"

using namespace org::eclipse::cyclonedds;

class ExampleListener : public virtual dds::pub::DataWriterListener<HelloWorldData::Msg>
{
public:
    virtual void on_offered_deadline_missed(dds::pub::DataWriter<HelloWorldData::Msg>& writer,
                                            const dds::core::status::OfferedDeadlineMissedStatus& status)
    {
        std::cout << "on_offered_deadline_missed" << std::endl;
    }

    virtual void on_offered_incompatible_qos(dds::pub::DataWriter<HelloWorldData::Msg>& writer,
                                             const dds::core::status::OfferedIncompatibleQosStatus& status)
    {
        std::cout << "on_offered_incompatible_qos" << std::endl;
    }

    virtual void on_liveliness_lost(dds::pub::DataWriter<HelloWorldData::Msg>& writer,
                                    const dds::core::status::LivelinessLostStatus& status)
    {
        std::cout << "on_liveliness_lost" << std::endl;
    }

    virtual void on_publication_matched(dds::pub::DataWriter<HelloWorldData::Msg>& writer,
                                        const dds::core::status::PublicationMatchedStatus& status)
    {
        std::cout << "on_publication_matched, current_count: " << status.current_count() << std::endl;
    }
};

int main()
{
    try {

        dds::domain::DomainParticipant participant(domain::default_id());

        dds::topic::Topic<HelloWorldData::Msg> topic(participant, "HelloWorldData_Msg");

        for (size_t i = 0; i < 10; i++) {
            // create the writer in a loop, just for testing
            std::cout << "=== [Publisher] Create writer." << std::endl;

            auto publisher = ::std::make_shared<dds::pub::Publisher>(participant);

            auto listener = ::std::make_shared<ExampleListener>();
            auto writer =
                ::std::make_shared<dds::pub::DataWriter<HelloWorldData::Msg>>(*publisher,
                                                                              topic,
                                                                              publisher->default_datawriter_qos(),
                                                                              listener.get(),
                                                                              dds::core::status::StatusMask::all());

            while (writer->publication_matched_status().current_count() == 0) {
                std::this_thread::sleep_for(std::chrono::milliseconds(20));
            }

            HelloWorldData::Msg msg(1, "Hello World");

            std::cout << "=== [Publisher] Write sample." << std::endl;
            writer->write(msg);
            std::cout << "=== [Publisher] Waiting for sample to be accepted." << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));

            writer->close();
            writer.reset();
            publisher->close();
            publisher.reset();
        }
    }
    catch (const dds::core::Exception& e) {
        std::cerr << "=== [Publisher] Exception: " << e.what() << std::endl;
        return EXIT_FAILURE;
    }

    std::cout << "=== [Publisher] Done." << std::endl;

    return EXIT_SUCCESS;
}
trittsv commented 1 year ago

Hey @Pro, I have the same issue with 0.10.3, see #392 (I just closed it because I thought it was my own stupidity).

Currently I use 0.10.2 with the fixes patched into it; I could not get it to work with 0.10.3 either.

trittsv commented 1 year ago

@eboasson do you have an idea about this?

Pro commented 1 year ago

@trittsv THANK YOU :)

I can confirm that directly applying the fixes onto 0.10.2 works properly, but using release 0.10.3 does not work.

I also tested CycloneDDS (the C base library) with 0.10.2 and 0.10.3, and that makes no difference. But as soon as I use the CXX wrapper in version 0.10.3, it does not work, whereas 0.10.2 with the patches works.

Pro commented 1 year ago

The problematic commit is this one: https://github.com/eclipse-cyclonedds/cyclonedds-cxx/commit/a49b3f0f5ad118edad7628fb5e4e3d7ece8467b5

If I remove it from 0.10.3, there is no issue with on_publication_matched.

It was part of #387 (ping @eboasson @reicheratwork).

eboasson commented 1 year ago

Thank you so much @Pro and @trittsv!

Just wanted to let you know that this is pretty high up on our list of issues. I am happy that at least there is a workaround (reverting a49b3f0f5ad118edad7628fb5e4e3d7ece8467b5), although that by itself can't be the fix, because that change went in for another good reason.

e-hndrks commented 1 year ago

Hi @Pro and @trittsv, @eboasson has asked me to look into this issue, and I wanted to give you a short update on where we are right now. We were able to reproduce the issue and quickly found the culprit.

The first publication_matched event is fired by the ddsi thread that is responsible for the initial discovery between Reader and Writer, and it occurs some time after the C++ Writer creation has finished successfully (we can actually see that the sleep has been invoked a number of times before the C++ listener callback). However, since the Participant stays alive for the duration of the publishing application, it remembers the presence of the remote Reader and notifies the second Writer about it immediately at creation time, on the thread that is invoking dds_create_writer. Because the C++ API is built on top of the C API, and the listener object is passed down to C at creation time, the C Writer tries to invoke its C++ listener during the dds_create_writer call, and therefore before C++ has had any chance to wrap a C++ object around this Writer.
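
The ordering problem can be illustrated with a small, self-contained model. This is a hypothetical sketch, not Cyclone DDS code: `CParticipant`, `create_writer` and `run_discovery` are invented names that only mimic the sequence of events described above (a participant that caches discovered readers, and a listener passed in at creation time). It records, for each callback, whether the C++ wrapper already existed when the callback fired.

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Hypothetical model of the "C layer"; not a Cyclone DDS API.
struct CParticipant {
    bool reader_known = false;                    // discovery state cached by the participant
    std::vector<std::function<void()>> deferred;  // events delivered later by a "discovery thread"
    int next_handle = 1;

    // Mimics a writer creation call that receives its listener at creation time.
    int create_writer(const std::function<void()>& on_matched) {
        int handle = next_handle++;
        if (reader_known) {
            on_matched();                   // fires synchronously, inside create_writer
        } else {
            deferred.push_back(on_matched); // fires later, after create_writer has returned
        }
        return handle;
    }

    // Mimics the discovery thread finding the remote reader for the first time.
    void run_discovery() {
        reader_known = true;
        for (auto& cb : deferred) cb();
        deferred.clear();
    }
};

// Creates two writers on the same long-lived participant and records, per
// callback, whether the (simulated) C++ wrapper existed when it fired.
std::vector<bool> simulate_two_writers() {
    CParticipant participant;
    std::vector<bool> wrapper_existed_at_callback;

    for (int i = 0; i < 2; ++i) {
        bool wrapper_ready = false;  // becomes true once C++ has wrapped the C handle
        participant.create_writer([&] { wrapper_existed_at_callback.push_back(wrapper_ready); });
        wrapper_ready = true;        // the wrapper is built only after create_writer returns
        participant.run_discovery(); // delivers deferred events (none on the second iteration)
    }
    return wrapper_existed_at_callback;
}
```

Calling `simulate_two_writers()` yields `{true, false}` under these assumptions: the first writer's callback arrives asynchronously after its wrapper exists, while the second writer's callback runs inside `create_writer`, before any wrapper is in place. Re-creating the participant on every iteration would reset `reader_known`, which matches the observation that the callback then always fires.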

The commit that broke your example is needed to prevent a C++ Writer from dropping its last reference during a listener callback. For that reason, it creates an additional reference to the Writer, which is dropped after the listener callback has completed. However, in this particular case the attempt to create that additional reference fails, because C++ has not yet been able to create its wrapper around the Writer, and so the callback is skipped.
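
The reference guard, and why it skips the early callback, can be sketched as follows. This is an illustrative model under stated assumptions, not the actual cyclonedds-cxx internals: the registry `g_registry`, the struct `WriterImpl` and the function `on_matched_trampoline` are hypothetical names. The callback takes a temporary strong reference (via `std::weak_ptr::lock`) so the object cannot be destroyed mid-callback, and gives up if no wrapper can be found.

```cpp
#include <cassert>
#include <map>
#include <memory>

// Hypothetical stand-in for the C++ object that wraps a C writer handle.
struct WriterImpl {
    int matched_callbacks = 0;
};

// Hypothetical registry: C handle -> C++ wrapper (filled in after creation).
std::map<int, std::weak_ptr<WriterImpl>> g_registry;

// Listener trampoline invoked by the "C layer" with only the handle.
// Returns true if the C++ callback ran, false if it was skipped.
bool on_matched_trampoline(int handle) {
    auto it = g_registry.find(handle);
    if (it == g_registry.end()) return false;                   // no wrapper registered yet
    std::shared_ptr<WriterImpl> keep_alive = it->second.lock(); // extra reference for the callback
    if (!keep_alive) return false;                              // wrapper already destroyed
    ++keep_alive->matched_callbacks;                            // object cannot die mid-callback
    return true;
}                                                               // extra reference dropped here

struct GuardResult {
    bool ran_during_create;
    bool ran_after_register;
    int count;
};

// Simulates the second writer: the callback fires during creation, before the
// wrapper is registered, and is therefore skipped.
GuardResult simulate_reference_guard() {
    g_registry.clear();
    const int handle = 42;                          // hypothetical C handle
    bool during = on_matched_trampoline(handle);    // callback inside "create"
    auto impl = std::make_shared<WriterImpl>();
    g_registry[handle] = impl;                      // C++ wrapper now exists
    bool after = on_matched_trampoline(handle);     // a later callback would be delivered
    return {during, after, impl->matched_callbacks};
}
```

In this model the callback fired during creation is silently dropped, and only a later event would reach the listener; since the status has already been consumed, no later publication_matched event arrives for that writer in the reported scenario.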

What we will do to fix this is to create the C++ writer in a two-step process:

  1. We first create the underlying C Writer without a listener, so that we can create our C++ wrapper around it prior to receiving our first listener callback.
  2. Once C++ is set up correctly, we set the C++ listener separately using a call to dds_set_listener. This call should then look for any pending (unhandled) events and invoke the registered callbacks on the thread that is invoking dds_set_listener.

This approach would require us to change the behavior of dds_set_listener somewhat, since it currently doesn't check for any pending events and doesn't invoke any listener calls. But we expect this approach to fix your problem.
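
The proposed two-step creation, combined with a `set_listener` that replays pending events, could behave roughly like the model below. This is a hypothetical sketch of the plan, not the eventual implementation: `CWriter`, `raise_matched`, `set_listener` and `CxxWriter` are invented names, and a single boolean stands in for the real pending-status bookkeeping.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical model of a C writer; not the real dds_set_listener implementation.
struct CWriter {
    bool match_pending = false;      // status raised while no listener was installed
    std::function<void()> listener;  // empty until set_listener is called

    // Called by the "C layer" when a match is detected (possibly during creation).
    void raise_matched() {
        if (listener) listener();
        else match_pending = true;   // remember the unhandled event
    }

    // Mimics the proposed behavior: install the listener, then deliver any
    // pending event on the calling thread.
    void set_listener(std::function<void()> l) {
        listener = std::move(l);
        if (listener && match_pending) {
            match_pending = false;
            listener();
        }
    }
};

// Hypothetical C++ wrapper around the C writer.
struct CxxWriter {
    CWriter* c = nullptr;
    int matched_callbacks = 0;
};

// Returns how many matched callbacks the C++ wrapper received.
int two_step_create() {
    CWriter c_writer;
    // Step 1: create without a listener; the cached match is raised during creation.
    c_writer.raise_matched();
    // The C++ wrapper is built before any callback can run.
    auto cxx = std::make_shared<CxxWriter>();
    cxx->c = &c_writer;
    // Step 2: install the listener; the pending match is delivered now.
    std::weak_ptr<CxxWriter> weak = cxx;
    c_writer.set_listener([weak] {
        if (auto self = weak.lock()) ++self->matched_callbacks;
    });
    return cxx->matched_callbacks;
}
```

Here `two_step_create()` returns 1: the match raised during creation is not lost, because it is remembered as pending and delivered once the wrapper exists and the listener is installed. The same reference guard as before can stay in place, since by the time the callback runs the wrapper is always available.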