ros-industrial / ros2_canopen

CANopen driver framework for ROS2
https://ros-industrial.github.io/ros2_canopen/manual/rolling/
125 stars 52 forks source link

How to work with CORead services? #244

Open starnus-software opened 6 months ago

starnus-software commented 6 months ago

Describe the question Hello, I'm fairly new to ROS2 so this might just be an easy question. I created a configuration package and now I'm trying to just read an index from a Nanotec controller to a CAN-USB adapter. I am aware of the CORead srv file, but I don't quite understand how to employ it in a simple node. I added the code of my node below, but I don't think it's the right way to use it. Any suggestions or example code scripts are welcome!

Logs node code:

#include <memory>
#include <string>

#include "canopen_interfaces/srv/co_read.hpp"
#include "rclcpp/rclcpp.hpp"

class SDOCheckerNode : public rclcpp::Node
{
public:
  SDOCheckerNode()
    : Node("sdo_checker_node")
  {
    // Use this constructor to initialize parameters or any other setup needed for the node
  }

  bool checkSDORead(const std::string& node_name, uint16_t index, uint8_t subindex, uint32_t data)
  {
    auto client = create_client<canopen_interfaces::srv::CORead>("/" + node_name + "/sdo_read");

    if (!client->wait_for_service(std::chrono::seconds(3)))
    {
      return false;
    }

    auto request = std::make_shared<canopen_interfaces::srv::CORead::Request>();
    request->index = index;
    request->subindex = subindex;

    auto result = client->async_send_request(request);

    client.reset(); // Explicitly destroy the client
    return result.get()->success && (data == result.get()->data);
  }
};

int main(int argc, char *argv[])
{
  rclcpp::init(argc, argv);

  // Example usage
  std::shared_ptr<SDOCheckerNode> node = std::make_shared<SDOCheckerNode>();
  bool success = node->checkSDORead("master", 0x6040, 0x00, 42);
while (rclcpp::ok()){
    if (success)
    {
      RCLCPP_INFO(node->get_logger(), "SDO Read check passed!");
    }
    else
    {
      RCLCPP_ERROR(node->get_logger(), "SDO Read check failed!");
    }
    rclcpp::sleep_for(std::chrono::seconds(1));
    }

  rclcpp::shutdown();
  return 0;
}

Setup:

Additional context bus.yml file:

options:
  dcf_path: install/config_package/share/config_package/config/gpl_bus

master:
  node_id: 1
  dcf: "CL4-E-1-12.eds"
  driver: "ros2_canopen::MasterDriver"
  package: "canopen_master_driver"

motor_controller_2:
  node_id: 2
  dcf: "CL4-E-1-12.eds"
  driver: "ros2_canopen::ProxyDriver"
  package: "canopen_proxy_driver"

motor_controller_3:
  node_id: 3
  dcf: "CL4-E-1-12.eds"
  driver: "ros2_canopen::ProxyDriver"
  package: "canopen_proxy_driver"

motor_controller_4:
  node_id: 4
  dcf: "CL4-E-1-12.eds"
  driver: "ros2_canopen::ProxyDriver"
  package: "canopen_proxy_driver"

launch file:

import os
import launch
from launch.actions import IncludeLaunchDescription
from launch.launch_description_sources import PythonLaunchDescriptionSource
from ament_index_python import get_package_share_directory
from launch import LaunchDescription
from launch_ros.actions import Node

def generate_launch_description():
    """Generate launch description with multiple components."""
    path_file = os.path.dirname(__file__)

    ld = launch.LaunchDescription()

    device_container = IncludeLaunchDescription(
        PythonLaunchDescriptionSource(
            [
                os.path.join(get_package_share_directory("canopen_core"), "launch"),
                "/canopen.launch.py",
            ]
        ),
        launch_arguments={
            "master_config": os.path.join(
                get_package_share_directory("config_package"),
                "config",
                "gpl_bus",
                "master.dcf",
            ),
            "master_bin": os.path.join(
                get_package_share_directory("config_package"),
                "config",
                "gpl_bus",
                "master.bin",
            ),
            "bus_config": os.path.join(
                get_package_share_directory("config_package"),
                "config",
                "gpl_bus",
                "bus.yml",
            ),
            "can_interface_name": "can0",
        }.items(),
    )

    ld.add_action(device_container)

    # Add position_tick_motor node
    position_tick_motor_node = Node(
        package="config_package",
        executable="position_tick_client",  # Change this to the actual executable name
        output="screen",
    )

    ld.add_action(position_tick_motor_node)

    return ld

Thanks in advance!