ros2 / rmw_fastrtps

Implementation of the ROS Middleware (rmw) Interface using eProsima's Fast RTPS.
Apache License 2.0
157 stars 117 forks source link

Subscriber does not attain/maintain ownership of received loaned messages #679

Open verderog opened 1 year ago

verderog commented 1 year ago

Bug report

Required Info:

Steps to reproduce issue

I'm using a fairly straightforward publisher/subscriber node setup. Publisher publishers to topic, subscriber subscribes to that same topic with a standard callback. Publisher and subscriber nodes are invoked as separate processes.

The issue occurs such that if a long enough delay is induced on the subscriber's callback as it's processing an incoming loaned message, the publisher is not prevented from updating the message contents. It does not seem that the subscriber attains or maintains ownership of incoming loaned messages.

Publisher creation:

loan_publisher_ = this->create_publisher<test_interfaces::msg::Testdata>("pub_topic", queue_depth_);

Publisher publishing:

rclcpp::LoanedMessage<test_interfaces::msg::Testdata> loaned_msg = loan_publisher_->borrow_loaned_message();
(...loaned message processing here...)
loan_publisher_->publish(std::move(loaned_msg));

Subscriber creation:

subscription_ = this->create_subscription<test_interfaces::msg::Testdata>("pub_topic", 10,std::bind(&SubNode::topic_callback, this, _1));

Subscriber callback processing:

void topic_callback(const test_interfaces::msg::Testdata::SharedPtr msg) {
(...message processing here...)
}

Env config:

export RMW_FASTRTPS_USE_QOS_FROM_XML=1
export RMW_IMPLEMENTATION=rmw_fastrtps_cpp
export FASTRTPS_DEFAULT_PROFILES_FILE=`pwd`/enable-loaned-msgs.xml

XML config:

<?xml version="1.0" encoding="UTF-8" ?>
<profiles xmlns="http://www.eprosima.com/XMLSchemas/fastRTPS_Profiles">

  <!-- Default publisher profile -->
  <data_writer profile_name="default publisher profile" is_default_profile="true">
    <qos>
      <publishMode>
        <kind>ASYNCHRONOUS</kind>
      </publishMode>
      <!-- https://fast-dds.docs.eprosima.com/en/latest/fastdds/dds_layer/core/policy/eprosimaExtensions.html#datasharingkind -->
      <data_sharing>
        <kind>AUTOMATIC</kind>
      </data_sharing>
    </qos>
    <historyMemoryPolicy>PREALLOCATED_WITH_REALLOC</historyMemoryPolicy>
  </data_writer>

  <data_reader profile_name="default subscription profile" is_default_profile="true">
    <qos>
          <!-- https://fast-dds.docs.eprosima.com/en/latest/fastdds/dds_layer/core/policy/eprosimaExtensions.html#datasharingkind -->
      <data_sharing>
        <kind>AUTOMATIC</kind>
      </data_sharing>
    </qos>
    <historyMemoryPolicy>PREALLOCATED_WITH_REALLOC</historyMemoryPolicy>
  </data_reader>
</profiles>

Expected behavior

Subscriber operates on message data that does not change.

Actual behavior

Publisher is not prevented from updating message while it is being processed by one or more subscribers.

Additional information

See points 2 & 3 in this unanswered thread: https://answers.ros.org/question/401385/loaned-messages-and-zero-copy/

I'm not sure if I'm missing something in terms of loaned message life cycle (ie. "That's just the way it is"), operational behavior, or if there is a mechanism to prevent this sort of issue from occurring.

On a somewhat related note, I've also observed segmentation faults if the publisher node process terminates while a subscribe node process is accessing loaned message data. That might warrant another issue.

If it helps, I can post a simplified test that demonstrates the problem.


jsan-rt commented 1 year ago

Hi @verderog,

What you are seeing is the expected behavior when using Data Sharing with Loans with the default QoS configuration, in particular KEEP_LAST_HISTORY_QOS or BEST_EFFORT_RELIABILITY_QOS. Both DataWriter and DataReader share the exact same history so in the event that the reading rate is significantly slower than the publication rate it may lead to sample overwriting.

You can read a bit more on the issue including some ways to prevent this in this section of Fast DDS documentation.

verderog commented 1 year ago

@jsantiago-eProsima

Thank you for the reply! I've read (and re-read) the section you linked, along with the ROS2 info on QoS. Even so, I still don't quite have a conceptual understanding of how loaned messaging should be working. To add to my confusion, I know I'm interfacing with FastRTPS through ROS2 and not invoking DataWriter/DataReader APIs directly -- so I'm one layer removed from the operations described by the FastRTPS documentaiton.

If I describe my line of thinking, hopefully you can clarify how things really are intended to work and where I may be going off the rails. In my mind, I understand that the middleware creates a pool of loaned messages (ie. samples). The publisher can come along and execute borrow_loaned_message. This moves one of the samples from the middleware to the publisher, giving the publisher ownership. The publisher then updates the data of the sample and publishes it, returning ownership back to the middleware. I think this understanding is correct.

At this point, things are little bit more muddled for me. The middleware in some way will notify all subscribers that there is a sample with updated data (ie. a sample has been published.) I doubt ownership is transferred away from the middleware as you most likely cannot have multiple subscribers "own" the sample. If I understand the FastRTPS documentation correctly, this sample exists in the history of both the middleware and the subscribers as a shared memory location. The callback function for the subscribers is invoked and references this shared memory location. The invocation of the callback is enough to acknowledge that the sample has been processed, which makes that sample fair game for the middleware to reuse. This will happen even if the subscriber hasn't technically completed its processing of the sample. Is this correct?

Is there a way for a subscriber to determine if a loaned message has been updated before it has completed processing it? My current approach is for each sample to include a sequence number as part of its data. This value is uniquely set by the publisher before publishing. If the subscriber observes that the sequence number has not changed after it is done processing the data from the sample, it assumes the rest of the sample also did not change. This feels like a naive approach, but it seems to work as a first-pass.

I don't think it is possible to setup a QoS profile for data sharing to prevent the publisher and multiple subscribers from trying to operate on data at the same time. Unless I somehow bypass ROS2 and interface with the FastRTPS DataWriter/DataReader directly, I'm not sure how I can guarantee these concurrent access operations won't happen.

Sorry for the length of my reply. I do appreciate your help!

fujitatomoya commented 1 year ago

This will happen even if the subscriber hasn't technically completed its processing of the sample. Is this correct?

I believe, as far as i can see from the doc, DataWriter does not even get acknowledged by DataReaders, that means DataWriter can do reuse that sample whenever necessary.

Is there a way for a subscriber to determine if a loaned message has been updated before it has completed processing it?

I would like to ask the same question.

Even with QoS KEEP_LAST_HISTORY_QOS or BEST_EFFORT_RELIABILITY_QOS, this would lead the DataReaders to read the unknown data could be corrupted. And I would not think that is good behavior for user experience, unless it notifies the user application in anyways, like exception with Data has been overwritten.

I can see the we can change the QoS into RELIABLE so that DataWriter will be acknowledged by DataReaders that the sample can be reused. This behavior makes sense to me, to guarantee that DataReaders read the delivered message, and if necessary it blocks DataWriter to publish the new messages because there is no freed samples.

verderog commented 1 year ago

As an experiment, I updated my XML with the following changes to the reliability QoS settings. I observed no change in behavior with my sub. With a 15 second delay in the sub's topic callback, it detected shared data changing so it does not appear that the pub/DataWriter was blocked:

<?xml version="1.0" encoding="UTF-8" ?>
<profiles xmlns="http://www.eprosima.com/XMLSchemas/fastRTPS_Profiles">

  <!-- Default publisher profile -->
  <data_writer profile_name="default publisher profile" is_default_profile="true">
    <qos>
      <publishMode>
        <kind>ASYNCHRONOUS</kind>
      </publishMode>
      <!-- https://fast-dds.docs.eprosima.com/en/latest/fastdds/dds_layer/core/policy/eprosimaExtensions.html#datasharingkind -->
      <data_sharing>
        <kind>AUTOMATIC</kind>
      </data_sharing>
      <reliability>
          <kind>RELIABLE</kind>
          <max_blocking_time>
               <sec>20</sec>
          </max_blocking_time>          
      </reliability>      
    </qos>
    <historyMemoryPolicy>PREALLOCATED_WITH_REALLOC</historyMemoryPolicy>
  </data_writer>

  <data_reader profile_name="default subscription profile" is_default_profile="true">
    <qos>
      <!-- https://fast-dds.docs.eprosima.com/en/latest/fastdds/dds_layer/core/policy/eprosimaExtensions.html#datasharingkind -->
        <reliability>
            <kind>RELIABLE</kind>
        </reliability>

      <data_sharing>
        <kind>AUTOMATIC</kind>
      </data_sharing>
    </qos>
    <historyMemoryPolicy>PREALLOCATED_WITH_REALLOC</historyMemoryPolicy>
  </data_reader>
</profiles>
fujitatomoya commented 1 year ago

CC: @iuhilnehc-ynos @Barry-Xu-2018

Barry-Xu-2018 commented 1 year ago

According to previous design, there are 2 policies while overflow happened. User can set it. But there is no block policy.

https://github.com/eProsima/Fast-DDS/blame/89362996062053c5578b2c67b320c544d9ed4f74/doc/design/shared-memory-transport/interprocess_shared_mem.md#L83-L84

At this commit, it removed 2 policies.

https://github.com/eProsima/Fast-DDS/pull/1129/files#diff-8d03ef086041a6e0b5c179538aed800631d4fb3aa556864551aab023ba585aa1

Now it adopted 'overwrite' way may consider the performance and complexity.

About QoS RELIABLE, we think it only makes sure subscription received descr. The contents of shared memory pointed by descr can be overwritten.

image

We think it is important to let users know when an 'overwrite' occurs, so that they can adjust the size of shared memory to a suitable value.
Besides, when an 'overwrite' occurs, there may be a warning log issued in fastdds, but by default, ROS2 user cannot see the log contents outputted by fastdds.

MiguelCompany commented 1 year ago

@verderog In the case of ROS 2, there are certain QoS policies that can never be configured by XML, and where the settings from rclcpp will always be honored.

So to ensure a RELIABLE, KEEP_ALL communication, you need to create the publisher and subscriber this way:

auto pub_qos = rclcpp::QoS(queue_depth_).keep_all().reliable();
loan_publisher_ = this->create_publisher<test_interfaces::msg::Testdata>("pub_topic", pub_qos);

...

auto sub_qos = rclcpp::QoS(10).keep_all().reliable();
subscription_ = this->create_subscription<test_interfaces::msg::Testdata>("pub_topic", sub_qos, std::bind(&SubNode::topic_callback, this, _1));
MiguelCompany commented 1 year ago

@Barry-Xu-2018 The shared memory transport is completely unrelated to this, we are talking about data-sharing here.

Most of the relevant code is here, though some relevant parts are inside StatefulWriter.cpp and StatefulReader.cpp

Barry-Xu-2018 commented 1 year ago

@MiguelCompany Thank you for correcting me.

fujitatomoya commented 1 year ago

In the case of ROS 2, there are certain QoS policies that can never be configured by XML, and where the settings from rclcpp will always be honored.

@MiguelCompany sorry that is correct, missed it. thanks for pointing that out.

Even with QoS KEEP_LAST_HISTORY_QOS or BEST_EFFORT_RELIABILITY_QOS, this would lead the DataReaders to read the unknown data could be corrupted. And I would not think that is good behavior for user experience, unless it notifies the user application in anyways, like exception with Data has been overwritten.

this behavior is expected behavior currently, right? I do not think we can rely on this behavior? cz data would be corrupted w/o notification? just checking my understanding is correct.

I can see the we can change the QoS into RELIABLE so that DataWriter will be acknowledged by DataReaders that the sample can be reused.

This behavior makes sense to me, to guarantee that DataReaders read the delivered message, and if necessary it blocks DataWriter to publish the new messages because there is no freed samples.

verderog commented 1 year ago

@MiguelCompany Thanks for the help! I implemented your recommendation and am faced with a new issue relating to segment size:

2023-04-18 14:58:25.477 [DATASHARING_PAYLOADPOOL Error] Failed to create segment fast_datasharing_01.0f.70.b7.6c.00.26.10.01.00.00.00_0.0.12.3: Segment size is too large: 5245288848 (max is 4294967295). Please reduce the maximum size of the history -> Function init_shared_segment

I've got an element in the message data that is 8MB in size (a 1920x1080x4 image buffer) that seems to be the source of this problem. If I reduce this size down to 768KB, then I'm able to run the publisher without the above error. Is there a method available to adjust the Fast-RTPS segment size from ROS2?

MiguelCompany commented 1 year ago

@verderog Having a low max_samples value here should do the trick

verderog commented 1 year ago

@MiguelCompany Thanks for the quick response!

I've updated my XML configuration with max_samples set to 20 and I'm seeing the same behavior that I initially reported.

To recap my test configuration to "force" the issue:

I had it in mind that the QoS updates you recommended would force the pub to block at some point due to running out of unallocated samples but that doesn't seem to be the case.

So, either I'm completely misunderstanding something or the behavior I'm looking for, unchanging data using a shared memory approach, simply isn't possible.