eclipse-cyclonedds / cyclonedds-cxx

Reader listener hangs in GUI application when samples are present #456

Open upstreammuse opened 10 months ago

upstreammuse commented 10 months ago

Greetings, Cyclone team. I've hit an issue trying to use CycloneDDS within a GUI application.

I have a basic "hello world" reader and writer working between two processes, and I've been trying to add a basic GUI to the reader as a proof of concept. In the eventual application, I want late-joining clients to be able to pick up all the existing instances and pass the live ones to a GUI. I think that means that I have to create a Listener, connect it up to my GUI objects, and then register the Listener with a DataReader, so that the GUI doesn't miss anything from that first callback. I do realize that I need to handle threading boundaries and memory ownership between the callback and the GUI event loop. But I haven't gotten that far yet...
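Here is a minimal sketch of that hand-off. It assumes the listener copies each sample's data into a queue and the GUI thread drains that queue from its own event loop, for example from a timer or idle handler. `SampleQueue` is a hypothetical name, not a Cyclone DDS, Qt or wxWidgets API. The sketch uses `std::string` in place of a DDS data type and needs C++17 for `std::optional`. It only illustrates the ownership boundary and does not by itself avoid the hang described below.

```cpp
#include <cassert>
#include <deque>
#include <mutex>
#include <optional>
#include <string>
#include <utility>

// Hypothetical hand-off queue: the DDS listener thread pushes copies of
// samples, and the GUI thread pops them from its own event loop.
template <typename T>
class SampleQueue {
public:
    // Called from the listener callback. The sample is taken by value, so
    // the GUI never touches memory owned (or loaned) by the DataReader.
    void push(T sample) {
        std::lock_guard<std::mutex> guard(mutex_);
        items_.push_back(std::move(sample));
    }

    // Called from the GUI thread. Returns std::nullopt when the queue is
    // empty, so the GUI event loop never blocks waiting for DDS data.
    std::optional<T> try_pop() {
        std::lock_guard<std::mutex> guard(mutex_);
        if (items_.empty()) {
            return std::nullopt;
        }
        std::optional<T> front(std::move(items_.front()));
        items_.pop_front();
        return front;
    }

private:
    std::mutex mutex_;
    std::deque<T> items_;
};
```

The callback would push a copy of each valid sample's data. The GUI side would call `try_pop()` until it returns `std::nullopt`, then update its widgets on its own thread.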

As soon as I add even a basic GUI object AND have samples present (using TransientLocal durability QoS, with the writer process running), the call to register the Listener hangs. If there are no samples, the application starts up fine. Likewise, if I remove the GUI object while samples are present, the application also starts up fine.

At first I assumed it was something odd in my environment, but I've now tried the basic concept with multiple OSes and GUI toolkits and keep getting the same result. I've tried Qt6 on macOS using both Homebrew and Qt's binary installer, and Qt6 and Qt5 on Ubuntu using the system packages. I started thinking it was a Qt issue, but then I was also able to reproduce the problem with wxWidgets on Linux.

In all these cases, the reader process hangs on a mutex lock and the call to listener(...) never returns. The callback does get called, however.

#0  futex_wait (private=0, expected=2, futex_word=0x55555581ada0) at ../sysdeps/nptl/futex-internal.h:146
#1  __GI___lll_lock_wait (futex=futex@entry=0x55555581ada0, private=0) at ./nptl/lowlevellock.c:49
#2  0x00007ffff6898002 in lll_mutex_lock_optimized (mutex=0x55555581ada0) at ./nptl/pthread_mutex_lock.c:48
#3  ___pthread_mutex_lock (mutex=0x55555581ada0) at ./nptl/pthread_mutex_lock.c:93
#4  0x00007ffff7f4a62d in ddsrt_mutex_lock (mutex=<optimized out>)
    at /home/xxxxx/Programming/cyclonedds/src/ddsrt/src/sync/posix/sync.c:40
#5  0x00007ffff71559a4 in org::eclipse::cyclonedds::core::Mutex::lock() const () from /usr/local/lib/libddscxx.so.0
#6  0x00007ffff7155b8f in org::eclipse::cyclonedds::core::ObjectDelegate::lock() const () from /usr/local/lib/libddscxx.so.0
#7  0x000055555558bc66 in org::eclipse::cyclonedds::core::ScopedLock<org::eclipse::cyclonedds::core::ObjectDelegate>::ScopedLock(org::eclipse::cyclonedds::core::ObjectDelegate const&, bool) ()
#8  0x00007ffff719ce81 in org::eclipse::cyclonedds::sub::AnyDataReaderDelegate::loaned_take(int, dds::sub::status::DataState const&, dds::sub::detail::SamplesHolder&, unsigned int) () from /usr/local/lib/libddscxx.so.0
#9  0x0000555555588b10 in dds::sub::detail::DataReader<Hello>::take() ()
#10 0x0000555555586df9 in dds::sub::DataReader<Hello, dds::sub::detail::DataReader>::take() ()
#11 0x0000555555585edb in Listener::on_data_available(dds::sub::DataReader<Hello, dds::sub::detail::DataReader>&) ()
#12 0x00005555555983de in dds::sub::detail::DataReader<Hello>::on_data_available(int) ()
#13 0x00007ffff715fd33 in callback_on_data_available () from /usr/local/lib/libddscxx.so.0
#14 0x00007ffff7f1cbc9 in da_or_dor_cb_invoke (rd=0x5555556dc000, lst=0x5555556dc130, status_and_mask=359929088, 
    async=<optimized out>) at /home/xxxxx/Programming/cyclonedds/src/core/ddsc/src/dds_reader.c:215
#15 0x00007ffff7f2ac2e in dds_entity_deriver_invoke_cbs_for_pending_events (status=<optimized out>, e=<optimized out>)
    at /home/xxxxx/Programming/cyclonedds/src/core/ddsc/src/dds__types.h:261
#16 dds_set_listener (entity=<optimized out>, listener=0x5555557da800)
    at /home/xxxxx/Programming/cyclonedds/src/core/ddsc/src/dds_entity.c:1083
#17 0x00007ffff715c56c in org::eclipse::cyclonedds::core::EntityDelegate::listener_set(void*, dds::core::status::StatusMask const&, bool) () from /usr/local/lib/libddscxx.so.0
#18 0x0000555555589f90 in dds::sub::detail::DataReader<Hello>::listener(dds::sub::DataReaderListener<Hello>*, dds::core::status::StatusMask const&) ()
#19 0x0000555555587bde in dds::sub::DataReader<Hello, dds::sub::detail::DataReader>::listener(dds::sub::DataReaderListener<Hello>*, dds::core::status::StatusMask const&) ()
#20 0x0000555555582f93 in App::OnInit() ()
#21 0x0000555555584bbd in wxAppConsoleBase::CallOnInit() ()
#22 0x00007ffff74e2bd2 in wxEntry(int&, wchar_t**) () from /lib/x86_64-linux-gnu/libwx_baseu-3.0.so.0

I have some minimal examples that I can commit, if that's easier than posting entire sample applications here.

The CycloneDDS and CycloneDDS-CXX unit tests are passing on my builds.

TimMooreAu commented 10 months ago

I'm getting what I think is the same issue. In my case I'm not using a GUI. The issue seems to occur when calling DataReader::listener() if the reader and writer are in the same process and the writer has already written a message by the time the reader is created (using TransientLocal durability QoS).

The issue seems to be caused by a ScopedObjectLock that is acquired in listener() (frame 18 of the stack trace below). Before this lock is released, dds_reader_invoke_cbs_for_pending_events() is called, and it invokes the listener callback. If the callback calls take(), a ScopedObjectLock on the same object tries to acquire the same mutex (frame 6 of the stack trace), so the thread ends up waiting on itself.

To resolve this, perhaps the original lock needs to be unlocked before pending events are processed?
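To make the analysis above concrete, here is a minimal, self-contained model of the pattern. It assumes a non-recursive `std::mutex` guards the entity. `Entity`, `mark_data_available`, `set_listener` and `take` are hypothetical stand-ins, not the cyclonedds-cxx classes. The sketch shows the suggested fix: while holding the lock, copy what the callback needs, then release the lock, and only then invoke the callback for the pending event. The `take()` inside the callback can then acquire the same mutex.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical model of an entity wrapper; not the cyclonedds-cxx code.
class Entity {
public:
    using Callback = std::function<void(Entity&)>;

    // Records an event that happened before any listener was installed
    // (stands in for the TransientLocal sample that is already present).
    void mark_data_available() {
        std::lock_guard<std::mutex> guard(mutex_);
        pending_data_ = true;
    }

    void set_listener(Callback cb) {
        std::unique_lock<std::mutex> lock(mutex_);
        listener_ = std::move(cb);
        bool invoke_now = pending_data_ && listener_;
        Callback to_call = listener_;
        // Release the lock *before* running callbacks for pending events.
        // If it were still held, take() in the callback would re-lock a
        // non-recursive mutex on the same thread: undefined behaviour for
        // std::mutex, and a hang in practice (as in the traces above).
        lock.unlock();
        if (invoke_now) {
            to_call(*this);
        }
    }

    // Returns the number of samples taken (0 or 1 in this toy model).
    int take() {
        std::lock_guard<std::mutex> guard(mutex_);
        int taken = pending_data_ ? 1 : 0;
        pending_data_ = false;
        return taken;
    }

private:
    std::mutex mutex_;
    bool pending_data_ = false;
    Callback listener_;
};
```

The trade-off is that another thread may replace the listener between the unlock and the callback. Whether that is acceptable depends on what the lock is actually meant to protect.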

Stack trace:

#0  0x00007ffe8e0b0a44 in ntdll!ZwWaitForAlertByThreadId () from C:\WINDOWS\SYSTEM32\ntdll.dll
#1  0x00007ffe8e039205 in ntdll!RtlAcquireSRWLockExclusive () from C:\WINDOWS\SYSTEM32\ntdll.dll
#2  0x00007ffe1a3fab5d in ddsrt_mutex_lock (mutex=0x1826d38af00) at C:/Users/intim/Documents/cyclonedds/cyclonedds-master/src/ddsrt/src/sync/windows/sync.c:31
#3  0x00007ffe0df76760 in org::eclipse::cyclonedds::core::Mutex::lock (this=0x1826d38b780) at C:/Users/intim/Documents/cyclonedds/cyclonedds-cxx-master/src/ddscxx/src/org/eclipse/cyclonedds/core/Mutex.cpp:33
#4  0x00007ffe0df7696d in org::eclipse::cyclonedds::core::ObjectDelegate::lock (this=0x1826d38b778) at C:/Users/intim/Documents/cyclonedds/cyclonedds-cxx-master/src/ddscxx/src/org/eclipse/cyclonedds/core/ObjectDelegate.cpp:36
#5  0x00007ffe0dfb6911 in org::eclipse::cyclonedds::core::ScopedLock<org::eclipse::cyclonedds::core::ObjectDelegate>::ScopedLock (this=0x18ef3fefe0, obj=..., lock=true) at C:/Users/intim/Documents/cyclonedds/cyclonedds-cxx-master/src/ddscxx/include/org/eclipse/cyclonedds/core/ScopedLock.hpp:41
#6  0x00007ffe0df91615 in org::eclipse::cyclonedds::sub::AnyDataReaderDelegate::loaned_take (this=0x1826d38b5b0, reader=2077064500, mask=..., samples=..., requested_max_samples=4294967295) at C:/Users/intim/Documents/cyclonedds/cyclonedds-cxx-master/src/ddscxx/src/org/eclipse/cyclonedds/sub/AnyDataReaderDelegate.cpp:239
#7  0x00007ff75c64abe7 in dds::sub::detail::DataReader<HelloWorldData::Msg>::take (this=0x1826d38b5b0) at C:/Users/intim/Documents/cyclonedds/cyclonedds-cxx-master/src/ddscxx/include/dds/sub/detail/TDataReaderImpl.hpp:613
#8  0x00007ff75c649727 in dds::sub::DataReader<HelloWorldData::Msg, dds::sub::detail::DataReader>::take (this=0x18ef3ff170) at C:/Users/intim/Documents/cyclonedds/cyclonedds-cxx-master/src/ddscxx/include/dds/sub/detail/TDataReaderImpl.hpp:349
#9  0x00007ff75c6473d0 in TakeListener::on_data_available (this=0x18ef3ff4f8, reader=...) at C:/Users/intim/Documents/cyclonedds/cyclonedds-cxx-master/examples/helloworld/publisher_modified.cpp:29
#10 0x00007ff75c64a261 in dds::sub::detail::DataReader<HelloWorldData::Msg>::on_data_available (this=0x1826d38b5b0) at C:/Users/intim/Documents/cyclonedds/cyclonedds-cxx-master/src/ddscxx/include/dds/sub/detail/TDataReaderImpl.hpp:1380
#11 0x00007ffe0df79e39 in callback_on_data_available (reader=2077064500, arg=0x1826d38ba30) at C:/Users/intim/Documents/cyclonedds/cyclonedds-cxx-master/src/ddscxx/src/org/eclipse/cyclonedds/core/ListenerDispatcher.cpp:177
#12 0x00007ffe1a3b2c48 in da_or_dor_cb_invoke (rd=0x1826d38bdf0, lst=0x1826d38be90, status_and_mask=359929088, async=false) at C:/Users/intim/Documents/cyclonedds/cyclonedds-master/src/core/ddsc/src/dds_reader.c:215
#13 0x00007ffe1a3b42db in dds_reader_invoke_cbs_for_pending_events (e=0x1826d38bdf0, status=5376) at C:/Users/intim/Documents/cyclonedds/cyclonedds-master/src/core/ddsc/src/dds_reader.c:450
#14 0x00007ffe1a3c66ec in dds_entity_deriver_invoke_cbs_for_pending_events (e=0x1826d38bdf0, status=5376) at C:/Users/intim/Documents/cyclonedds/cyclonedds-master/src/core/ddsc/src/dds__types.h:261
#15 0x00007ffe1a3c870f in dds_set_listener (entity=2077064500, listener=0x1826d38d300) at C:/Users/intim/Documents/cyclonedds/cyclonedds-master/src/core/ddsc/src/dds_entity.c:1083
#16 0x00007ffe0df786b8 in org::eclipse::cyclonedds::core::EntityDelegate::listener_set (this=0x1826d38b5b0, _listener=0x18ef3ff4f8, mask=..., reset_on_invoke=true) at C:/Users/intim/Documents/cyclonedds/cyclonedds-cxx-master/src/ddscxx/src/org/eclipse/cyclonedds/core/EntityDelegate.cpp:226
#17 0x00007ff75c64aeb2 in dds::sub::detail::DataReader<HelloWorldData::Msg>::listener (this=0x1826d38b5b0, l=0x18ef3ff4f8, event_mask=...) at C:/Users/intim/Documents/cyclonedds/cyclonedds-cxx-master/src/ddscxx/include/dds/sub/detail/TDataReaderImpl.hpp:744
#18 0x00007ff75c649782 in dds::sub::DataReader<HelloWorldData::Msg, dds::sub::detail::DataReader>::listener (this=0x18ef3ff500, listener=0x18ef3ff4f8, event_mask=...) at C:/Users/intim/Documents/cyclonedds/cyclonedds-cxx-master/src/ddscxx/include/dds/sub/detail/TDataReaderImpl.hpp:420
#19 0x00007ff75c641a2e in main () at C:/Users/intim/Documents/cyclonedds/cyclonedds-cxx-master/examples/helloworld/publisher_modified.cpp:78

I've included a modified version of the helloWorldPublisher application that reproduces the problem. Note: I've only tested this on Windows.

#include <cstdlib>
#include <iostream>
#include <chrono>
#include <thread>

/* Include the C++ DDS API. */
#include "dds/dds.hpp"

/* Include data type and specific traits to be used with the C++ DDS API. */
#include "HelloWorldData.hpp"

using namespace org::eclipse::cyclonedds;

class TakeListener : public dds::sub::NoOpDataReaderListener<HelloWorldData::Msg>
{
    void on_data_available(dds::sub::DataReader<HelloWorldData::Msg>& reader) override
    {
        reader.take();
        std::cout << "=== Took data." << std::endl;
    }
};

int main() {
    std::cout << "=== [Publisher] Create writer." << std::endl;

    /* First, a domain participant is needed.
     * Create one on the default domain. */
    dds::domain::DomainParticipant participant(domain::default_id());

    /* To publish something, a topic is needed. */
    dds::topic::Topic<HelloWorldData::Msg> topic(participant, "HelloWorldData_Msg");

    /* A writer also needs a publisher. */
    dds::pub::Publisher publisher(participant);

    auto data_writer_qos = publisher.default_datawriter_qos();
    data_writer_qos << dds::core::policy::Reliability::Reliable();
    data_writer_qos << dds::core::policy::Durability::TransientLocal();
    data_writer_qos << dds::core::policy::History::KeepLast(1);

    /* Now, the writer can be created to publish a HelloWorld message. */
    dds::pub::DataWriter<HelloWorldData::Msg> writer(publisher, topic, data_writer_qos);

    /* Create a message to write. */
    HelloWorldData::Msg msg(1, "Hello World");

    /* Write the message. */
    std::cout << "=== [Publisher] Write sample." << std::endl;
    writer.write(msg);

    std::cout << "=== [Publisher] Done." << std::endl;
    std::cout << "=== [Subscriber] Create reader." << std::endl;

    /* A reader also needs a subscriber. */
    dds::sub::Subscriber subscriber(participant);

    auto data_reader_qos = subscriber.default_datareader_qos();
    data_reader_qos << dds::core::policy::Reliability::Reliable();
    data_reader_qos << dds::core::policy::Durability::TransientLocal();
    data_reader_qos << dds::core::policy::History::KeepLast(1);

    /* Now, the reader can be created to subscribe to a HelloWorld message. */
    dds::sub::DataReader<HelloWorldData::Msg> reader(subscriber, topic, data_reader_qos);

    TakeListener listener;

    reader.listener(&listener, dds::core::status::StatusMask::data_available());

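    std::cout << "=== Added listener." << std::endl;

    while (true)
    {
        // Keep the process alive so the listener can run; sleep instead of busy-spinning.
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }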

    return EXIT_SUCCESS;
}

eboasson commented 10 months ago

Those two do indeed look very much the same. When I looked at the first stack trace @upstreammuse kindly provided, I had an inkling of what might be going on, and then I read @TimMooreAu's excellent and spot-on analysis. So thank you both for reporting it in such detail.

Bummer. I am usually pretty good at avoiding deadlocks, but I find it harder to spot them in the C++ binding than in the core, because things are so spread out (it really just follows the pattern set by the specification) and things are tied to this here and this there, and in C++ sometimes this is this and sometimes this is that. So I can accept missing this in a review, but I have more trouble accepting that I overlooked the absence of a test case for this ...

Anyway, it is not at all obvious to me that the lock is needed either for setting a listener or for take (and all the other similar operations). Somewhere deep down in the implementation one does need some synchronization, but the core library (i.e. cyclonedds) already does that, and I think the C++ binding perhaps does it a bit too much again.

I suspect that if one were to simply delete the ScopedLock in listener(), all would be well. That lock seems to exist because of the enable flag and the listener_callbacks object, but looking through the sources, the listener_callbacks object doesn't really seem to be used anywhere and so could perhaps be deleted, and enable is a bit of a special case anyway.

That said, in reviewing I overlooked a deadlock, so who's to say my first impressions are right this time? @e-hndrks, you were kind enough to figure out how to invoke the listeners for events that occurred prior to installing them, and to implement it. Would you be kind enough to have a look at this and tell me what you make of it? Thanks!

e-hndrks commented 10 months ago

It seems this deadlock is caused by the different event models used by OpenSplice DDS (on which this C++ API was originally based) and Cyclone DDS (for which the implementation of this C++ API was later modified). In OpenSplice DDS every event is by definition handled by a thread that runs asynchronously to the thread that sets the listener. In Cyclone DDS, however, the thread that sets the listener may also be the thread that handles any pending events. The C++ API should have been adapted accordingly, but apparently the difference between the event models was 'lost in translation'.

So how to solve this issue? It makes sense that the 'enabled' flag of the Entity needs to be evaluated within the context of the Entity's mutex, since a disabled Entity might result in the corresponding Listener not being set on the C API. Without the mutex, if somebody enabled a disabled Entity right after its enabled flag had been checked, no Listener would be set. However, this rationale doesn't seem to be followed in the implementation of the enable() method:

  1. The implementation of the enable() method itself doesn't lock any mutex when examining the enabled flag.
  2. When invoked, the enable() method will always activate the C listener, potentially resulting in it being set twice.
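One way to address both points is sketched below. It assumes the C++ layer keeps both an `enabled` flag and the current listener. The sketch makes two changes. First, `enabled` is checked and set under the entity mutex, so `enable()` installs the listener at most once. Second, the core is called only after the mutex is released, so a synchronously invoked callback cannot deadlock. `EntityState` and `install_in_core()` are hypothetical names. `install_in_core()` only counts calls; the real code would call `dds_set_listener` at that point.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>

// Hypothetical sketch of the enable()/listener_set() interplay.
// install_in_core() stands in for dds_set_listener() and only counts calls,
// so the "set twice" problem is observable.
class EntityState {
public:
    void listener_set(int listener_id) {
        bool install = false;
        {
            std::lock_guard<std::mutex> guard(mutex_);
            listener_id_ = listener_id;
            install = enabled_;            // flag read under the lock
        }
        if (install) {
            install_in_core(listener_id);  // core call without holding mutex_
        }
    }

    void enable() {
        int to_install = 0;
        {
            std::lock_guard<std::mutex> guard(mutex_);
            if (enabled_) {
                return;                    // already enabled: do not install again
            }
            enabled_ = true;               // check-and-set under the same lock
            to_install = listener_id_;
        }
        if (to_install != 0) {
            install_in_core(to_install);
        }
    }

    int core_install_count() const { return core_install_count_.load(); }

private:
    void install_in_core(int /*listener_id*/) { ++core_install_count_; }

    std::mutex mutex_;
    bool enabled_ = false;
    int listener_id_ = 0;                  // 0 means "no listener" in this toy model
    std::atomic<int> core_install_count_{0};
};
```

Calling the core outside the lock leaves a narrow window in which two concurrent `listener_set` calls could reach the core in the opposite order from the one recorded. This may be one argument for moving the enable semantics into the C layer.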

I will take some more time to look into this particular problem and hope to propose a fix before the end of this week.

eboasson commented 10 months ago

I thought about it a bit after I wrote the comment as well and I think it makes the most sense to share my thinking here: with a bit of luck it is useful to @e-hndrks and can save him some time 🙂

The implementation of, e.g., take at https://github.com/eclipse-cyclonedds/cyclonedds-cxx/blob/0ddd39b06401305bdff481f394209e44a268be6d/src/ddscxx/src/org/eclipse/cyclonedds/sub/AnyDataReaderDelegate.cpp#L357-L374 is quite conservative with locking. The core library protects itself rather carefully; in particular, the call to dds_take_with_collector will immediately error out if the handle has already been closed.

It looks to me like this->check() is https://github.com/eclipse-cyclonedds/cyclonedds-cxx/blob/0ddd39b06401305bdff481f394209e44a268be6d/src/ddscxx/src/org/eclipse/cyclonedds/core/ObjectDelegate.cpp#L26-L32 and that it does nothing other than look at a closed flag. On the face of it, then, the call to check() could be removed without doing much harm.

The closed flag basically gets set here: https://github.com/eclipse-cyclonedds/cyclonedds-cxx/blob/0ddd39b06401305bdff481f394209e44a268be6d/src/ddscxx/include/dds/sub/detail/TDataReaderImpl.hpp#L704-L726, which is ever so slightly earlier than the moment the underlying data reader gets deleted. I wonder whether that makes any difference for take: it only matters if some state in the C++ binding that take depends on becomes invalid between that close and the call to dds_delete that actually deletes the data reader.

As far as I can see now, the only other thing at play is the SampleHolder https://github.com/eclipse-cyclonedds/cyclonedds-cxx/blob/0ddd39b06401305bdff481f394209e44a268be6d/src/ddscxx/include/org/eclipse/cyclonedds/sub/AnyDataReaderDelegate.hpp#L62-L72 and that doesn't seem to have any dependency on the contents of the DataReader object.

So my second impression is that removing the lock and the check in take is quite safe. Failing that, I would expect that splitting the mutex, and protecting closed and/or enable and listener_callbacks with a separate mutex, would also work.

e-hndrks commented 10 months ago

I think I agree with the analysis of @eboasson. Again, the locking mechanisms used by this C++ API originally came from OpenSplice, and were needed to guard the integrity of the User Layer pointer contained in the C++ object, which otherwise might be dangling when another thread closed the Entity. However, in Cyclone DDS the C++ object doesn't wrap a pointer but a handle, whose validity can be verified separately in the handle server. That means we could get rid of the entire 'closed' flag at the C++ object level as far as the validity of the wrapped C entity is concerned.
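The following toy example illustrates why a handle, unlike a pointer, needs no C++-level `closed` flag. The validity check lives in a handle table owned by the core. An operation on a deleted handle simply returns an error instead of touching freed memory. `HandleServer`, `core_take`, `ReaderWrapper` and the `kBadHandle` value are all hypothetical. They do not correspond to Cyclone DDS's actual handle server or return codes.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <unordered_set>

// Toy "handle server": the core owns validity; wrappers only store an integer.
class HandleServer {
public:
    int32_t create() {
        std::lock_guard<std::mutex> guard(mutex_);
        int32_t h = next_++;
        live_.insert(h);
        return h;
    }
    void remove(int32_t h) {
        std::lock_guard<std::mutex> guard(mutex_);
        live_.erase(h);
    }
    bool is_valid(int32_t h) const {
        std::lock_guard<std::mutex> guard(mutex_);
        return live_.count(h) != 0;
    }

private:
    mutable std::mutex mutex_;
    int32_t next_ = 1;
    std::unordered_set<int32_t> live_;
};

// Hypothetical error code, mimicking the C API's negative-return style.
constexpr int kBadHandle = -3;

// Stand-in for a core operation such as a take: it validates the handle
// itself, so a closed entity yields an error rather than a dangling access.
int core_take(const HandleServer& hs, int32_t handle) {
    if (!hs.is_valid(handle)) {
        return kBadHandle;
    }
    return 0;  // number of samples taken in this toy model
}

// C++ wrapper with no lock and no 'closed' flag: it just forwards the handle.
struct ReaderWrapper {
    const HandleServer& hs;
    int32_t handle;
    int take() const { return core_take(hs, handle); }
};
```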

What is left is the validity checking of the 'enable' flag, which typically should be done inside the underlying C entity. However, the enable functionality hasn't been implemented in the underlying C API, so it seems a bit messy to have it implemented in the C++ layer on top. I would propose taking the whole 'enable' semantics out of the C++ API so that it behaves identically to C. When it is time to introduce the 'enable' semantics in C, C++ will automatically benefit as well, since it is built on top of the C API.

Last but not least, the C++ Entity wrapper keeps track of the underlying C-Listener object, which might also need to be protected using a lock on the C++ level. In that particular case I wonder if that is really needed: couldn't we just register the C listener on the C entity, and ask the underlying C entity for its listener when we need it? That would remove the extra duplication we now have on the C++ API, and would remove the need for an Entity lock in C++ altogether.

Would you agree with this analysis @eboasson? If you do, I can start working on removing these items from the C++ API.

eboasson commented 10 months ago

Thanks @e-hndrks ! Yep, I agree 🙂

The Cyclone core copies all the listener details into its own memory during entity creation and dds_set_listener, and dds_get_listener copies them out again. So yes, I think

couldn't we just register the C listener on the C entity, and ask the underlying C entity for its listener when we need it?

should work.
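Because the core copies the listener details in on set and out on get, the C++ wrapper can delegate and keep no listener state of its own, so it needs no lock to protect such state. The sketch below models only that copy-in/copy-out behaviour. It uses hypothetical names (`Core`, `ListenerCopy`, `EntityWrapper`) and is not the `dds_set_listener`/`dds_get_listener` API.

```cpp
#include <cassert>
#include <map>
#include <mutex>

// Hypothetical callback type and listener record.
using DataAvailableFn = void (*)(int entity, void* arg);

struct ListenerCopy {
    DataAvailableFn on_data_available = nullptr;
    void* arg = nullptr;
};

// Core-side storage: copies in on set, copies out on get.
class Core {
public:
    void set_listener(int entity, const ListenerCopy& l) {
        std::lock_guard<std::mutex> guard(mutex_);
        listeners_[entity] = l;  // copy in
    }
    ListenerCopy get_listener(int entity) const {
        std::lock_guard<std::mutex> guard(mutex_);
        auto it = listeners_.find(entity);
        if (it == listeners_.end()) {
            return ListenerCopy{};
        }
        return it->second;       // copy out
    }

private:
    mutable std::mutex mutex_;
    std::map<int, ListenerCopy> listeners_;
};

// Wrapper without a cached listener: every query goes to the core.
class EntityWrapper {
public:
    EntityWrapper(Core& core, int handle) : core_(core), handle_(handle) {}
    void listener(const ListenerCopy& l) { core_.set_listener(handle_, l); }
    ListenerCopy listener() const { return core_.get_listener(handle_); }

private:
    Core& core_;
    int handle_;
};
```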

There is a small caveat, I just realised it when I read your comment: dds_get_listener will return the listener functions set directly on the entity, but also those "inherited" from the parent. I don't think the API allows you to distinguish between the two cases. It does track this internally inside the dds_listener object, but if I am reading the relevant functions correctly, it would lose that information if you'd do (in pseudo code) set_listener(E,get_listener(E)).
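A small model reproduces this caveat. It assumes that get returns the effective callback (the entity's own callback if set, otherwise the one inherited from the parent), while set always records what it is given as set directly on the entity. All names are hypothetical, and the real `dds_listener` tracks more state than this. After `set_listener(child, get_listener(child))`, which looks like a no-op, the child no longer follows changes to its parent's listener.

```cpp
#include <cassert>

using Fn = void (*)();

int g_hits = 0;                      // lets the demo tell the callbacks apart
void on_data_a() { g_hits += 1; }
void on_data_b() { g_hits += 10; }

struct Entity {
    Fn own = nullptr;                // callback set directly on this entity
    const Entity* parent = nullptr;  // where unset callbacks are inherited from

    // Effective callback: own if set, otherwise inherited from the parent.
    Fn effective() const {
        if (own != nullptr) {
            return own;
        }
        return parent != nullptr ? parent->effective() : nullptr;
    }
};

// get: returns the effective callback, losing where it came from.
Fn get_listener(const Entity& e) { return e.effective(); }

// set: whatever is passed in is recorded as set directly.
void set_listener(Entity& e, Fn fn) { e.own = fn; }
```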

That's an oversight in the API and/or a bug. There are multiple ways to look at this, and I am not sure yet where I stand. One view is that dds_get_listener should not return the inherited ones (and then all will be well); another is that it should return everything, as it does now, and that an operation should exist that allows one to find out where a non-null listener function came from. Furthermore, the behaviour of set(get) should either be documented in detail or do what one would expect, namely have no observable effect.

So, first question: which is the more reasonable view?

Second question: is this an issue for eliminating the C++ listener object? If it is, let's fix the core; if it isn't, let's just create an issue for this detail in the core library and deal with it later.

TimMooreAu commented 9 months ago

@e-hndrks and @eboasson I just wanted to follow up on this and check whether you have made a decision on a way forward? Thanks

e-hndrks commented 9 months ago

Hi @TimMooreAu, I am working on a fix along the lines that @eboasson described, but that is quite a big change to make. If you just need a temporary fix that bypasses the problem for now, you could use the following code-change:

diff --git a/src/ddscxx/src/org/eclipse/cyclonedds/core/EntityDelegate.cpp b/src/ddscxx/src/org/eclipse/cyclonedds/core/EntityDelegate.cpp
index 284d4f5..f683d93 100644
--- a/src/ddscxx/src/org/eclipse/cyclonedds/core/EntityDelegate.cpp
+++ b/src/ddscxx/src/org/eclipse/cyclonedds/core/EntityDelegate.cpp
@@ -222,8 +222,9 @@ org::eclipse::cyclonedds::core::EntityDelegate::listener_set(
     // If entity enabled: set listener on ddsc entity
     if (this->enabled_)
     {
-        dds_return_t ret;
-        ret = dds_set_listener(this->ddsc_entity, callbacks);
+        this->unlock();
+        dds_return_t ret = dds_set_listener(this->ddsc_entity, callbacks);
+        this->lock();
         ISOCPP_DDSC_RESULT_CHECK_AND_THROW(ret, "Setting listener failed.");
     }

I have tried this fix myself, and it seems to solve your problem provisionally.
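A possible refinement of the temporary patch above is an RAII "reverse lock". This is an untested sketch, not a reviewed change. The guard releases an already-held lock and re-acquires it on scope exit, so the lock is restored even if something between `unlock()` and `lock()` throws. This matters because, per the analysis earlier in the thread, the caller holds a ScopedObjectLock that will unlock again during unwinding. `ScopedUnlock` is a hypothetical helper. It only assumes the wrapped type provides `lock()` and `unlock()`, as `ObjectDelegate` does according to the stack traces.

```cpp
#include <cassert>

// Hypothetical "reverse lock": releases an already-held lock for the lifetime
// of the guard and re-acquires it on scope exit, including during stack
// unwinding. Works with any type providing lock()/unlock().
//
// Possible use in EntityDelegate::listener_set (illustrative only):
//   dds_return_t ret;
//   {
//       ScopedUnlock<EntityDelegate> unlocked(*this);
//       ret = dds_set_listener(this->ddsc_entity, callbacks);
//   }
//   ISOCPP_DDSC_RESULT_CHECK_AND_THROW(ret, "Setting listener failed.");
template <typename Lockable>
class ScopedUnlock {
public:
    explicit ScopedUnlock(Lockable& l) : lockable_(l) { lockable_.unlock(); }
    ~ScopedUnlock() { lockable_.lock(); }

    ScopedUnlock(const ScopedUnlock&) = delete;
    ScopedUnlock& operator=(const ScopedUnlock&) = delete;

private:
    Lockable& lockable_;
};
```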