ZATiTech / open_planner

Integrated open source planner and related tools for autonomous navigation of autonomous vehicle and mobile robots
Apache License 2.0
26 stars 6 forks source link

How to do steering map calibration #19

Open ahsan155 opened 1 month ago

ahsan155 commented 1 month ago

I am not using open_planner. But I wanted to know how to generate steering map for autoware to send accurate steering command to carla. Autoware has raw_vehicle_cmd_converter node and this node uses a steer_map.csv file which I assume is a lookup table to map steering angle to velocity/throttle.

hatem-darweesh commented 1 month ago

Which part is missing exactly: Can you communicate with CARLA Simulator using Pythin APIs ? Can you Receive the steer/velocity (control message) from Autoware using ROS2 ? Is the problem is just how to convert from steering angle to Torque and from target velocity to acceleration/braking stroke (force)?

ahsan155 commented 1 month ago

So, I am using this ml based calibrator https://github.com/pixmoving-moveit/learning_based_vehicle_calibration/tree/normal_vehicle . I am open to suggestion if there are other ways of doing calibration. This ml based calibrator generates 5 different steer map for different steering ranges but autoware only needs one. I generated one map for whole steering range (-0.6, 0.6). when I use the steering map with raw_vehicle_cmd_converter, the converter keeps logging: Screenshot from 2024-07-10 00-43-45 So, steering tires change angle very aggressively and vehicle drives off the street.

I might be doing something wrong in data collection. Here is how I collect data: I spawn the vehicle in carla using carla-ros-bridge (using launch script carla_ros_bridge_with_example_ego_vehicle.launch.py). Then I drive the carla vehicle on an empty plane with the script

import carla
import random
import time

def smooth_control(current, target, smoothing_factor=0.1):
    return current + smoothing_factor * (target - current)

class CarlaController:
    def __init__(self, world):
        self.world = world
        self.vehicle = None
        self.current_throttle = 0
        self.current_steer = 0
        self.target_throttle = 0
        self.target_steer = 0

    def setup(self):

        self.vehicle = self.world.get_actors().filter('vehicle.tesla.model3')[0]

    def update_control(self):
        # Update targets periodically
        if random.random() < 0.05:  # 5% chance to change targets each iteration
            self.target_throttle = random.uniform(0.1, 0.20)
            self.target_steer = random.uniform(-0.60, 0.60)

        # Smooth control changes
        self.current_throttle = smooth_control(self.current_throttle, self.target_throttle)
        self.current_steer = smooth_control(self.current_steer, self.target_steer)

        # Apply control
        velocity = self.vehicle.get_velocity()
        speed_kmh = 3.6 * math.sqrt(velocity.x**2 + velocity.y**2 + velocity.z**2)

        if speed_kmh  > 5:
            control = carla.VehicleControl(throttle=0, brake=0.05, steer=self.current_steer)
        else:
            control = carla.VehicleControl(throttle=self.current_throttle, brake=0, steer=self.current_steer)

        self.vehicle.apply_control(control)

    def run(self):
        try:
            print("Simulation started. Press Ctrl+C to stop.")
            while True:
                self.update_control()

                self.world.tick()
                time.sleep(0.05)  # Ensure consistent timing
        except KeyboardInterrupt:
            print("Simulation stopped by user.")
        finally:
            pass
# Usage
#client = carla.Client('localhost', 2000)
#world = client.get_world()
controller = CarlaController(world)
controller.setup()
controller.run()

I use this script to apply a range of throttle/steer rather than setting the vehicle on autopilot. In the above code, I am not sure if should use world.tick() and time.sleep(). Carla is running in sync mode with fixed_delta_seconds=0.05.

The calibrator requires publishing 5 different types of messages.

  1. VelocityReport
  2. SteeringReport
  3. ActuationStatusStamped
  4. Imu
  5. Float32

The calibrator specifies data requirement as follows

# brake paddle value (frome 0 to 1, float)
/vehicle/status/actuation_status -> status.brake_status
# throttle paddle value (frome 0 to 1, float)
/vehicle/status/actuation_status -> status.accel_status
# velocity value (unit is m/s, float)
/vehicle/status/velocity_status -> longitudinal_velocity
# steering value (from 0 to 0.5 [radians], float)
/vehicle/status/steering_status -> steering_tire_angle
# imu data(unit is rad/s or m/s)
/vehicle/status/imu -> linear_acceleration.x
# pitch angle (unit is degrees)
/sensing/combination_navigation/chc/pitch -> data

I use the following scripts to publish ROS messages:

for VelocityReport,

from nav_msgs.msg import Odometry
from autoware_auto_vehicle_msgs.msg import VelocityReport
from geometry_msgs.msg import Vector3

class VelocityReportPublisher(Node):
    def __init__(self):
        super().__init__('velocity_report_publisher')

        # Create a subscription to the Odometry topic
        self.subscription = self.create_subscription(
            Odometry,
            '/carla/ego_vehicle/odometry', # Adjust this topic if needed
            self.odometry_callback,
            10)

        # Create a publisher for the VelocityReport topic
        self.publisher = self.create_publisher(
            VelocityReport,
            '/vehicle/status/velocity_status',
            10)

        self.get_logger().info('Velocity Report Publisher node has started')

    def odometry_callback(self, msg):
        # Create a VelocityReport message
        velocity_report = VelocityReport()

        # Set the header
        velocity_report.header = msg.header

        # Set the longitudinal, lateral, and heading rate
        velocity_report.longitudinal_velocity = msg.twist.twist.linear.x
        velocity_report.lateral_velocity = -msg.twist.twist.linear.y
        velocity_report.heading_rate = -msg.twist.twist.angular.z

        # Publish the VelocityReport
        self.publisher.publish(velocity_report)

for SteeringReport,

from carla_msgs.msg import CarlaEgoVehicleStatus
from autoware_auto_vehicle_msgs.msg import SteeringReport
import numpy as np

class SteeringReportPublisher(Node):
    def __init__(self):
        super().__init__('steering_report_publisher')

        # Create a subscription to the CarlaEgoVehicleStatus topic
        self.subscription = self.create_subscription(
            CarlaEgoVehicleStatus,
            '/carla/ego_vehicle/vehicle_status',
            self.status_callback,
            10)

        # Create a publisher for the SteeringReport topic
        self.publisher = self.create_publisher(
            SteeringReport,
            '/vehicle/status/steering_status',
            10)

        # Calculate the steering polynomial
        self.steer_to_angle_polynomial = self.calculate_steer_to_angle_polynomial()

        self.get_logger().info('Steering Report Publisher node has started')

    def status_callback(self, msg):
        # Create a SteeringReport message
        steering_report = SteeringReport()

        # Set the header
        steering_report.stamp = self.get_clock().now().to_msg()

        # Convert CARLA steering (-1 to 1) to steering angle using the polynomial
        carla_steering = msg.control.steer

        # Set the steering angle
        steering_report.steering_tire_angle = abs(carla_steering) * 0.5

        # Publish the SteeringReport
        self.publisher.publish(steering_report)

for ActuationStatusStamped,

from carla_msgs.msg import CarlaEgoVehicleStatus
from tier4_vehicle_msgs.msg import ActuationStatusStamped
import math

class CarlaToActuationStatus(Node):
    def __init__(self):
        super().__init__('carla_to_actuation_status')

        # Subscribe to CARLA topic
        self.status_sub = self.create_subscription(
            CarlaEgoVehicleStatus,
            '/carla/ego_vehicle/vehicle_status',
            self.status_callback,
            10)

        # Publisher for ActuationStatusStamped
        self.actuation_pub = self.create_publisher(
            ActuationStatusStamped,
            '/vehicle/status/actuation_status',
            10)

        # Parameters for conversion (you may want to make these configurable)
        self.max_steer_angle = math.radians(70)  # Maximum steering angle in radians
        self.max_acceleration = 5.0  # Maximum acceleration in m/s^2
        self.max_brake_pressure = 10.0  # Maximum brake pressure in MPa

        self.get_logger().info('CarlaToActuationStatus node has started')

    def status_callback(self, msg: CarlaEgoVehicleStatus):
        actuation_msg = ActuationStatusStamped()
        actuation_msg.header = msg.header

        # Convert throttle to acceleration (m/s^2)
        actuation_msg.status.accel_status = msg.control.throttle

        # Convert brake to brake pressure (MPa)
        actuation_msg.status.brake_status = msg.control.brake
        # Convert steering to actual steering angle (radians)
        #actuation_msg.status.steer_status = msg.control.steer

        self.actuation_pub.publish(actuation_msg)

for IMU,

from sensor_msgs.msg import Imu

class ImuRepublisher(Node):
    def __init__(self):
        super().__init__('imu_republisher')

        # Create a subscription to the Carla IMU topic
        self.subscription = self.create_subscription(
            Imu,
            '/carla/ego_vehicle/imu',
            self.imu_callback,
            10)

        # Create a publisher for the new topic
        self.publisher = self.create_publisher(
            Imu,
            '/sensing/gnss/chc/imu',
            10)

        self.get_logger().info('IMU Republisher node has started')

    def imu_callback(self, msg):
        # Simply republish the received message to the new topic
        self.publisher.publish(msg)

for Float32,

from carla_msgs.msg import CarlaEgoVehicleStatus
from std_msgs.msg import Float32
import math

class PitchPublisher(Node):
    def __init__(self):
        super().__init__('pitch_publisher')

        self.subscription = self.create_subscription(
            CarlaEgoVehicleStatus,
            '/carla/ego_vehicle/vehicle_status',
            self.status_callback,
            10)

        self.publisher = self.create_publisher(
            Float32,
            '/sensing/gnss/chc/pitch',
            10)

        self.get_logger().info('Pitch Publisher node has started')

    def status_callback(self, msg):
        pitch_msg = Float32()
        pitch_msg.data = self.quaternion_to_pitch(msg.orientation)
        self.publisher.publish(pitch_msg)
        self.get_logger().debug(f'Published pitch: {pitch_msg.data}')

    def quaternion_to_pitch(self, quaternion):
        # Convert quaternion to pitch angle
        sinp = 2.0 * (quaternion.w * quaternion.y - quaternion.z * quaternion.x)
        pitch_radians = math.asin(sinp)
        pitch_degrees = math.degrees(pitch_radians)
        return pitch_degrees

Am I doing anything wrong in publishing these messages?

After generating the steer map, I put it inside raw_vehicle_cmd_converter data folder. To test steering, I spawn carla vehicle by carla-ros-bridge as I did when collecting data. I start raw_vehicle_cmd_converter to get actuation instruction from autoware. I use the following script to subscribe to actuation message and apply them on carla vehicle

#!/usr/bin/env python3

import rclpy
from rclpy.node import Node
from tier4_vehicle_msgs.msg import ActuationCommandStamped
import carla

client = carla.Client('localhost', 2000) 
world = client.get_world()

class CarlaAutowareBridge(Node):
    def __init__(self):
        super().__init__('carla_autoware_bridge')

        # Initialize CARLA client

        self.world = world

        # Get the first vehicle actor
        self.vehicle = self.world.get_actors().filter('vehicle.tesla.model3')[0] # vehicle.tesla.model3 # vehicle.dodge.charger_2020

        # Create a subscriber to listen to Autoware's AckermannControlCommand
        self.subscription = self.create_subscription(
            ActuationCommandStamped,
            '/control/command/actuation_cmd',  # Adjust to the correct topic if needed
            self.actuation_control_callback,
            10)

        # Ensure subscription is active
        self.get_logger().info('Subscribed to /control/command/actuation_cmd')

    def actuation_control_callback(self, msg):
        control = carla.VehicleControl()

        control.throttle = msg.actuation.accel_cmd
        control.brake = msg.actuation.brake_cmd
        control.steer = msg.actuation.steer_cmd

        # Log received commands for debugging
        self.get_logger().info(f'accel {msg.actuation.accel_cmd} brake {msg.actuation.brake_cmd} steer {msg.actuation.steer_cmd} angle {control.steer * 70}')

        self.vehicle.apply_control(control)

def main(args=None):
    rclpy.init(args=args)
    node = CarlaAutowareBridge()

    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

It would be great if you can review my work and correct any mistake.