ros2 / ros1_bridge

ROS 2 package that provides bidirectional communication between ROS 1 and ROS 2
Apache License 2.0
421 stars 273 forks source link

ROS1 Bridge doesn't work with Multithreaded Executor on a ROS2 service #314

Open joelbudu opened 3 years ago

joelbudu commented 3 years ago

Bug report

Required Info:

Steps to reproduce issue

  1. Create a service with a Multithreaded executor in ROS2
  2. Connect to ROS1 via the ros1_bridge
  3. Call the ROS2 service through the bridge. Only a single thread runs on the service

#include <chrono>
#include <functional>
#include <memory>
#include <string>
#include <thread>

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"
#include "sensor_msgs/msg/laser_scan.hpp"
#include "geometry_msgs/msg/vector3_stamped.hpp"
#include "std_srvs/srv/empty.hpp"

using namespace std::chrono_literals;

/**
 * A small convenience function for converting a thread ID to a string
 **/
std::string string_thread_id()
{
  auto hashed = std::hash<std::thread::id>()(std::this_thread::get_id());
  return std::to_string(hashed);
}

class PublisherNode : public rclcpp::Node
{
public:
  PublisherNode()
  : Node("PublisherNode")
  {
    publisher_ = this->create_publisher<geometry_msgs::msg::Vector3Stamped>("topic", 10);
    auto timer_callback =
      [this]() -> void {
        auto message = geometry_msgs::msg::Vector3Stamped();
        message.header.stamp = this->now();

        this->publisher_->publish(message);
      };
    timer_ = this->create_wall_timer(500ms, timer_callback);
  }

private:
  rclcpp::TimerBase::SharedPtr timer_;
  rclcpp::Publisher<geometry_msgs::msg::Vector3Stamped>::SharedPtr publisher_;
};

class DualThreadedNode : public rclcpp::Node
{
public:
  DualThreadedNode()
  : Node("DualThreadedNode")
  {
    /* These define the callback groups
     * They don't really do much on their own, but they have to exist in order to
     * assign callbacks to them. They're also what the executor looks for when trying to run multiple threads
     */
    callback_group_subscriber1_ = this->create_callback_group(
      rclcpp::CallbackGroupType::MutuallyExclusive);
    callback_group_subscriber2_ = this->create_callback_group(
      rclcpp::CallbackGroupType::MutuallyExclusive);

     auto service = [this](const std::shared_ptr<rmw_request_id_t> request_header,
      const std::shared_ptr<std_srvs::srv::Empty::Request> req,
      const std::shared_ptr<std_srvs::srv::Empty::Response> res) -> void {
      this->test_service(request_header, req, res);
    };

    callback_group4_ = this->create_callback_group(
    rclcpp::callback_group::CallbackGroupType::MutuallyExclusive);
    test_srv_ = this->create_service<std_srvs::srv::Empty>("test_service", service, rmw_qos_profile_default, callback_group4_);

    auto sub1_opt = rclcpp::SubscriptionOptions();
    sub1_opt.callback_group = callback_group_subscriber1_;

    subscription1_ = this->create_subscription<geometry_msgs::msg::Vector3Stamped>(
      "topic",
      rclcpp::QoS(10),

      std::bind(
        &DualThreadedNode::subscriber1_cb, 
        this,                             
        std::placeholders::_1),           

      sub1_opt);                 
  }

private:

  /**
   * Every time the Publisher publishes something, all subscribers to the topic get poked
   * This function gets called when Subscriber1 is poked (due to the std::bind we used when defining it)
   */
  void subscriber1_cb(const geometry_msgs::msg::Vector3Stamped::SharedPtr msg)
  {

    vector_msg_ = msg;
  }

  void test_service(
  const std::shared_ptr<rmw_request_id_t> request_header,
  const std::shared_ptr<std_srvs::srv::Empty::Request> request,
  const std::shared_ptr<std_srvs::srv::Empty::Response> response)
{

    RCLCPP_INFO(this->get_logger(), "Received message in callback1");

    RCLCPP_INFO(this->get_logger(), "msg timestamp : %f", rclcpp::Time(vector_msg_->header.stamp ).seconds());

    auto received_time = this->get_clock()->now();

    while (received_time +  rclcpp::Duration(5,0) > rclcpp::Time(vector_msg_->header.stamp) && rclcpp::ok())
    {  
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }

    RCLCPP_INFO(this->get_logger(), "Exitted cb after 5 secs");

}

  rclcpp::CallbackGroup::SharedPtr callback_group_subscriber1_;
  rclcpp::CallbackGroup::SharedPtr callback_group_subscriber2_;
  rclcpp::CallbackGroup::SharedPtr callback_group4_;
  rclcpp::Subscription<geometry_msgs::msg::Vector3Stamped>::SharedPtr subscription1_;
  rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription2_;

  rclcpp::Service<std_srvs::srv::Empty>::SharedPtr test_srv_;

  geometry_msgs::msg::Vector3Stamped::SharedPtr vector_msg_;
  rclcpp::Time stamp_;

};

int main(int argc, char * argv[])
{
  rclcpp::init(argc, argv);

  // You MUST use the MultiThreadedExecutor to use, well, multiple threads
  rclcpp::executors::MultiThreadedExecutor executor;
  auto pubnode = std::make_shared<PublisherNode>();
  auto subnode = std::make_shared<DualThreadedNode>();  // This contains BOTH subscriber callbacks.
                                                        // They will still run on different threads
                                                        // One Node. Two callbacks. Two Threads
  executor.add_node(pubnode);
  executor.add_node(subnode);
  executor.spin();
  rclcpp::shutdown();
  return 0;
}
# source ros1 workspace
# source ros2 workspace
ros2 ros1_bridge dynamic_bridge --bridge-all-topics;
# source ros1 workspace
rosservice call  /test_service std_srvs/srv/Empty;

Expected behaviour

Multithreaded executor should run multiple nodes in separate threads.

Actual behavior

ROS2 service is run in a single thread

Additional information


Feature request

ros1_bridge should support nodes with the Multithreaded executor

iuhilnehc-ynos commented 3 years ago

@joelbudu

Create a service with a Multithreaded executor in ROS2 Multithreaded executor should run multiple nodes in separate threads.

Do you expect that a service created by a node running on a multithreaded executor of ros2 can deal with multiple requests from ros1 at the same time? Or two more different services created by a node or nodes? I just want to make sure you make the right code (IIRC, the flag of Callback group, exclusive, will be checked inside if you use the default callback group only in one node) in ROS2.

About ros1_bridge, there are some locations that seem to be updated if we want to support dealing with multiple requests from ros1 at the same time. such as,

  1. use CallbackGroup with Reentrant (It just bridge the data for the service, so I think we can use reentrant here.) for https://github.com/ros2/ros1_bridge/blob/5d77ce0caae4133cbdafbfcfb89c91ba8d875771/include/ros1_bridge/factory.hpp#L354

  2. use MultipleThreadedExecutor for https://github.com/ros2/ros1_bridge/blob/5d77ce0caae4133cbdafbfcfb89c91ba8d875771/src/dynamic_bridge.cpp#L791

BTW: could you provide an example and more detailed steps to reproduce what you expect?

iuhilnehc-ynos commented 3 years ago

Could you use foxy or rolling? Because it seems the EOL of dashing is on May, 2021.

joelbudu commented 3 years ago

@iuhilnehc-ynos Thank you for your response. My use case is a scenario where you have a long-running blocking service that depends on other threads updating a member variable before returning the response. It is a single service per node but the node will have other callbacks from topics that need to be processed concurrently.

You have also pointed very correctly to the locations I thought would need updating to support this.

I would update the Issue with more detailed steps to reproduce as well.

With respect to the ros version I'm thinking it may be easy to merge the PR in both branches (including dashing). If it's not possible I will have to create a fork and have the change in there to be able to work in my current environment

iuhilnehc-ynos commented 3 years ago

You have also pointed very correctly to the locations

I am sorry, item 1 pointed before is the bridge for ros2 client -> ros1 service. (For your case, it should be https://github.com/ros2/ros1_bridge/blob/5d77ce0caae4133cbdafbfcfb89c91ba8d875771/include/ros1_bridge/factory.hpp#L331)

iuhilnehc-ynos commented 3 years ago

@joelbudu

My use case is a scenario where you have a long-running blocking service that depends on other threads updating a member variable before returning the response. It is a single service per node but the node will have other callbacks from topics that need to be processed concurrently.

Thank you for sharing this. At least you didn't use the same callback group (because of creating a service per node) for service callback, so we expect that the service callback can run on separate threads when using MultiThreadedExecutor.

iuhilnehc-ynos commented 3 years ago

And use ros::AsyncSpinner async_spinner(a_proper_number); (not 1, refer to http://wiki.ros.org/roscpp/Overview/Callbacks%20and%20Spinning#Multi-threaded_Spinning) for https://github.com/ros2/ros1_bridge/blob/5d77ce0caae4133cbdafbfcfb89c91ba8d875771/src/dynamic_bridge.cpp#L787

joelbudu commented 3 years ago

You have also pointed very correctly to the locations

I am sorry, item 1 pointed before is the bridge for ros2 client -> ros1 service. (For your case, it should be

https://github.com/ros2/ros1_bridge/blob/5d77ce0caae4133cbdafbfcfb89c91ba8d875771/include/ros1_bridge/factory.hpp#L331 )

I might need a bit of clarity on how this works. I thought the Multithreading/Callback setup would be setup on the ROS2 service ?

iuhilnehc-ynos commented 3 years ago

@joelbudu

In your case, you just cared about the ros2 service callback that can run on separate threads, so we just need to make sure the ros1 spin with multiple threads which can call ros1_service_callback in ros1_bridge to request ros2 service at the same time.

Could you just confirm if updating https://github.com/ros2/ros1_bridge/blob/dashing/src/dynamic_bridge.cpp#L778 with ros::AsyncSpinner async_spinner(2); is work for you?

joelbudu commented 3 years ago

Yes @iuhilnehc-ynos I can confirm updating https://github.com/ros2/ros1_bridge/blob/dashing/src/dynamic_bridge.cpp#L778 with ros::AsyncSpinner async_spinner(2); works for me

iuhilnehc-ynos commented 3 years ago

@joelbudu , Thank you for your confirmation. Before creating a PR for this feature, I'll go deep to check if it introduces some potential problems.

iuhilnehc-ynos commented 3 years ago

Hi, @joelbudu, I found the example you updated on https://github.com/ros2/ros1_bridge/issues/314#issue-894424329, only containing one service (testsrv) with callbackgroup4(MutuallyExclusive) in one node (DualThreadedNode), I am a little confused about what you expected.

Actual behavior ROS2 service is run in a single thread

joelbudu commented 3 years ago

Actually, there is also a geometry_msgs::msg::Vector3Stamped publisher and subscriber running in different callback groups. So there is a shared variable that is being updated by the subscriber and the value of that variable is checked in the blocking while loop of the service

joelbudu commented 3 years ago

@iuhilnehc-ynos I've been following the PR request linked to this issue. I just wanted to confirm if for my scenario only changing the ros::AsyncSpinner async_spinner(2) would still suffice or is there any other change you would advice to change? Thanks

iuhilnehc-ynos commented 3 years ago

@joelbudu

If you just implemented a demo or a trial, I think the answer is yes, otherwise, I don't recommend using loop wait inside a service callback, because the loop in the service callback will make ros1_bridge occupy one ros::AsyncSpinner thread. If there are many such waiting loops all over other services, even setting 100 for ros::AsyncSpinner is not enough.

  void test_service(
  const std::shared_ptr<rmw_request_id_t> request_header,
  const std::shared_ptr<std_srvs::srv::Empty::Request> request,
  const std::shared_ptr<std_srvs::srv::Empty::Response> response)
{

    RCLCPP_INFO(this->get_logger(), "Received message in callback1");

    RCLCPP_INFO(this->get_logger(), "msg timestamp : %f", rclcpp::Time(vector_msg_->header.stamp ).seconds());

    auto received_time = this->get_clock()->now();

    while (received_time +  rclcpp::Duration(5,0) > rclcpp::Time(vector_msg_->header.stamp) && rclcpp::ok())
    // it seems an anti-pattern here
    {  
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }

    RCLCPP_INFO(this->get_logger(), "Exitted cb after 5 secs");

}