dora-rs / dora

DORA (Dataflow-Oriented Robotic Architecture) is middleware designed to streamline and simplify the creation of AI-based robotic applications. It offers low latency, composable, and distributed dataflow capabilities. Applications are modeled as directed graphs, also referred to as pipelines.
https://dora-rs.ai
Apache License 2.0
1.49k stars 76 forks source link

Do we need a similar mechanism like the Qos in ROS2 #166

Open liuzf1988 opened 1 year ago

liuzf1988 commented 1 year ago

This is a problem encountered in porting NDT algorithm from ROS2 ecosystem to DORA framework.

A problem was found during code debugging. If a function of the DORA operator takes a long time to execute, other input messages cannot be processed in time.

For example, the serviceNDTAlign() function in ndt_scan_matcher operator, see the settings in dataflow.yml. Since Particle filtering is required to find a more accurate initial position, the program takes a long time to execute when receiving the align_request_pose input (In fact, it takes about 3~5s to execute this program in ROS2, but it takes tens of seconds in DORA. I am looking for the reason why the execution takes so long, which will be discussed later). Then other input messages, such as downsampled_pointcloud and initial_pose, will not be processed in time and will accumulate in memory. In fact, messages such as downsampled_pointcloud and initial_pose are time sensitive. After a certain period of time, these messages are meaningless, so there is no need to cache all messages. In ROS2, one can specify how many messages can be cached at most by setting Qos, as follows:

  initial_pose_sub_ = this->create_subscription<geometry_msgs::msg::PoseWithCovarianceStamped>(
    "ekf_pose_with_covariance", 100,
    std::bind(&NDTScanMatcher::callbackInitialPose, this, std::placeholders::_1),
    initial_pose_sub_opt);

  sensor_points_sub_ = this->create_subscription<sensor_msgs::msg::PointCloud2>(
    "points_raw", rclcpp::SensorDataQoS().keep_last(points_queue_size),
    std::bind(&NDTScanMatcher::callbackSensorPoints, this, std::placeholders::_1), main_sub_opt);

Is there a similar mechanism in Dora, or do we also need such a mechanism?

phil-opp commented 1 year ago

We don't have a way to limit the queue length yet, but we agree that such a feature is definitely needed. The question is whether we should signal to the receiver and/or the sender when we need to drop a message, or whether we should drop it without notice. How does ROS2 handle this situation?

liuzf1988 commented 1 year ago

The QoS setting in ROS2 is a little complicated. In brief, QoS profiles can be specified for publishers, subscriptions, service servers and clients. A QoS profile can be applied independently to each instance of the aforementioned entities, but if different profiles are used, it is possible that they will be incompatible, preventing the delivery of messages. Usually, the sender and receiver adopt the QoS policy of each topic/message independently. The receiver can decide how many messages to keep in the cache by setting the history and depth fields of QoS profile. Thus there is no need for the receiver to notice the sender when it drop a message. Take the following as an example.

QoS setting in the sender

Ouster lidar driver set the sensor data Qos through a config file.

// ros2_ouster/params/driver_config.yaml
ouster_driver:
  ros__parameters:
    # if False, data are published with sensor data QoS. This is preferrable
    # for production but default QoS is needed for rosbag.
    use_system_default_qos: False

// ros2_ouster/src/ouster_driver.cpp
void OusterDriver::onConfigure()
{
  if (_use_system_default_qos) {
    RCLCPP_INFO(
      this->get_logger(), "Using system defaults QoS for sensor data");
    _data_processors = ros2_ouster::createProcessors(
      shared_from_this(), _sensor->getMetadata(), _imu_data_frame, _laser_data_frame,
      rclcpp::SystemDefaultsQoS(),
      _sensor->getPacketFormat(), _full_rotation_accumulator, _proc_mask);
  } else {
    _data_processors = ros2_ouster::createProcessors(
      shared_from_this(), _sensor->getMetadata(), _imu_data_frame, _laser_data_frame,
      rclcpp::SensorDataQoS(), _sensor->getPacketFormat(), _full_rotation_accumulator, _proc_mask);
  }
}

From above code, we can see that if _use_system_default_qos = true, the rclcpp::SystemDefaultsQoS() policy is adopted; otherwise, the rclcpp::SensorDataQoS() policy is used. The details about the two policies can be found in the rclcpp/qos.hpp file.

// include/rclcpp/qos.hpp

/**
 * Sensor Data QoS class
 *    - History: Keep last,
 *    - Depth: 5,
 *    - Reliability: Best effort,
 *    - Durability: Volatile,
 *    - Deadline: Default,
 *    - Lifespan: Default,
 *    - Liveliness: System default,
 *    - Liveliness lease duration: default,
 *    - avoid ros namespace conventions: false
 */
class RCLCPP_PUBLIC SensorDataQoS : public QoS
{
public:
  explicit
  SensorDataQoS(
    const QoSInitialization & qos_initialization = (
      QoSInitialization::from_rmw(rmw_qos_profile_sensor_data)
  ));
};

/**
 * System defaults QoS class
 *    - History: System default,
 *    - Depth: System default,
 *    - Reliability: System default,
 *    - Durability: System default,
 *    - Deadline: Default,
 *    - Lifespan: Default,
 *    - Liveliness: System default,
 *    - Liveliness lease duration: System default,
 *    - Avoid ros namespace conventions: false
 */
class RCLCPP_PUBLIC SystemDefaultsQoS : public QoS
{
public:
  explicit
  SystemDefaultsQoS(
    const QoSInitialization & qos_initialization = (
      QoSInitialization::from_rmw(rmw_qos_profile_system_default)
  ));
};

From above code and the code in rmw/qos_profiles.h, we know the implementation of QoS policy relies on the DDS support.

QoS setting in the receiver The pointcloud_preporcessor module in Autoware is responsible for subscribing the pointcloud topic and preprocessing it. The QoS policy at the receiver is set when create the subscription, seeing the code in the filter.cpp file.

// autoware.universe/sensing/pointcloud_preprocessor/src/filter.cpp
void pointcloud_preprocessor::Filter::subscribe()
{
    if (use_indices_) {
        // ...
    } else {
        // ...
        sub_input_ = create_subscription<PointCloud2>("input", rclcpp::SensorDataQoS().keep_last(max_queue_size_), cb);
    }
}

From above code, we know the receiver QoS policy is rclcpp::SensorDataQoS(). To make sure the sender and receiver can communicate normally, we can adopt the same QoS policy by setting system_default_qos: false at the sender to make the two QoS policies are compatible.

Hope the above information can be helpful.

phil-opp commented 1 year ago

Thanks a lot for the explanation, this is very useful! I think the main priority for dora is the history/depth/queue length setting. We can look into the other QoS options later.

A QoS profile can be applied independently to each instance of the aforementioned entities, but if different profiles are used, it is possible that they will be incompatible, preventing the delivery of messages.

Interesting! What happens if the publisher and subscriber specifiy different depth values? The QoS compatibilities section only mentions the other settings..