ros2 / rmw_cyclonedds

ROS 2 RMW layer for Eclipse Cyclone DDS
Apache License 2.0

Large pointcloud pubsub is unstable when there are many subscribers. #292

Open wep21 opened 3 years ago

wep21 commented 3 years ago

How to reproduce

Expected behavior

The output of ros2 topic hz is 10 Hz.

Actual behavior

The output of ros2 topic hz drops lower and lower once the number of subscribers exceeds 10.

cyclonedds setting

<?xml version="1.0" encoding="UTF-8" ?>
<CycloneDDS xmlns="https://cdds.io/config" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="https://cdds.io/config https://raw.githubusercontent.com/eclipse-cyclonedds/cyclonedds/master/etc/cyclonedds.xsd">
    <Domain id="any">
        <General>
            <NetworkInterfaceAddress>auto</NetworkInterfaceAddress>
            <AllowMulticast>default</AllowMulticast>
            <MaxMessageSize>65500B</MaxMessageSize>
            <FragmentSize>4000B</FragmentSize>
        </General>
        <Internal>
            <HeartbeatInterval max="100ms">100ms</HeartbeatInterval>
            <MinimumSocketReceiveBufferSize>30MB</MinimumSocketReceiveBufferSize>
            <Watermarks>
                <WhcHigh>500kB</WhcHigh>
            </Watermarks>
        </Internal>
        <Tracing>
            <Verbosity>config</Verbosity>
            <OutputFile>stdout</OutputFile>
        </Tracing>
    </Domain>
</CycloneDDS>

kernel parameter

net.core.rmem_default = 30971520
net.core.rmem_max = 30971520

Related issue

eboasson commented 3 years ago

The issue is very simple: it spends ~90% of its time in copying the point cloud data:

[flamegraph image]

One copy (almost an insignificant amount of time) is made while serializing the point cloud in the initial publication (that's the "tower" at the right edge). The bulk of the CPU time is spent by the relay nodes.

The first copy is when the relay node's subscription callback is invoked with a serialized sample. It gets this data by (somehow, from somewhere in the bowels of rclcpp or rcl) calling rmw_take_serialized_message. The ROS 2 interface for serialized messages requires that a copy be made, and so this means the data is copied once for each reader. That happens at https://github.com/ros2/rmw_cyclonedds/blob/b18a10260fa764e1deca2ff8962c75d32dcae530/rmw_cyclonedds_cpp/src/rmw_node.cpp#L2762 (which is really just a memcpy: https://github.com/ros2/rmw_cyclonedds/blob/b18a10260fa764e1deca2ff8962c75d32dcae530/rmw_cyclonedds_cpp/src/serdata.cpp#L243).

Re-publishing the data means you eventually end up in rmw_publish_serialized_message. Again, the interfaces require that a copy be made, and so it gets copied again. This happens at https://github.com/ros2/rmw_cyclonedds/blob/b18a10260fa764e1deca2ff8962c75d32dcae530/rmw_cyclonedds_cpp/src/serdata.cpp#L169 via https://github.com/ros2/rmw_cyclonedds/blob/b18a10260fa764e1deca2ff8962c75d32dcae530/rmw_cyclonedds_cpp/src/serdata.cpp#L214
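For reference, the per-message path in such a relay, written directly against the rmw API, looks roughly like the sketch below (error handling omitted; in the real setup rclcpp/rcl make these calls, and sub/pub stand for entities assumed to be created elsewhere):

#include <rmw/rmw.h>
#include <rmw/serialized_message.h>
#include <rcutils/allocator.h>

// Sketch of one relay iteration; `sub` and `pub` are assumed to exist already.
static void relay_once(const rmw_subscription_t * sub, const rmw_publisher_t * pub)
{
  rmw_serialized_message_t msg = rmw_get_zero_initialized_serialized_message();
  rcutils_allocator_t alloc = rcutils_get_default_allocator();
  rmw_serialized_message_init(&msg, 0u, &alloc);

  bool taken = false;
  // Copy #1: the serialized sample held by the reader is memcpy'd into `msg`
  // (the serdata.cpp#L243 memcpy referenced above).
  rmw_take_serialized_message(sub, &msg, &taken, nullptr);

  if (taken) {
    // Copy #2: the writer copies `msg` into a fresh serdata before publishing
    // (the serdata.cpp#L169 / #L214 path referenced above).
    rmw_publish_serialized_message(pub, &msg, nullptr);
  }
  rmw_serialized_message_fini(&msg);
}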

Without changing the ROS interfaces to support loaned samples for taking/publishing serialized messages, there is nothing that can be done about this, unless you're willing to implement the relay as a plain DDS application. You can create the same topic, subscription/data reader and publisher/data writer, and then do

while(1) {
  struct ddsi_serdata *d;
  dds_sample_info_t si;
  /* take a reference to the serialized sample without copying the payload ... */
  if (dds_takecdr(reader, &d, 1, &si, DDS_ANY_STATE) == 1)
    /* ... and hand that same reference straight to the writer */
    dds_forwardcdr(writer, d);
}

that way, instead of copying the data, it'll just shuffle some pointers around.

I've heard of some route to get the DDS entity from a ROS 2 object, so you could probably also do a mix, setting up the subscription and the publisher using the ROS 2 API and then doing the forwarding like above.

wep21 commented 3 years ago

Thank you for your detailed explanation. I replaced the serialized-message relay nodes with point cloud message relay nodes and confirmed that the topic latency was reduced. I'm not familiar with the DDS implementation, so for now I will replace the serialized pub/sub with a normal (typed) one. Also, is it effective to use loaned normal messages, or to use intra-process comms with cyclonedds?
https://github.com/ros2/demos/blob/master/demo_nodes_cpp/src/topics/talker_loaned_message.cpp
https://github.com/ros2/rclcpp/blob/35c89c8afcac89a43872f0bf5b763ff324c82b05/rclcpp/include/rclcpp/node_options.hpp#L45

eboasson commented 3 years ago

Also, is it effective to use loaned normal messages

I haven't yet implemented the loaned messages interface in the Cyclone RMW layer. Cyclone itself supports it, but the glue code for ROS 2 hasn't been done yet. The RMW layer still stores ROS messages internally as serialized messages, and to support loaning properly it will need to (be able to) store the data as a deserialized message. That basically means a few things in the RMW layer need changing. The first is https://github.com/ros2/rmw_cyclonedds/blob/b18a10260fa764e1deca2ff8962c75d32dcae530/rmw_cyclonedds_cpp/src/serdata.hpp#L50, where it will need a (std::atomic) pointer for storing a deserialized message. The second is https://github.com/ros2/rmw_cyclonedds/blob/b18a10260fa764e1deca2ff8962c75d32dcae530/rmw_cyclonedds_cpp/src/serdata.cpp#L129, where it will need to deserialize immediately, which is very similar to what https://github.com/ros2/rmw_cyclonedds/blob/b18a10260fa764e1deca2ff8962c75d32dcae530/rmw_cyclonedds_cpp/src/serdata.cpp#L262 already does. That to_sample function additionally needs to check whether it has a deserialized message available, using a copy/move constructor if it does, and deserializing into the provided buffer if not. (One can use the move constructor if it's known to be the last reference, which always requires some additional care, but I think it could be done to good effect in this case.) Certainly there are a few other bits and pieces that need updating, but that would give proper loaned messages on the reading side.
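To make that slightly more concrete, here is a rough sketch of the shape of the change; the struct and helper names below are simplifications for illustration, not the actual definitions in serdata.hpp:

#include <atomic>
#include <cstddef>

// Simplified stand-in for the RMW layer's serdata struct: alongside the
// serialized CDR blob it would cache an atomically-set pointer to a
// deserialized ROS message.
struct serdata_rmw_sketch
{
  const void * cdr_blob = nullptr;            // existing serialized representation
  size_t cdr_size = 0;
  std::atomic<void *> deserialized{nullptr};  // new: cached deserialized message
};

// Sketch of the adjusted to_sample(): prefer the cached deserialized message
// and only fall back to deserializing the CDR blob when it is absent.
// copy_message() and deserialize_into() are hypothetical placeholders for the
// type-support driven operations the RMW layer already performs.
inline bool to_sample_sketch(const serdata_rmw_sketch & d, void * sample)
{
  if (void * msg = d.deserialized.load(std::memory_order_acquire)) {
    // copy_message(msg, sample);  // or move, if this is known to be the last reference
    (void) msg; (void) sample;
    return true;
  }
  // deserialize_into(d.cdr_blob, d.cdr_size, sample);
  return true;
}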

And then basically the framework for doing loaned messages on the publishing side is done as well :)

What I don't know is how to do a copy/move constructor for a C++ sample using the introspection type support that this RMW uses. It might require replacing the dynamic bits with a static type support, just as the other RMWs have all moved to static type support.

I realize this comment doesn't help you much ... but I figured it'd be better to give a bit of background rather than just saying "no" or "soon". And who knows, perhaps some kind soul will lend a hand!

or use intra process comms in cyclonedds?

It always does intra-process comms by shuffling a few pointers; it never touches the network stack for data that stays within the process.

nnmm commented 3 years ago

We at Apex.AI are also looking into this. Like you have already determined, the problem is that topic_tools works with serialized messages, which causes an extra copy per subscription. Unfortunately the type is not available at compile time, because of how topic_tools works. I want to check whether it makes sense to perform an (unnecessary) deserialization using the type's typesupport library in the relay node, so that we would be using the same rcl functions as a typed publisher/subscription and, as a result, avoid the copies.
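For context, the kind of relay topic_tools provides boils down to something like the following (a minimal sketch assuming an rclcpp that has create_generic_subscription / create_generic_publisher; the topic names, message type, and QoS are placeholders, and topic_tools itself is structured differently):

#include <memory>
#include <string>
#include "rclcpp/rclcpp.hpp"

// Minimal generic relay: every received serialized sample is re-published.
// Each such node adds one take-copy and one publish-copy per message.
class RelayNode : public rclcpp::Node
{
public:
  RelayNode(const std::string & in, const std::string & out, const std::string & type)
  : Node("relay")
  {
    pub_ = create_generic_publisher(out, type, rclcpp::SensorDataQoS());
    sub_ = create_generic_subscription(
      in, type, rclcpp::SensorDataQoS(),
      [this](std::shared_ptr<rclcpp::SerializedMessage> msg) {
        // rmw_take_serialized_message already copied the data to get here;
        // publish() copies it again on the way out.
        pub_->publish(*msg);
      });
  }

private:
  rclcpp::GenericPublisher::SharedPtr pub_;
  rclcpp::GenericSubscription::SharedPtr sub_;
};

int main(int argc, char ** argv)
{
  rclcpp::init(argc, argv);
  rclcpp::spin(std::make_shared<RelayNode>(
    "input", "output", "sensor_msgs/msg/PointCloud2"));
  rclcpp::shutdown();
  return 0;
}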

Maybe it would also be worth going directly to the DDS layer for topic_tools, like @eboasson mentioned. I have a PR open that is an updated version of the code used in the reproducer (it still suffers from the scaling problem), and I will bring this up there. Also, I'm curious whether adding support for serialized messages in ROS 2's non-DDS intra-process mechanism would allow us to sidestep this issue in the RMW layer.

nnmm commented 3 years ago

I checked whether adding a deserialization in the publishing step would help, but it didn't. I then tried to reproduce @wep21's report of improved latency (actually throughput) when replacing the generic subscription and publisher with a PointCloud2 subscription and publisher; however, I saw the frequency of the topic remain the same (5 Hz at 20 relay nodes). I am wondering whether the message being a SerializedMessage is a red herring; maybe I misread some explanation there.

Also: The scaling issue goes away when I remove the republishing in the relay nodes – even with a large number of subscriptions the frequency remains stable. If publishing is the bottleneck, that makes me wonder why we are still seeing an equal amount of time spent in the subscription component in the flamegraph. I checked that the flamegraph looks similar even with a lot of nodes.

eboasson commented 3 years ago

Well, the only real issue is that it copies the data, and for a point cloud that effectively amounts to: (a tiny bit of time for some header) + (a large amount of time for memcpy'ing the raw data). For a serialized message, that is done by a single memcpy; for the non-serialized one, the "tiny bit" part grows by an insignificant amount.

The ROS 2 interface doesn't do loaned serialized messages, and the Cyclone RMW layer doesn't (yet) implement the loaned (normal) messages. So either way it ends up copying the data ...

To me, it would make sense if loaned serialized messages were added to ROS 2. Use cases like this would really benefit.

nnmm commented 3 years ago

In a conversation with @wep21 I suggested using a multithreaded component container, since that turned out to be a bottleneck in our experiments. @wep21 did you try it out and did it help?

@eboasson Even if it's not the biggest bottleneck here, I agree that these APIs make sense for such republishing nodes, or rosbag. I was told that to make it happen, one should open an issue in https://github.com/ros2/rmw describing what functions to add and why, or a PR with a doc describing the same (e.g. like this), and potentially update https://design.ros2.org/articles/zero_copy.html.

I would guess, from looking at https://docs.ros2.org/latest/api/rmw/rmw_8h.html, that the functions would need to be

and have very similar signatures to their non-serialized variants, but you probably have a better picture of what's needed thanks to your experience with RMW and CycloneDDS. Are you planning to move this forward by any chance?
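For illustration, and modeled on the existing rmw_take_loaned_message / rmw_publish_loaned_message family, such additions might look like the declarations below; the names and exact parameter lists are hypothetical and not part of any released rmw API:

#include <rmw/rmw.h>

// Hypothetical: take a reference to the reader's serialized sample without
// copying the payload; the loan remains owned by the middleware.
rmw_ret_t
rmw_take_loaned_serialized_message(
  const rmw_subscription_t * subscription,
  rmw_serialized_message_t ** loaned_serialized_message,
  bool * taken,
  rmw_subscription_allocation_t * allocation);

// Hypothetical: give the loan back once the caller is done with it.
rmw_ret_t
rmw_return_loaned_serialized_message_from_subscription(
  const rmw_subscription_t * subscription,
  rmw_serialized_message_t * loaned_serialized_message);

// Hypothetical: hand a serialized buffer to the writer by reference instead
// of copying it into a new serdata.
rmw_ret_t
rmw_publish_loaned_serialized_message(
  const rmw_publisher_t * publisher,
  rmw_serialized_message_t * loaned_serialized_message,
  rmw_publisher_allocation_t * allocation);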

wep21 commented 3 years ago

I tried component_container_mt, but it had little effect on performance when running the whole launch file, and it had high CPU usage (I do not know how to change the number of threads).

dejanpan commented 3 years ago

@wep21 sorry for the late reply - just passing our internal feedback until @nnmm is up:

little effect on performance when running the whole launch and had high cpu usage

Could you tell us what the whole launch file is?

At this point we'd also need the following:

  1. Exact reproducer with which you have performance issues
  2. Platform you are running on (ECU, (RT)OS)
  3. Resources of your ECU when you are running the reproducer (CPU and memory consumption)
  4. SW versions of Cyclone DDS, ROS 2, and rmw_cyclonedds

Once we have this info, we can reproduce this on our side and get to the bottom of the performance bottleneck. The solution might require some changes to the architecture of how you do things.


When we used component_container_mt we didn't see any performance degradation with the launch file here https://github.com/wep21/test_pointcloud_launch/blob/main/launch/test_pointcloud.launch.py#L59-L69.

Our whole experiment:

The node graph for the setup is as below:

[node graph image: node_t4]

However, from the launch file here https://github.com/wep21/test_pointcloud_launch/blob/main/launch/test_pointcloud.launch.py#L59-L69 I see that everything happens in one process. Below is the process:

sumanth+ 10989 11.7  0.1 866152 39948 pts/2    Sl+  19:36   0:00 /usr/bin/python3 /opt/ros/foxy/bin/ros2 launch test_pointcloud_launch test_pointcloud.launch.py
sumanth+ 11000 91.5  0.2 623784 66948 pts/2    Rl+  19:36   0:03 /opt/ros/foxy/lib/rclcpp_components/component_container --ros-args -r __node:=test_pointcloud_container -r __ns:=/pointcloud_preprocessor

The above configuration with 20 relay nodes reduces the publishing rate by half!

Now if I decouple the Relay nodes and the PCD publisher node into individual processes, then the publishing frequency of /output is not affected. I have changed the launch file as below to decouple the nodes into 2 different processes.

--- a/launch/test_pointcloud.launch.py
+++ b/launch/test_pointcloud.launch.py
@@ -56,16 +56,28 @@ def generate_launch_description():
             }],
         )

-    container = ComposableNodeContainer(
-        name='test_pointcloud_container',
-        namespace='pointcloud_preprocessor',
+    relay_container = ComposableNodeContainer(
+        name='relay_container',
+        namespace='',
+        package='rclcpp_components',
+        executable='component_container',
+        composable_node_descriptions=[
+            *list(map(create_relay_component, list(range(0, 30))))
+        ],
+        output='screen',
+    )
+    pcd_container = ComposableNodeContainer(
+        name='pcd_container',
+        namespace='',
         package='rclcpp_components',
         executable='component_container',
         composable_node_descriptions=[
             publish_component,
-            *list(map(create_relay_component, list(range(0, 10))))
         ],
         output='screen',
     )

-    return launch.LaunchDescription([container])
+    launch_description = launch.LaunchDescription()
+    launch_description.add_action(pcd_container)
+    launch_description.add_action(relay_container)
+    return launch_description

Below are the 2 processes:

sumanth+ 19514  1.8  0.1 1013660 40504 pts/2   Sl+  20:20   0:00 /usr/bin/python3 /opt/ros/foxy/bin/ros2 launch test_pointcloud_launch test_pointcloud.launch_decoupled.py
sumanth+ 19526  6.9  0.1 596192 47672 pts/2    Sl+  20:20   0:01 /opt/ros/foxy/lib/rclcpp_components/component_container --ros-args -r __node:=pcd_container -r __ns:=/
sumanth+ 19528  140  5.0 2201980 1646764 pts/2 Rl+  20:20   0:39 /opt/ros/foxy/lib/rclcpp_components/component_container --ros-args -r __node:=relay_container -r __ns:=/

This doesn't affect the publishing frequency of the topic /output, but it reduces the publishing frequency of /output* (the relayed topics). This probably means that the problem is somewhere in the subscription callbacks that re-publish the received point cloud.

The launch file uses component_container here https://github.com/wep21/test_pointcloud_launch/blob/main/launch/test_pointcloud.launch.py#L63, which uses a single-threaded executor. The reasonable change now is to update it to a multi-threaded executor as below:

--- a/launch/test_pointcloud.launch.py
+++ b/launch/test_pointcloud.launch.py
@@ -60,10 +60,10 @@ def generate_launch_description():
         name='test_pointcloud_container',
         namespace='pointcloud_preprocessor',
         package='rclcpp_components',
-        executable='component_container',
+        executable='component_container_mt',
         composable_node_descriptions=[
             publish_component,
-            *list(map(create_relay_component, list(range(0, 10))))
+            *list(map(create_relay_component, list(range(0, 30))))
         ],
         output='screen',
     )

Then the publishing frequency of both /output and /output* seems to be unaffected.


The above probably means that having more than 10 relay nodes in one thread causes the bottleneck (for the size of the point cloud messages the PCD publisher is publishing). The flow is: we receive a message (from the PCD publisher), the rclcpp executor triggers the subscription callback, and in that callback we publish the received message. Publishing the message is the only thing in the callback that takes time. As the number of relay nodes increases, the number of subscriptions increases (which increases the number of callbacks). Since a single-threaded executor runs in the same thread in which executor.spin() is called, all this work for all the subscribers happens in that one thread, which I think is causing the congestion.

The multi-threaded executor creates threads and dispatches the work to them, which is why I think we don't see the congestion there, and the publishing frequency appears unaffected by the number of relay nodes.
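For what it's worth, outside of the component containers the same idea can be expressed directly with rclcpp's multi-threaded executor; a minimal sketch (make_relay_node and the thread count are placeholders):

#include <memory>
#include "rclcpp/rclcpp.hpp"

int main(int argc, char ** argv)
{
  rclcpp::init(argc, argv);

  // With a MultiThreadedExecutor the subscription callbacks of the different
  // relay nodes can run concurrently instead of queueing up in one thread.
  // Passing 0 as the thread count lets rclcpp pick a hardware-based default.
  rclcpp::executors::MultiThreadedExecutor executor(rclcpp::ExecutorOptions(), 4);

  // Hypothetical helper that builds one relay node; add all of them here.
  // for (int i = 0; i < 30; ++i) {
  //   executor.add_node(make_relay_node(i));
  // }

  executor.spin();
  rclcpp::shutdown();
  return 0;
}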


I agree that typing the pub/sub and the copying are not the real issues; I think the congestion is due to the reasoning above. However, the publish (write in DDS) of these kinds of large messages might be inefficient in Cyclone DDS, and zero-copy transport could be a perfect fit for these use cases.