YASMIN (Yet Another State MachINe)

YASMIN is a project focused on implementing robot behaviors using Finite State Machines (FSM). It is available for ROS 2, Python and C++.

Table of Contents

  1. Features
  2. Installation
  3. Demos
  4. YASMIN Viewer
  5. Citations



# clone
$ cd ~/ros2_ws/src
$ git clone https://github.com/uleroboticsgroup/yasmin.git

# dependencies
$ cd ~/ros2_ws
$ rosdep install --from-paths src --ignore-src -r -y
$ cd src/yasmin
$ pip3 install -r requirements.txt

# colcon
$ cd ~/ros2_ws
$ colcon build

Docker Setup

If your operating system doesn't support ROS 2 humble, docker is a great alternative.

First of all, you have to build the project and create an image like so:

## Assuming you are in the correct project directory
$ docker build -t yasmin .

To use a shortcut, you may use the following command:

## Assuming you are in the correct project directory
$ make docker_build

After the image is created, copy and paste the following command to the terminal to run the image:

## Assuming you are in the correct project directory
$ docker run -it --net=host --ipc=host --privileged --env="DISPLAY"  --env="QT_X11_NO_MITSHM=1" --volume="/tmp/.X11-unix:/tmp/.X11-unix:rw" --volume="${XAUTHORITY}:/root/.Xauthority"  --entrypoint /bin/bash yasmin

To use a shortcut, you may use following command:

$ make docker_run

Running the package at docker image

If you are in the docker image , this project is already sourced and the demo script can be run as the following command;

$ cd /root/ros2_ws/
$ ros2 run yasmin_demos yasmin_demo.py


There are some examples, for both Python and C++, that can be found in yasmin_demos.


Vanilla Demo (FSM)

$ ros2 run yasmin_demos yasmin_demo.py

Click to expand ```python #!/usr/bin/env python3 import time import rclpy from yasmin import State from yasmin import Blackboard from yasmin import StateMachine from yasmin_viewer import YasminViewerPub # define state Foo class FooState(State): def __init__(self) -> None: super().__init__(["outcome1", "outcome2"]) self.counter = 0 def execute(self, blackboard: Blackboard) -> str: print("Executing state FOO") time.sleep(3) if self.counter < 3: self.counter += 1 blackboard.foo_str = f"Counter: {self.counter}" return "outcome1" else: return "outcome2" # define state Bar class BarState(State): def __init__(self) -> None: super().__init__(outcomes=["outcome3"]) def execute(self, blackboard: Blackboard) -> str: print("Executing state BAR") time.sleep(3) print(blackboard.foo_str) return "outcome3" # main def main(): print("yasmin_demo") # init ROS 2 rclpy.init() # create a FSM sm = StateMachine(outcomes=["outcome4"]) # add states sm.add_state( "FOO", FooState(), transitions={ "outcome1": "BAR", "outcome2": "outcome4" } ) sm.add_state( "BAR", BarState(), transitions={ "outcome3": "FOO" } ) # pub FSM info YasminViewerPub("yasmin_demo", sm) # execute FSM outcome = sm() print(outcome) # shutdown ROS 2 rclpy.shutdown() if __name__ == "__main__": main() ```

Service Demo (FSM + ROS 2 Service Client)

$ ros2 run demo_nodes_py add_two_ints_server
$ ros2 run yasmin_demos service_client_demo.py
Click to expand ```python import rclpy from example_interfaces.srv import AddTwoInts from yasmin import CbState from yasmin import Blackboard from yasmin import StateMachine from yasmin_ros import ServiceState from yasmin_ros.basic_outcomes import SUCCEED, ABORT from yasmin_viewer import YasminViewerPub class AddTwoIntsState(ServiceState): def __init__(self) -> None: super().__init__( AddTwoInts, # srv type "/add_two_ints", # service name self.create_request_handler, # cb to create the request ["outcome1"], # outcomes. Includes (SUCCEED, ABORT) self.response_handler # cb to process the response ) def create_request_handler(self, blackboard: Blackboard) -> AddTwoInts.Request: req = AddTwoInts.Request() req.a = blackboard.a req.b = blackboard.b return req def response_handler( self, blackboard: Blackboard, response: AddTwoInts.Response ) -> str: blackboard.sum = response.sum return "outcome1" def set_ints(blackboard: Blackboard) -> str: blackboard.a = 10 blackboard.b = 5 return SUCCEED def print_sum(blackboard: Blackboard) -> str: print(f"Sum: {blackboard.sum}") return SUCCEED # main def main(): print("yasmin_service_client_demo") # init ROS 2 rclpy.init() # create a FSM sm = StateMachine(outcomes=["outcome4"]) # add states sm.add_state( "SETTING_INTS", CbState([SUCCEED], set_ints), transitions={ SUCCEED: "ADD_TWO_INTS" } ) sm.add_state( "ADD_TWO_INTS", AddTwoIntsState(), transitions={ "outcome1": "PRINTING_SUM", SUCCEED: "outcome4", ABORT: "outcome4" } ) sm.add_state( "PRINTING_SUM", CbState([SUCCEED], print_sum), transitions={ SUCCEED: "outcome4" } ) # pub FSM info YasminViewerPub("YASMIN_SERVICE_CLIENT_DEMO", sm) # execute FSM outcome = sm() print(outcome) # shutdown ROS 2 rclpy.shutdown() if __name__ == "__main__": main() ```

Action Demo (FSM + ROS 2 Action)

$ ros2 run action_tutorials_cpp fibonacci_action_server
$ ros2 run yasmin_demos action_client_demo.py
Click to expand ```python import rclpy from action_tutorials_interfaces.action import Fibonacci from yasmin import CbState from yasmin import Blackboard from yasmin import StateMachine from yasmin_ros import ActionState from yasmin_ros.basic_outcomes import SUCCEED, ABORT, CANCEL from yasmin_viewer import YasminViewerPub class FibonacciState(ActionState): def __init__(self) -> None: super().__init__( Fibonacci, # action type "/fibonacci", # action name self.create_goal_handler, # cb to create the goal None, # outcomes. Includes (SUCCEED, ABORT, CANCEL) self.response_handler, # cb to process the response self.print_feedback # cb to process the feedback ) def create_goal_handler(self, blackboard: Blackboard) -> Fibonacci.Goal: goal = Fibonacci.Goal() goal.order = blackboard.n return goal def response_handler( self, blackboard: Blackboard, response: Fibonacci.Result ) -> str: blackboard.fibo_res = response.sequence return SUCCEED def print_feedback( self, blackboard: Blackboard, feedback: Fibonacci.Feedback ) -> None: print(f"Received feedback: {list(feedback.partial_sequence)}") def print_result(blackboard: Blackboard) -> str: print(f"Result: {blackboard.fibo_res}") return SUCCEED # main def main(): print("yasmin_action_client_demo") # init ROS 2 rclpy.init() # create a FSM sm = StateMachine(outcomes=["outcome4"]) # add states sm.add_state( "CALLING_FIBONACCI", FibonacciState(), transitions={ SUCCEED: "PRINTING_RESULT", CANCEL: "outcome4", ABORT: "outcome4" } ) sm.add_state( "PRINTING_RESULT", CbState([SUCCEED], print_result), transitions={ SUCCEED: "outcome4" } ) # pub FSM info YasminViewerPub("YASMIN_ACTION_CLIENT_DEMO", sm) # create an initial blackoard blackboard = Blackboard() blackboard.n = 10 # execute FSM outcome = sm(blackboard) print(outcome) # shutdown ROS 2 rclpy.shutdown() if __name__ == "__main__": main() ```

Monitor Demo (FSM + ROS 2 Subscriber)

$ ros2 run yasmin_demos monitor_demo.py
Click to expand ```python import rclpy from rclpy.qos import qos_profile_sensor_data from nav_msgs.msg import Odometry from yasmin import Blackboard from yasmin import StateMachine from yasmin_ros import MonitorState from yasmin_ros.basic_outcomes import CANCEL from yasmin_viewer import YasminViewerPub class PrintOdometryState(MonitorState): def __init__(self, times: int) -> None: super().__init__( Odometry, # msg type "odom", # topic name ["outcome1", "outcome2"], # outcomes self.monitor_handler, # monitor handler callback qos=qos_profile_sensor_data, # qos for the topic sbscription msg_queue=10, # queue of the monitor handler callback timeout=10 # timeout to wait for msgs in seconds # if not None, CANCEL outcome is added ) self.times = times def monitor_handler(self, blackboard: Blackboard, msg: Odometry) -> str: print(msg) self.times -= 1 if self.times <= 0: return "outcome2" return "outcome1" # main def main(): print("yasmin_monitor_demo") # init ROS 2 rclpy.init() # create a FSM sm = StateMachine(outcomes=["outcome4"]) # add states sm.add_state( "PRINTING_ODOM", PrintOdometryState(5), transitions={ "outcome1": "PRINTING_ODOM", "outcome2": "outcome4", CANCEL: "outcome4" } ) # pub FSM info YasminViewerPub("YASMIN_MONITOR_DEMO", sm) # execute FSM outcome = sm() print(outcome) # shutdown ROS 2 rclpy.shutdown() if __name__ == "__main__": main() ```

Nav2 Demo (Hierarchical FSM + ROS 2 Action)

$ ros2 run yasmin_demos nav_demo.py
Click to expand ```python import random import rclpy from geometry_msgs.msg import Pose from nav2_msgs.action import NavigateToPose from yasmin import CbState from yasmin import Blackboard from yasmin import StateMachine from yasmin_ros import ActionState from yasmin_ros.basic_outcomes import SUCCEED, ABORT, CANCEL from yasmin_viewer import YasminViewerPub HAS_NEXT = "has_next" END = "end" class Nav2State(ActionState): def __init__(self) -> None: super().__init__( NavigateToPose, # action type "/navigate_to_pose", # action name self.create_goal_handler, # cb to create the goal None, # outcomes. Includes (SUCCEED, ABORT, CANCEL) None # cb to process the response ) def create_goal_handler(self, blackboard: Blackboard) -> NavigateToPose.Goal: goal = NavigateToPose.Goal() goal.pose.pose = blackboard.pose goal.pose.header.frame_id = "map" return goal def create_waypoints(blackboard: Blackboard) -> str: blackboard.waypoints = { "entrance": [1.25, 6.30, -0.78, 0.67], "bathroom": [4.89, 1.64, 0.0, 1.0], "livingroom": [1.55, 4.03, -0.69, 0.72], "kitchen": [3.79, 6.77, 0.99, 0.12], "bedroom": [7.50, 4.89, 0.76, 0.65], } return SUCCEED def take_random_waypoint(blackboard) -> str: blackboard.random_waypoints = random.sample( list(blackboard.waypoints.keys()), blackboard.waypoints_num) return SUCCEED def get_next_waypoint(blackboard: Blackboard) -> str: if not blackboard.random_waypoints: return END wp_name = blackboard.random_waypoints.pop(0) wp = blackboard.waypoints[wp_name] pose = Pose() pose.position.x = wp[0] pose.position.y = wp[1] pose.orientation.z = wp[2] pose.orientation.w = wp[3] blackboard.pose = pose blackboard.text = f"I have reached waypoint {wp_name}" return HAS_NEXT # main def main(): print("yasmin_nav2_demo") # init ROS 2 rclpy.init() # create state machines sm = StateMachine(outcomes=[SUCCEED, ABORT, CANCEL]) nav_sm = StateMachine(outcomes=[SUCCEED, ABORT, CANCEL]) # add states sm.add_state( "CREATING_WAYPOINTS", CbState([SUCCEED], create_waypoints), transitions={ SUCCEED: "TAKING_RANDOM_WAYPOINTS" } ) sm.add_state( "TAKING_RANDOM_WAYPOINTS", CbState([SUCCEED], take_random_waypoint), transitions={ SUCCEED: "NAVIGATING" } ) nav_sm.add_state( "GETTING_NEXT_WAYPOINT", CbState([END, HAS_NEXT], get_next_waypoint), transitions={ END: SUCCEED, HAS_NEXT: "NAVIGATING" } ) nav_sm.add_state( "NAVIGATING", Nav2State(), transitions={ SUCCEED: "GETTING_NEXT_WAYPOINT", CANCEL: CANCEL, ABORT: ABORT } ) sm.add_state( "NAVIGATING", nav_sm, transitions={ SUCCEED: SUCCEED, CANCEL: CANCEL, ABORT: ABORT } ) # pub FSM info YasminViewerPub("YASMIN_NAV_DEMO", sm) # execute FSM blackboard = Blackboard() blackboard.waypoints_num = 2 outcome = sm(blackboard) print(outcome) # shutdown ROS 2 rclpy.shutdown() if __name__ == "__main__": main() ```


Vanilla Demo

$ ros2 run yasmin_demos yasmin_demo
Click to expand ```cpp #include #include #include #include #include "rclcpp/rclcpp.hpp" #include "yasmin/state.hpp" #include "yasmin/state_machine.hpp" #include "yasmin_viewer/yasmin_viewer_pub.hpp" // define state Foo class FooState : public yasmin::State { public: int counter; FooState() : yasmin::State({"outcome1", "outcome2"}) { this->counter = 0; }; std::string execute(std::shared_ptr blackboard) { std::cout << "Executing state FOO\n"; std::this_thread::sleep_for(std::chrono::seconds(3)); if (this->counter < 3) { this->counter += 1; blackboard->set("foo_str", "Counter: " + std::to_string(this->counter)); return "outcome1"; } else { return "outcome2"; } } std::string to_string() { return "FooState"; } }; // define state Bar class BarState : public yasmin::State { public: BarState() : yasmin::State({"outcome3"}){}; std::string execute(std::shared_ptr blackboard) { std::cout << "Executing state BAR\n"; std::this_thread::sleep_for(std::chrono::seconds(3)); std::cout << blackboard->get("foo_str") << "\n"; return "outcome3"; } std::string to_string() { return "BarState"; } }; int main(int argc, char *argv[]) { std::cout << "yasmin_demo\n"; rclcpp::init(argc, argv); // create a state machine auto sm = std::make_shared( yasmin::StateMachine({"outcome4"})); // add states sm->add_state("FOO", std::make_shared(), {{"outcome1", "BAR"}, {"outcome2", "outcome4"}}); sm->add_state("BAR", std::make_shared(), {{"outcome3", "FOO"}}); // pub yasmin_viewer::YasminViewerPub yasmin_pub("YASMIN_ACTION_CLIENT_DEMO", sm); // execute std::string outcome = (*sm.get())(); std::cout << outcome << "\n"; rclcpp::shutdown(); return 0; } ```

Service Demo (FSM + ROS 2 Service Client)

$ ros2 run demo_nodes_py add_two_ints_server
$ ros2 run yasmin_demos service_client_demo
Click to expand ```cpp #include #include #include #include "example_interfaces/srv/add_two_ints.hpp" #include "rclcpp/rclcpp.hpp" #include "yasmin/cb_state.hpp" #include "yasmin/state_machine.hpp" #include "yasmin_ros/basic_outcomes.hpp" #include "yasmin_ros/service_state.hpp" #include "yasmin_viewer/yasmin_viewer_pub.hpp" using std::placeholders::_1; using std::placeholders::_2; std::string set_ints(std::shared_ptr blackboard) { blackboard->set("a", 10); blackboard->set("b", 5); return yasmin_ros::basic_outcomes::SUCCEED; } std::string print_sum(std::shared_ptr blackboard) { fprintf(stderr, "Sum: %d\n", blackboard->get("sum")); return yasmin_ros::basic_outcomes::SUCCEED; } class AddTwoIntsState : public yasmin_ros::ServiceState { public: AddTwoIntsState() : yasmin_ros::ServiceState // msg ( // node "/add_two_ints", // srv name std::bind(&AddTwoIntsState::create_request_handler, this, _1), {"outcome1"}, std::bind(&AddTwoIntsState::response_handler, this, _1, _2)){}; example_interfaces::srv::AddTwoInts::Request::SharedPtr create_request_handler( std::shared_ptr blackboard) { auto request = std::make_shared(); request->a = blackboard->get("a"); request->b = blackboard->get("b"); return request; } std::string response_handler( std::shared_ptr blackboard, example_interfaces::srv::AddTwoInts::Response::SharedPtr response) { blackboard->set("sum", response->sum); return "outcome1"; } std::string to_string() { return "AddTwoIntsState"; } }; int main(int argc, char *argv[]) { std::cout << "yasmin_service_client_demo\n"; rclcpp::init(argc, argv); // create a state machine auto sm = std::make_shared( yasmin::StateMachine({"outcome4"})); // add states sm->add_state("SETTING_INTS", std::make_shared(yasmin::CbState( {yasmin_ros::basic_outcomes::SUCCEED}, set_ints)), {{yasmin_ros::basic_outcomes::SUCCEED, "ADD_TWO_INTS"}}); sm->add_state("ADD_TWO_INTS", std::make_shared(), {{"outcome1", "PRINTING_SUM"}, {yasmin_ros::basic_outcomes::SUCCEED, "outcome4"}, {yasmin_ros::basic_outcomes::ABORT, "outcome4"}}); sm->add_state("PRINTING_SUM", std::make_shared(yasmin::CbState( {yasmin_ros::basic_outcomes::SUCCEED}, print_sum)), {{yasmin_ros::basic_outcomes::SUCCEED, "outcome4"}}); // pub yasmin_viewer::YasminViewerPub yasmin_pub("YASMIN_ACTION_CLIENT_DEMO", sm); // execute std::string outcome = (*sm.get())(); std::cout << outcome << "\n"; rclcpp::shutdown(); return 0; } ```

Action Demo (FSM + ROS 2 Action)

$ ros2 run action_tutorials_cpp fibonacci_action_server
$ ros2 run yasmin_demos action_client_demo
Click to expand ```cpp #include #include #include #include "action_tutorials_interfaces/action/fibonacci.hpp" #include "yasmin/cb_state.hpp" #include "yasmin/state_machine.hpp" #include "yasmin_ros/action_state.hpp" #include "yasmin_ros/basic_outcomes.hpp" #include "yasmin_ros/yasmin_node.hpp" #include "yasmin_viewer/yasmin_viewer_pub.hpp" using std::placeholders::_1; using std::placeholders::_2; using Fibonacci = action_tutorials_interfaces::action::Fibonacci; std::string print_result(std::shared_ptr blackboard) { auto fibo_res = blackboard->get>("sum"); fprintf(stderr, "Result received:"); for (auto ele : fibo_res) { fprintf(stderr, " %d,", ele); } fprintf(stderr, "\n"); return yasmin_ros::basic_outcomes::SUCCEED; } class FibonacciState : public yasmin_ros::ActionState { public: FibonacciState() : yasmin_ros::ActionState( "/fibonacci", // action name // # cb to create the goal std::bind(&FibonacciState::create_goal_handler, this, _1), // # cb to process the response std::bind(&FibonacciState::response_handler, this, _1, _2), // cb to process the feedback std::bind(&FibonacciState::print_feedback, this, _1, _2)){}; Fibonacci::Goal create_goal_handler( std::shared_ptr blackboard) { auto goal = Fibonacci::Goal(); goal.order = blackboard->get("n"); return goal; } std::string response_handler(std::shared_ptr blackboard, Fibonacci::Result::SharedPtr response) { blackboard->set>("sum", response->sequence); return yasmin_ros::basic_outcomes::SUCCEED; } void print_feedback(std::shared_ptr blackboard, std::shared_ptr feedback) { (void)blackboard; std::stringstream ss; ss << "Next number in sequence received: "; for (auto number : feedback->partial_sequence) { ss << number << " "; } fprintf(stderr, "%s\n", ss.str().c_str()); } std::string to_string() { return "FibonacciState"; } }; int main(int argc, char *argv[]) { std::cout << "yasmin_action_client_demo\n"; rclcpp::init(argc, argv); // create a state machine auto sm = std::make_shared( yasmin::StateMachine({"outcome4"})); // add states sm->add_state("CALLING_FIBONACCI", std::make_shared(), {{yasmin_ros::basic_outcomes::SUCCEED, "PRINTING_RESULT"}, {yasmin_ros::basic_outcomes::CANCEL, "outcome4"}, {yasmin_ros::basic_outcomes::ABORT, "outcome4"}}); sm->add_state("PRINTING_RESULT", std::make_shared(yasmin::CbState( {yasmin_ros::basic_outcomes::SUCCEED}, print_result)), {{yasmin_ros::basic_outcomes::SUCCEED, "outcome4"}}); // pub yasmin_viewer::YasminViewerPub yasmin_pub("YASMIN_ACTION_CLIENT_DEMO", sm); // create an initial blackboard std::shared_ptr blackboard = std::make_shared(); blackboard->set("n", 10); // execute std::string outcome = (*sm.get())(blackboard); std::cout << outcome << "\n"; rclcpp::shutdown(); return 0; } ```

Monitor Demo (FSM + ROS 2 Subscriber)

$ ros2 run yasmin_demos monitor_demo
Click to expand ```cpp #include #include #include #include "rclcpp/rclcpp.hpp" #include "nav_msgs/msg/odometry.hpp" #include "yasmin/state_machine.hpp" #include "yasmin_ros/basic_outcomes.hpp" #include "yasmin_ros/monitor_state.hpp" #include "yasmin_viewer/yasmin_viewer_pub.hpp" using std::placeholders::_1; using std::placeholders::_2; class PrintOdometryState : public yasmin_ros::MonitorState { public: int times; PrintOdometryState(int times) : yasmin_ros::MonitorState // msg type ("odom", // topic name {"outcome1", "outcome2"}, // outcomes std::bind(&PrintOdometryState::monitor_handler, this, _1, _2), // monitor handler callback 10, // qos for the topic sbscription 10, // queue of the monitor handler callback 10 // timeout to wait for msgs in seconds // if >0, CANCEL outcome is added ) { this->times = times; }; std::string monitor_handler(std::shared_ptr blackboard, std::shared_ptr msg) { (void)blackboard; std::cout << "x: " << msg->pose.pose.position.x << "\n"; std::cout << "y: " << msg->pose.pose.position.y << "\n"; std::cout << "z: " << msg->pose.pose.position.z << "\n"; std::cout << "\n"; this->times--; if (this->times <= 0) { return "outcome2"; } return "outcome1"; } std::string to_string() { return "PrintOdometryState"; } }; int main(int argc, char *argv[]) { std::cout << "yasmin_monitor_demo\n"; rclcpp::init(argc, argv); // create a state machine auto sm = std::make_shared( yasmin::StateMachine({"outcome4"})); // add states sm->add_state("PRINTING_ODOM", std::make_shared(5), {{"outcome1", "PRINTING_ODOM"}, {"outcome2", "outcome4"}, {yasmin_ros::basic_outcomes::CANCEL, "outcome4"}}); // pub yasmin_viewer::YasminViewerPub yasmin_pub("YASMIN_ACTION_CLIENT_DEMO", sm); // execute std::string outcome = (*sm.get())(); std::cout << outcome << "\n"; rclcpp::shutdown(); return 0; } ```


This viewer allows monitoring YASMIN's FSM. It is implemented with Flask and ReactJS. A filter is provided to show only one FSM.


$ ros2 run yasmin_viewer yasmin_viewer_node


Custom host and port

$ ros2 run yasmin_viewer yasmin_viewer_node --ros-args -p host:= -p port:=5032


