dora-rs / dora

DORA (Dataflow-Oriented Robotic Architecture) is middleware designed to streamline and simplify the creation of AI-based robotic applications. It offers low latency, composable, and distributed dataflow capabilities. Applications are modeled as directed graphs, also referred to as pipelines.
https://dora-rs.ai
Apache License 2.0
1.46k stars 76 forks source link

Multithreading mechanism #164

Open liuzf1988 opened 1 year ago

liuzf1988 commented 1 year ago

This is a problem encountered in porting NDT algorithm from ROS2 ecosystem to DORA framework

In ROS2, the ndt_scan_matcher package adopts the multithreading mechanism: from its code, it can be seen that, the two callback functions ( callbackMapPoints and callbackSensorPoints) and the serviceNDTAlign service share a thread, which are directly related to the setting and execution of the NDT algorithm, while CallbackInitialPose is a separate thread. Extract the relevant codes as follows:

// ndt_scan_matcher_core.cpp
NDTScanMatcher::NDTScanMatcher()
{
    //...
    rclcpp::CallbackGroup::SharedPtr initial_pose_callback_group;
    initial_pose_callback_group =
        this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);

    rclcpp::CallbackGroup::SharedPtr main_callback_group;
    main_callback_group = this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);

    auto initial_pose_sub_opt = rclcpp::SubscriptionOptions();
    initial_pose_sub_opt.callback_group = initial_pose_callback_group;

    auto main_sub_opt = rclcpp::SubscriptionOptions();
    main_sub_opt.callback_group = main_callback_group;

    initial_pose_sub_ = this->create_subscription<geometry_msgs::msg::PoseWithCovarianceStamped>(
        "ekf_pose_with_covariance", 100,
        std::bind(&NDTScanMatcher::callbackInitialPose, this, std::placeholders::_1),
        initial_pose_sub_opt);
    map_points_sub_ = this->create_subscription<sensor_msgs::msg::PointCloud2>(
        "pointcloud_map", rclcpp::QoS{1}.transient_local(),
        std::bind(&NDTScanMatcher::callbackMapPoints, this, std::placeholders::_1), main_sub_opt);
    sensor_points_sub_ = this->create_subscription<sensor_msgs::msg::PointCloud2>(
        "points_raw", rclcpp::SensorDataQoS().keep_last(points_queue_size),
        std::bind(&NDTScanMatcher::callbackSensorPoints, this, std::placeholders::_1), main_sub_opt);

    service_ = this->create_service<tier4_localization_msgs::srv::PoseWithCovarianceStamped>(
        "ndt_align_srv",
        std::bind(&NDTScanMatcher::serviceNDTAlign, this, std::placeholders::_1, std::placeholders::_2),
        rclcpp::ServicesQoS().get_rmw_qos_profile(), main_callback_group);
}

// ndt_scan_matcher_node.cpp
int main(int argc, char ** argv)
{
  rclcpp::init(argc, argv);
  auto ndt_scan_matcher = std::make_shared<NDTScanMatcher>();
  rclcpp::executors::MultiThreadedExecutor exec;
  exec.add_node(ndt_scan_matcher);
  exec.spin();
  rclcpp::shutdown();
  return 0;
}

During the migration of NDT algorithm to DORA framework. The on_input function is temporarily implemented as follows:

// ndt_scan_matcher.cpp
OnInputResult on_input(NDTScanMatcher &op, rust::Str id, rust::Slice<const uint8_t> data, OutputSender &output_sender)
{
    if (id == "initial_pose") {
        // Corresponding to the original callbackInitialPose callback function
    } else if (id == "tf_baselink2lidar") {
        // ...
    }
    if (id == "map_points") {
        // Corresponding to the original callbackMapPoints callback function
    } else if (id == "align_request_pose") {
        // Corresponding to the original serviceNDTAlign service
    } else if (id == "downsampled_pointcloud") {
        // Corresponding to the original callbackSensorPoints callback function
    }
}

How to implement multithreading mechanism in DORA operator?

phil-opp commented 1 year ago

We don't have plans to implement such a multithreading mechanism in dora right now. The main advantage of the current single-threaded design is that the on_input function has exclusive access to the operator's fields (when using the Rust or C++ APIs) or the custom operator_context (when using the C API).

If you want to handle inputs in different threads, you can spawn the handler threads manually and e.g. use a go-style channel to forward the relevant inputs. I'm not familiar with the C++ ecosystem, but the ahorn/cpp-channel seems to implement something like this. Your on_input method would then just match on the id and forward the input to the respective handler thread. Instead of OS-level threads, you should also be able to use C++20 coroutines or some other type of green threads.