eProsima / Fast-DDS

The most complete DDS - Proven: Plenty of success cases. Looking for commercial support? Contact info@eprosima.com
https://eprosima.com
Apache License 2.0
2.14k stars 763 forks source link

[21617] Inconsistent network throughput with LARGE_DATA #5158

Open mode89 opened 1 month ago

mode89 commented 1 month ago

Is there an already existing issue for this?

Expected behavior

I expect amount of network traffic generated by FastDDS in LARGE_DATA mode to be almost the same as the payload (ROS message size multiplied by publishing rate).

Current behavior

Occasionally, generated network traffic exceeds estimated payload throughput.

There no consistency in how often it happens: on some days it could happen on half of the runs, on other days - maybe one out of ten runs.

Surprisingly, whenever network traffic exceeds the estimated value, it is always almost exactly 3 times higher.

Steps to reproduce

Set up two computers connected via Ethernet. Run publisher on one computer and subscriber on another computer. Publish large messages. Observe network throughput.

Fast DDS version/commit

I use ROS Jazzy, which currently bundles Fast DDS v2.14.1. I also tried ROS Humble with Fast DDS v2.14.3 - same behavior.

Platform/Architecture

Other. Please specify in Additional context section.

Transport layer

TCPv4

Additional context

In my experiments, the publisher is an embedded ARM64 platform:

Subscriber runs on a desktop PC:

Computers connected through 1 Gbps switch.

Both computers build ROS2 (with FastDDS) from source code through Nix package manager. I didn't apply any extra patches to FastDDS.

On publisher side, net.core.wmem_max = 200000000. On subscriber side: net.core.rmem_max = 200000000.

I emulated two kinds of payload:

XML configuration file

<?xml version="1.0" encoding="UTF-8" ?>
<dds xmlns="http://www.eprosima.com/XMLSchemas/fastRTPS_Profiles">
    <profiles>
        <participant
            profile_name="large_data_builtin_transports_options"
            is_default_profile="true">
            <rtps>
                <builtinTransports
                    max_msg_size="10MB"
                    sockets_size="200MB"
                    non_blocking="true">LARGE_DATA</builtinTransports>
            </rtps>
        </participant>
    </profiles>
</dds>

Relevant log output

No response

Network traffic capture

No response

mode89 commented 1 month ago

Point cloud publisher:

#!/usr/bin/env python3

from array import array
from random import randint
from threading import Thread

import numpy as np
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import PointCloud2, PointField
from std_msgs.msg import Header

POINT_NUM = 180000
FIELDS = [
    PointField(name="x", offset=0, datatype=PointField.FLOAT32, count=1),
    PointField(name="y", offset=4, datatype=PointField.FLOAT32, count=1),
    PointField(name="z", offset=8, datatype=PointField.FLOAT32, count=1),
    PointField(name="intensity", offset=12, datatype=PointField.FLOAT32, count=1),
]
POINT_SIZE = 16

def main():
    rclpy.init()
    node = Node("points")
    qos = rclpy.qos.QoSProfile(
        depth=10,
        reliability=rclpy.qos.QoSReliabilityPolicy.RELIABLE,
        # reliability=rclpy.qos.QoSReliabilityPolicy.BEST_EFFORT,
    )
    publisher = node.create_publisher(PointCloud2, "points", qos)
    rate = node.create_rate(10)
    counter = 0

    # Numpy to array conversion is slow. Published points selected
    # by random offset from the beginning of this array.
    points = array("B",
        np.random \
            .rand(POINT_NUM * 100, len(FIELDS))
            .astype(np.float32)
            .tobytes())

    spinner = Thread(target=rclpy.spin, args=(node,), daemon=True)
    spinner.start()

    while rclpy.ok():
        print(counter)
        counter += 1
        msg = PointCloud2()
        msg.height = 1
        msg.width = POINT_NUM
        msg.fields = FIELDS
        msg.point_step = POINT_SIZE
        msg.row_step = POINT_NUM * POINT_SIZE
        msg.is_dense = True
        msg.is_bigendian = False
        offset = randint(0, POINT_NUM * 99) * POINT_SIZE
        msg.data = points[offset:offset + POINT_NUM * POINT_SIZE]
        timestamp = node.get_clock().now().to_msg()
        msg.header = Header()
        msg.header.stamp = timestamp
        msg.header.frame_id = "map"
        publisher.publish(msg)
        rate.sleep()

if __name__ == "__main__":
    main()
mode89 commented 1 month ago

Image publisher:

#!/usr/bin/env python3

from array import array
from random import randint
from threading import Thread

import numpy as np
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from std_msgs.msg import Header

WIDTH = 200
HEIGHT = 200
PIXEL_SIZE = 3
TOPIC_NUM = 4

def main():
    rclpy.init()
    node = Node("images")

    qos = rclpy.qos.QoSProfile(
        depth=10,
        reliability=rclpy.qos.QoSReliabilityPolicy.RELIABLE,
        # reliability=rclpy.qos.QoSReliabilityPolicy.BEST_EFFORT,
    )

    image_size = WIDTH * HEIGHT * PIXEL_SIZE
    pixels = array("B",
        (np.random.rand(image_size * 2) * 256)
        .astype(np.uint8)
        .tobytes())

    def publisher_thread(topic):
        publisher = node.create_publisher(Image, topic, qos)
        clock = node.get_clock()
        rate = node.create_rate(25)
        counter = 0

        while rclpy.ok():
            print(topic, counter)
            counter += 1

            msg = Image()
            msg.width = WIDTH
            msg.height = HEIGHT
            msg.encoding = "rgb8"
            msg.step = WIDTH * PIXEL_SIZE
            msg.is_bigendian = False
            offset = randint(0, image_size)
            msg.data = pixels[offset:offset + image_size]

            msg.header = Header()
            msg.header.frame_id = "map"

            timestamp = clock.now().to_msg()
            msg.header.stamp = timestamp
            publisher.publish(msg)

            rate.sleep()

    for i in range(1, TOPIC_NUM + 1):
        topic = f"image{i}"
        thread = Thread(
            target=publisher_thread,
            args=(topic,),
            daemon=True)
        thread.start()

    rclpy.spin(node)

if __name__ == "__main__":
    main()
mode89 commented 1 month ago

Subscriber:

#!/usr/bin/env python3

import sys
from time import time

import rclpy
from rclpy.node import Node
from rclpy.parameter import Parameter
from rclpy.time import Time
from sensor_msgs.msg import PointCloud2, Image

USE_SIM_TIME = False

def main():
    rclpy.init()
    node_id = int(time())
    node = Node(f"subscriber_{node_id}")
    node.set_parameters([
        Parameter("use_sim_time", Parameter.Type.BOOL, USE_SIM_TIME)
    ])
    clock = node.get_clock()
    timestamps = []
    delays = []
    qos = rclpy.qos.QoSProfile(
        depth=10,
        reliability=rclpy.qos.QoSReliabilityPolicy.RELIABLE,
        # reliability=rclpy.qos.QoSReliabilityPolicy.BEST_EFFORT,
    )
    counter = [0]
    _callback = lambda msg: callback(msg, clock, delays, timestamps, counter)
    msg_type = globals()[sys.argv[1]]
    topic = sys.argv[2]
    subscriber = node.create_subscription(msg_type, topic, _callback, qos)
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

def callback(msg, clock, delays, timestamps, counter):
    counter[0] += 1
    print(counter[0])

    delta = clock.now() - Time.from_msg(msg.header.stamp)
    delta = delta.nanoseconds / 1e9
    delays.append(delta)
    mean_delay = sum(delays) / len(delays)
    max_delay = max(delays)
    if len(delays) == 20:
        delays.pop(0)
    print(f"mean: {mean_delay:.3f} s, max: {max_delay:.3f} s")

    timestamp = Time.from_msg(msg.header.stamp).nanoseconds
    timestamps.append(timestamp)
    if len(timestamps) == 20:
        timestamps.pop(0)
    if len(timestamps) > 1:
        delta = (timestamps[-1] - timestamps[0]) * 1e-9
        msg_num = len(timestamps) - 1
        rate = msg_num / delta
        print(f"rate: {rate:.3f} Hz")

if __name__ == "__main__":
    main()
mode89 commented 1 month ago

Such behavior observed even with BEST_EFFORT policy.

mode89 commented 1 month ago

Tried publishing from another desktop PC, connected directly through 1 Gbps. Same behavior.

mode89 commented 1 month ago

Increasing publisher's txqueuelen didn't help.

JesusPoderoso commented 5 days ago

Hi @mode89, thanks for your report, The eProsima Team will review and reproduce the issue in the following days, and come back with some feedback. Bear in mind that neither RHL nor NixOS are part of the Fast DDS supported platforms, we will try to reproduce using Ubuntu.

mode89 commented 4 days ago

Thank you! :)

Today tested environment based on ros:jazzy docker container, on two x86 computers. Same excessive traffic. Published point cloud with estimated throughput of ~28 MB/s. This time the observed traffic was fluctuating between ~30-50 MB/s.