ros2 / rmw_fastrtps

Implementation of the ROS Middleware (rmw) Interface using eProsima's Fast RTPS.
Apache License 2.0
157 stars 117 forks source link

Exception was thrown while recording into bag high frequency exchange of components #463

Open rrrand opened 4 years ago

rrrand commented 4 years ago

Bug report

Required Info:

Steps to reproduce issue

My purpose was to test bag recording reliability with components exchange. I modified components from https://github.com/DensoADAS/high_freq_pub_example/tree/master: reliable keep_all quality of service was set and new launch file to start components and bag was created.

Launch file:

import os
import launch
from launch.actions import DeclareLaunchArgument, IncludeLaunchDescription
from launch.launch_description_sources import PythonLaunchDescriptionSource
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import ComposableNodeContainer
from launch_ros.actions import Node
from launch_ros.descriptions import ComposableNode

def generate_launch_description():
    bag = launch.actions.ExecuteProcess(
        cmd=['ros2', 'bag', 'record', 'chatter', '--qos-profile-overrides-path', 'chatter_qos.yaml'],
        output='screen'
    )
    container = ComposableNodeContainer(
        name='my_container',
        namespace='',
        package='rclcpp_components',
        executable='component_container',
        composable_node_descriptions=[
            ComposableNode(
                package='high_freq_pub',
                plugin='high_freq_pub::Listener',
                name='listener'),
            ComposableNode(
                package='high_freq_pub',
                plugin='high_freq_pub::Talker',
                name='talker'),
        ],
        output='screen',
    )
    return launch.LaunchDescription([ bag, container ])

chatter_qos.yaml file for bag:

/chatter:
    history: keep_all
    reliability: reliable
    durability: volatile

Modifed listener.cpp:

#include "rclcpp/rclcpp.hpp"
#include "rclcpp_components/register_node_macro.hpp"
#include "std_msgs/msg/int32.hpp"
#include "high_freq_pub/visibility_control.h"
#include <cmath>

using namespace std::chrono_literals;

namespace high_freq_pub
{
class Listener : public rclcpp::Node
{
public:
    HIGH_FREQ_PUB_PUBLIC
    explicit Listener(const rclcpp::NodeOptions& options)
        : Node("listener_high_freq", options),
          m_messageID(0),
          m_numReceivedMsgs(0)
    {
        setvbuf(stdout, NULL, _IONBF, BUFSIZ);
        auto callback =
            [this](const std_msgs::msg::Int32::SharedPtr msg) -> void
            {
                ++m_numReceivedMsgs;
                int32_t msgDiff = msg->data - m_messageID;
                m_messageID = msg->data;
                if (msgDiff >= 2)
                {
                     RCLCPP_ERROR(this->get_logger(), "Lost %d msgs, curr: %d", msgDiff - 1, msg->data);
                }
                else
                {
                     RCLCPP_INFO(this->get_logger(), "Received msg %d", msg->data);
                }
            };

        rclcpp::QoS qos = rclcpp::QoS(rclcpp::KeepAll()).reliable();
        sub_ = create_subscription<std_msgs::msg::Int32>("chatter", qos, callback);
        RCLCPP_INFO(this->get_logger(), "Ready to receive messages");
    }

    ~Listener()
    {
        RCLCPP_INFO(this->get_logger(), "Received %d messages", m_numReceivedMsgs);
        RCLCPP_INFO(this->get_logger(), "Lost %d messages", 2000000 - m_numReceivedMsgs);
    }

private:
    rclcpp::Subscription<std_msgs::msg::Int32>::SharedPtr sub_;
    int32_t m_messageID;
    int32_t m_numReceivedMsgs;
};

}  // namespace high_freq_pub

RCLCPP_COMPONENTS_REGISTER_NODE(high_freq_pub::Listener)

Modified talker.cpp:

#include <chrono>
#include <cstdio>
#include <memory>
#include <utility>

#include "rclcpp/rclcpp.hpp"
#include "rclcpp_components/register_node_macro.hpp"

#include "std_msgs/msg/int32.hpp"

#include "high_freq_pub/visibility_control.h"

using namespace std::chrono_literals;

namespace high_freq_pub
{
class Talker : public rclcpp::Node
{
public:
    HIGH_FREQ_PUB_PUBLIC
    explicit Talker(const rclcpp::NodeOptions& options)
        : Node("talker_high_freq", options)
    {
        setvbuf(stdout, NULL, _IONBF, BUFSIZ);
        auto publish_message =
            [this]() -> void
            {
                auto msg = std::make_unique<std_msgs::msg::Int32>();
                msg->data = count_++;
                 RCLCPP_INFO(this->get_logger(), "Publishing message #%d", msg->data);
                pub_->publish(std::move(msg));

                if (count_ > 2000000)
                {
                    timer_->cancel();
                    auto end_time = this->now();
                    RCLCPP_INFO(this->get_logger(), "Sent 2000000 messages in %f s" , (end_time - this->start_time_).seconds());
                }
            };

        rclcpp::QoS qos = rclcpp::QoS(rclcpp::KeepAll()).reliable();
        pub_ = this->create_publisher<std_msgs::msg::Int32>("chatter", qos);

        std::this_thread::sleep_for(std::chrono::seconds(5));
        start_time_ = this->now();
        RCLCPP_INFO(this->get_logger(), "Sending messages...");
        timer_ = this->create_wall_timer(10us, publish_message);
    }

private:
    int32_t count_ = 1;
    rclcpp::Publisher<std_msgs::msg::Int32>::SharedPtr pub_;
    rclcpp::TimerBase::SharedPtr timer_;
    rclcpp::Time start_time_;
};

}  // namespace high_freq_pub

RCLCPP_COMPONENTS_REGISTER_NODE(high_freq_pub::Talker)

Expected behavior

No exceptions.

Actual behavior

Exception was thrown:

[component_container-2] [INFO] [1600847014.427153689] [listener]: Received msg 361200
[component_container-2] [INFO] [1600847014.427181735] [talker]: Publishing message #361261
[component_container-2] [INFO] [1600847014.427499091] [listener]: Received msg 361201
[component_container-2] [INFO] [1600847014.427536384] [talker]: Publishing message #361262
[component_container-2] [INFO] [1600847014.427849888] [listener]: Received msg 361202
[component_container-2] [INFO] [1600847014.427881380] [talker]: Publishing message #361263
[component_container-2] [INFO] [1600847014.428196148] [listener]: Received msg 361203
[component_container-2] [INFO] [1600847014.428229948] [talker]: Publishing message #361264
[component_container-2] [INFO] [1600847014.428542274] [listener]: Received msg 361204
[component_container-2] [INFO] [1600847014.428576581] [talker]: Publishing message #361265
[component_container-2] [INFO] [1600847014.428900996] [listener]: Received msg 361205
[component_container-2] [INFO] [1600847014.428934479] [talker]: Publishing message #361266
[component_container-2] [INFO] [1600847014.429248517] [listener]: Received msg 361206
[component_container-2] [INFO] [1600847014.429282848] [talker]: Publishing message #361267
[component_container-2] [INFO] [1600847014.429596174] [listener]: Received msg 361207
[component_container-2] [INFO] [1600847014.429632806] [talker]: Publishing message #361268
[component_container-2] [INFO] [1600847014.429940879] [listener]: Received msg 361208
[component_container-2] [INFO] [1600847014.429974371] [talker]: Publishing message #361269
[component_container-2] [INFO] [1600847014.430292143] [listener]: Received msg 361209
[component_container-2] [INFO] [1600847014.430325108] [talker]: Publishing message #361270
[component_container-2] [INFO] [1600847014.430637105] [listener]: Received msg 361210
[component_container-2] [INFO] [1600847014.430669550] [talker]: Publishing message #361271
[component_container-2] terminate called after throwing an instance of 'rclcpp::exceptions::RCLError'
[component_container-2]   what():  failed to publish message: cannot publish data, at /tmp/binarydeb/ros-foxy-rmw-fastrtps-shared-cpp-1.2.0/src/rmw_publish.cpp:53, at /tmp/binarydeb/ros-foxy-rcl-1.1.7/src/rcl/publisher.c:291

Additional information

I would like to share my statistics. I measured reliable keep_all exchange time of talker and listener to transmit 2_000_000 messages with int32. All messages were received by listener and were saved by bag. I have got the following durations:

So recording messages in bag is a long running operation.

clalancette commented 4 years ago

Yeah, there are some known issues about recording a lot of data with rosbag2. I'm going to move this ticket over to the rosbag2 repository, where I believe some work is being done to address this.

dirk-thomas commented 4 years ago

The very same error happens when running a C++ publisher with a very high frequency (e.g. in the performance tests). So I think this should stay in the rmw_fastrtps repo since the problem doesn't exist with other RMW implementations and has nothing to do with rosbag.

clalancette commented 4 years ago

The very same error happens when running a C++ publisher with a very high frequency (e.g. in the performance tests). So I think this should stay in the rmw_fastrtps repo since the problem doesn't exist with other RMW implementations and has nothing to do with rosbag.

Well, I actually think there are 2 issues here.

One issue is that recording a bag is a synchronous process; it collects a bunch of data, then blocks while writing it to disk. In the meantime, data that is being published is probably filling up the internal Fast-DDS queues, since the receiver isn't receiving any of them.

The second issue is what you describe, in that we rmw_fastrtps probably shouldn't crash when that situation happens.

dirk-thomas commented 4 years ago

The reported error message described in this ticket is:

[component_container-2] terminate called after throwing an instance of 'rclcpp::exceptions::RCLError'
[component_container-2]   what():  failed to publish message: cannot publish data, at /tmp/binarydeb/ros-foxy-rmw-fastrtps-shared-cpp-1.2.0/src/rmw_publish.cpp:53, at /tmp/binarydeb/ros-foxy-rcl-1.1.7/src/rcl/publisher.c:291

Which is clearly happening on the publisher side in rmw_fastrtps and is not related to rosbag.

Anyway please either move this ticket back or create a new ticket in rmw_fastrtps.

fujitatomoya commented 4 years ago

could be related to https://github.com/ros2/rmw_fastrtps/issues/338.

emersonknapp commented 4 years ago

We think this should be transferred back to rmw_fastrtps based on the discussion here

dirk-thomas commented 4 years ago

@richiprosima Do you have any insight why rmw_publish() fails to publish the data?

clalancette commented 4 years ago

@richiprosima @MiguelCompany Any thoughts on what might be going on here?

MiguelCompany commented 4 years ago

This is almost always a timeout when using keep_all QoS. We could change the returned error to RMW_RET_TIMEOUT, but the rcl would also throw an exception in that case.

Another option is to change the default value of max_blocking_time to infinity, as other rmw implementations are doing, but that would mean the call to publish would block forever until there is room for the sample to be added to the history.

Yet another option is to set the default value of max_samples to infinity, so there will always be room for new samples, but memory consumption would grow indefinitely until there is not enough memory, in which case we would get to the same situation (publish will return false, and RMW_RET_ERROR would be returned, which will throw the unhandled exception)