eclipse-cyclonedds / cyclonedds-cxx

Other
96 stars 75 forks source link

Improving the reliability of message delivery to subscribers during initialization #431

Closed adam-lee closed 1 year ago

adam-lee commented 1 year ago

How can I address the issue of my subscriber not receiving DDS messages from my publisher upon initialization, which seems to occur randomly and may be caused by a race condition?

My publisher currently does not wait for the subscriber to join before sending the first message. This approach was intentional to avoid blocking for each individual subscriber since there may be multiple subscribers on the network.

With this constraint, here are a few options I am considering:

  1. Callback on publisher: Modify the publisher to receive a callback when a subscriber comes online. This way, the publisher can wait for the callback to re-transmit the data.

  2. Polling for a new subscriber: Allow the publisher to periodically poll for new subscribers. By actively checking for new subscribers, the publisher can detect when a subscriber joins the network.

Both approach would be followed by a re-transmission of the latest data to ensure the subscriber receives it.

Hoping there is a CycloneDDS mechanism that I can use to achieve either of the options I'm considering above. If my approaches are completely off, please feel free to let me know :).

Any other suggestions and comments are also welcome. Ty!

adam-lee commented 1 year ago

Just realized on_publication_matched() callback is available. ~This should be enough~.

Now I have to figure out how to prepare the data 2nd time (once it's been consumed by the 1st publish).

eboasson commented 1 year ago

Hi @adam-lee, perhaps the "transient-local" setting for the durability kind does what you want: it keeps (some of) the data that was published specifically so subscribers that show up after the data was published still get it.

There are a few things to take into consideration, of course:

As a practical detail, the N above is set not by the writer's "history" QoS setting, but by the writer's "durability service" QoS setting (and then the history part of it, obviously).

adam-lee commented 1 year ago

Thanks for the pointers.

We added the following to the policy and it's working a lot better:

dds::core::policy::Reliability::Reliable()