ros2 / rmw_fastrtps

Implementation of the ROS Middleware (rmw) Interface using eProsima's Fast RTPS.
Apache License 2.0
157 stars 117 forks source link

Reused shared memory with `PREALLOCATED_WITH_REALLOC` #783

Open mauropasse opened 1 month ago

mauropasse commented 1 month ago

On the PR https://github.com/ros2/rclcpp/pull/2624 I provide a fix to allow the user to make copies of loaned messages, keeping them beyond the subscription callback scope. With the fix, the loaned message is returned to the DDS when the user's copy of the message goes out of scope.

In the PR I add a unit test to verify that memory is not reused until all entities have returned the loan. The tests fully pass when history memory policy is set as DYNAMIC_REUSABLE, but fails when set as PREALLOCATED_WITH_REALLOC:

[  FAILED  ] 2 tests, listed below:
[  FAILED  ] TestSharedMemory.user_is_last_loan_owner
[  FAILED  ] TestSharedMemory.sub_is_last_loan_owner

The failures mean that a user is provided with loaned memory which has been previously provided, and is still in use:

While a temporary solution would be to only use DYNAMIC_REUSABLE on shared memory, this mode has the issue that a undesired copy is performed on a multi-process system, as I describe in https://github.com/ros2/rmw_fastrtps/issues/782

So in short:

PREALLOCATED_WITH_REALLOC: Issues shared memory still in use
DYNAMIC_REUSABLE: A copy of the message happens on a multi-process system

Do you think this is a bug, or the use of PREALLOCATED_WITH_REALLOC inherently breaks proper shared memory management?

mauropasse commented 1 month ago

These failures can be explained by the documentation in eProsima's datasharing-delivery-constraints:

Using Data-sharing mechanism, the DataWriter’s history is shared with the DataReaders.
This means that the effective HistoryQos depth on the DataReader is, at most, 
the Datawriter’s HistoryQos depth.

To avoid confusions, set the DataReaders’ history depth to a value equal or less than the DataWriter’s.

In the failure [ FAILED ] TestSharedMemory.sub_is_last_loan_owner, the subscription has set a QoS depth larger than the publisher depth, so the failure seems kind of expected.

My question is, should the subscription/reader log some warning in this case? Like some QoS mismatch event: [WARNING]: HistoryQos depth on the DataReader should be equal or lesser than the DataWriter.

The goal would be for the user for somehow to know that if he wants shared memory to safely handle memory (i.e. its stored messages won't silently be overridden):

Maybe this makes sense to just be mentioned on the ROS2 documentation about shared memory & loaned messages.

mauropasse commented 1 month ago

Currently the user can still ask for Loaned Messages even if all the shared memory has been exhausted, like when the previous loaned messages hasn't been returned (either because the user has copies, or subscriptions has them in its buffers).

In this case, the user is provided with memory still in use, which will silently override user/subscription stored messages.

I think the right behavior would be for the publisher to throw when the user requests loans and memory has been exhausted. What do you think?

fujitatomoya commented 1 month ago

My question is, should the subscription/reader log some warning in this case? Like some QoS mismatch event: [WARNING]: HistoryQos depth on the DataReader should be equal or lesser than the DataWriter.

IMO, sounds reasonable if they are doing data-sharing. and this is probably request for Fast-DDS, and we (ROS 2) can catch the QoS incompatibility event from rmw implementation?

In this case, the user is provided with memory still in use, which will silently override user/subscription stored messages. I think the right behavior would be for the publisher to throw when the user requests loans and memory has been exhausted.

IMO this depends on the application requirement.

CC: @MiguelCompany @wjwwood @clalancette @alsora

MiguelCompany commented 1 month ago

@mauropasse Currently, the only way to make the publisher block till the shared payload is not being used is using RELIABLE, and KEEP_ALL.

You can also increase extra_samples in ResourceLimitsQosPolicy to make the pool bigger (for instance, having extra_samples = depth would make the pool twice the depth).

One thing that could be done is adding a warning when a sample is marked for reuse without it being acknowledged. There is this callback in the DataWriterListener that could be of use.

My question is, should the subscription/reader log some warning in this case? Like some QoS mismatch event: [WARNING]: HistoryQos depth on the DataReader should be equal or lesser than the DataWriter.

The problem with this is that HistoryQos is not transmitted in discovery.

mauropasse commented 1 month ago

Great information @MiguelCompany. I'll explore the options!

mauropasse commented 1 month ago

@MiguelCompany I tested these scenarios based on your suggestions:

If the subscription is not spinning but it receives the loaned messages, I guess the ACK is still sent in DDS? So the on_unacknowledged_sample_removed wouldn't be useful in this case? Thanks!

MiguelCompany commented 1 month ago
  • KEEP_ALL on publisher: Shared memory isn't reused, even if loans were returned by the sub/user (since the publisher still keeps the loans). No memory corruption or duplicates detected, so it serves the purpose (but might not be the most performant option due possible dynamic allocations?)

No dynamic allocations here. The only possible issue is that, since the publisher is blocked until the oldest sample is acknowledged (or reliability.max_blocking_time elapses), it could hinder the publication rate.

  • I tested RELIABLE with on_unacknowledged_sample_removed: It didn't trigger, as the DDS sub likely sent the ACK back to pub even when loans weren’t returned? (subscription not spinning, storing messages). I overrided the API on custom_publisher_info.hpp: CustomDataWriterListener adding a warning, but it didn't show anything.

That is awkward. Was the reader also RELIABLE? Samples are automatically acknowledged for BEST_EFFORT readers. Just in case, could you check again with the changes on this commit? I just added an error log when the writer detects the situation of a sample being removed without acknowledgement.

mauropasse commented 1 month ago

That is awkward. Was the reader also RELIABLE?

Yes. So this is the scenario just to clarify:

Single process: [Pub / Sub_A / Sub_B] = RELIABLE
1. Publisher (depth=1) publishes loaned messages
2. Subscription_A  (depth=10) doesn't spin, stores messages
3. Subscription_B spin, user stores 10 messages

After publishing the 2nd message, the publisher is provided with same memory 
used in the 1st message, despite loans not being returned. This is where we'd like to see the warning. 

When is usually the ACK sent by the subscription? When executed or when message is received in DataReader? What is the relation with this callback, and the fact that asking a loan will provide memory still in use?

The only case in which I manage to see this callback in action (with also the logs you added in the commit) is setting:

Pub: best effort
Sub: reliable

[WARN] [1728658509.233333338] [pub_sub_node]: New publisher discovered on topic '/topic',
   offering incompatible QoS. No messages will be sent to it. Last incompatible policy: RELIABILITY_QOS_POLICY

[INFO] [1728658509.243104727] [pub_sub_node]: Publishing: '1'
2024-10-11 14:55:09.243 [RTPS_HISTORY Error] Unacknowledged change removed: test_msgs::msg::dds_::BasicTypes_ -> Function prepare_change
[WARN] [1728658509.243487964] [rmw_fastrtps_shared_cpp]: CustomPublisherInfo: on_unacknowledged_sample_removed!!
memory reused: 0x56141ada4bc0

[INFO] [1728658509.253650552] [pub_sub_node]: Publishing: '2'
[WARN] [1728658509.253678907] [rmw_fastrtps_shared_cpp]: CustomPublisherInfo: on_unacknowledged_sample_removed!!
2024-10-11 14:55:09.253 [RTPS_HISTORY Error] Unacknowledged change removed: test_msgs::msg::dds_::BasicTypes_ -> Function prepare_change
memory reused: 0x56141ada4b50
mauropasse commented 1 month ago

Since my test is single process, the send_datasharing_ack() is not called, but I'm unsure if is the same ACK sender that causes the callback on_unacknowledged_sample_removed() to not be called.

mauropasse commented 1 month ago

I expanded the test to check multi-process behavior (pub/sub in different processes):

So, in most cases, the on_unacknowledged_sample_removed() callback doesn't serve to notify the user about memory reuse.

A different approach to verify if memory is still in use could be:

fujitatomoya commented 1 month ago

@mauropasse thank you for checking this, for us this is really interesting topic!

Subscription spinning (but user keeps messages, not returning loans): The callback is not invoked, even if memory is reused, likely because the subscription sent ACK to the publisher.

this is because that ACK is sent to the corresponding publisher when the message is taken on the subscription? right?

https://github.com/ros2/rmw_fastrtps/blob/5cb7665847bc9debfee2e8077c324ff4cc403ea0/rmw_fastrtps_shared_cpp/src/rmw_take.cpp#L98

for me, this looks like okay by design of on_unacknowledged_sample_removed? because subscription sent the ACK for that message. i mean that on_unacknowledged_sample_removed does not tell the data is valid or not...

i think we need the is_valid API in the end, so that application can check the data integrity right before accessing the data...

Just FYI, with previous comment on https://github.com/ros2/rmw_fastrtps/issues/783#issuecomment-2407607674

Pub: best effort Sub: reliable

I do not think this is gonna communicate at all...

offering incompatible QoS. No messages will be sent to it. Last incompatible policy: RELIABILITY_QOS_POLICY

DDS detected the incompatible QoS, and no messages are delivered to this concerned subscription. because DDS cannot guarantee RELIABLE durability on this subscription.

MiguelCompany commented 1 month ago

@mauropasse @fujitatomoya This is in fact getting interesting...

this is because that ACK is sent to the corresponding publisher when the message is taken on the subscription? right?

@fujitatomoya you nailed it! It seems Fast DDS is acknowledging the sample when taking it.

So the following table is a summary on when samples are acknowledged

Same process Inter-process without data-sharing Inter-process with data-sharing
Sample acknowledged... ... when published ... when added to history ... on take

A different approach to verify if memory is still in use could be:

  • If the publisher knows how many same-host subscriptions exist, it could infer if memory is still in use:

    • The loan must be returned X times (where X = N subscriptions + 1 for the transient_local publisher).

Could logic be implemented to track this? What do you think @MiguelCompany

I think it would be better to extend the current mechanism and make it possible to acknowledge the sample when the loan is returned. This way we could use on_unacknowledged_sample_removed to warn the user.

Either that or expose is_valid API as @fujitatomoya suggested

i think we need the is_valid API in the end, so that application can check the data integrity right before accessing the data...

MiguelCompany commented 1 month ago

I thought of another possibility. The user could be warned when an item is added to the LoanManager, and the address of the sample already exists there:

https://github.com/ros2/rmw_fastrtps/blob/9d2150ffc1b549d0adc71971b7f350e325b61340/rmw_fastrtps_shared_cpp/src/rmw_take.cpp#L534-L539

fujitatomoya commented 1 month ago

I think it would be better to extend the current mechanism and make it possible to acknowledge the sample when the loan is returned. This way we could use on_unacknowledged_sample_removed to warn the user.

as Fast-DDS, i think this sounds more robust and reliable? is there any side effect for this? because doing this means, the DataReader takes more time to send the ACK to the DataWriter, i am not sure if DataWriter's behavior could be different from this...

Either that or expose is_valid API as @fujitatomoya suggested

IMO, either way (above or below) we take, is_valid API is useful for user application to check the data validity right before accessing it. warning and callback is also useful, but user application needs to sync the state between accessor and warning event in the application.

I thought of another possibility. The user could be warned when an item is added to the LoanManager, and the address of the sample already exists there

this sounds reasonable in ROS 2 RMW implementation. if that is overwriting the same VA (unique_ptr) that already exists in the items list, we can print the warning. and if we can have the user callback here, we can notify the user via this callback.

mauropasse commented 1 month ago

I've been testing the approaches using add_item and on_unacknowledged_sample_removed to warn the user about its messages being overridden, and they're useful. So we have two situations:

So we have then,

In both cases, the warnings happen after the messages has been overriden. Ideally it would be nice for the warning to happen in all processes, and when the loaned is borrowed (before overrides), but these warnings I tested above seem a step in the right direction!

--- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp
+++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp

+  void on_unacknowledged_sample_removed(
+      eprosima::fastdds::dds::DataWriter* datawriter,
+     const eprosima::fastdds::dds::InstanceHandle_t& handle) override
+  {
+    RCUTILS_LOG_WARN_NAMED(
+      "rmw_fastrtps_shared_cpp",
+      "A shared message held by a subscription has been overriden.");
+  }
+
 private:
   RMWPublisherEvent * publisher_event_;
 };

--- a/rmw_fastrtps_shared_cpp/src/rmw_take.cpp
+++ b/rmw_fastrtps_shared_cpp/src/rmw_take.cpp
   void add_item(std::unique_ptr<Item> item)
   {
     std::lock_guard<std::mutex> guard(mtx);
+    // Check if the new item already exists in the list
+    for (const auto& existing_item : items) {
+      if (existing_item->data_seq.buffer()[0] == item->data_seq.buffer()[0]) {
+        RCUTILS_LOG_WARN_NAMED(
+          "rmw_fastrtps_shared_cpp",
+          "Subscription recieved a message still held by the user (which was overridden)"); }}

is_valid would also be useful. Any suggestions for its implementation?

fujitatomoya commented 1 month ago

@mauropasse no major objections, i think this is better for user application. one concern is that when we are using the loaned messages, usually what we care most is performance. adding the extra checking to crawl through the existing_item list would be not ideal for the performance? especially we have a large depth in the HistoryCache?

mauropasse commented 1 month ago

@fujitatomoya I think it wouldn't hurt performances much.

Only in an (uncommon?) situation, there may be some CPU usage when a non-spinning subscription (with many stored messages) starts spinning, but the user doesn't discard the messages.

For each execute_subscription, an item is added to a list and compared with previous items (which would be zero, since the subscription wasn't spinning before). If the user discards the message in the callback, the item is removed from the list. However, if the user keeps a copy of the message and doesn't return the loan, the item remains stored.

The overhead comes then from iterating over the messages that the user has kept outside of the subscription callback, when a new message is processed. So I think for most situations, it would be iterating over an empty list?

On other topic, what it worries me is that we still don't have a warning for single-process case where a subscription keeps duplicated an overridden messages. Suppose, subscription_depth = 100 / publisher_depth = 1 -> Subscription has 100 overridden messages, all the same!

fujitatomoya commented 1 month ago

@mauropasse yeah probably the overhead can be ignored mostly when the application returns the memory to the middleware.

On other topic, what it worries me is that we still don't have a warning for single-process case where a subscription keeps duplicated an overridden messages. Suppose, subscription_depth = 100 / publisher_depth = 1 -> Subscription has 100 overridden messages, all the same!

is this really configurable with Fast-DDS? i mean History Object is coupled between DataWriter and DataReader, that means i think subscription depth is also managed with 1 in this case?