ros2 / rclpy

rclpy (ROS Client Library for Python)
Apache License 2.0
308 stars 227 forks source link

Able to spin nodes in 1 or 2 threads, but not 3+ but only for rclpy.spin(). executor.spin() works properly. #1008

Open SteveMacenski opened 2 years ago

SteveMacenski commented 2 years ago

Bug report

Required Info:

Steps to reproduce issue

Below functions, but uncomment the 3 commented out lines to add another thread and fails

import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from rclpy.qos import QoSProfile, ReliabilityPolicy
from threading import Lock, Event, Thread
import time
from rclpy.executors import SingleThreadedExecutor

class MyNode(Node):
    def __init__(self):
        super().__init__('main_node')

class TakerThreaded(Node):
    def __init__(self, topic, message_type):
        super().__init__(topic + '_node')
        self.m = None
        self.qos_profile = QoSProfile(
            reliability=ReliabilityPolicy.BEST_EFFORT,
            depth=1
        )
        self.sub = self.create_subscription(
            message_type, topic, self._callback, self.qos_profile
        )
        self.executor = SingleThreadedExecutor()

        self.lock = Lock()
        self.event = Event()
        self.thread = Thread(target=self._thread_fn, daemon=True).start()

    def _thread_fn(self):
        rclpy.spin(self, executor=self.executor)

    def _callback(self, m):
        with self.lock:
            self.m = m
            self.event.set()

    def take(self):

        with self.lock:
            self.m = None
            self.event.clear()

        # wait
        self.event.wait()

        with self.lock:
            return self.m

def main():
    rclpy.init()
    node = MyNode()
    my_executor = SingleThreadedExecutor()
    my_executor.add_node(node)

    print('hi, I started up!')
    getter_1 = TakerThreaded('test_1', String)
    # getter_2 = TakerThreaded('test_2', String)

    msg_1 = getter_1.take()
    print(msg_1.data)
    print('1')
    my_executor.spin_once(timeout_sec=1)
    print('2')
    msg_1 = getter_1.take()
    print(msg_1.data)

    # msg_2 = getter_2.take()
    # print(msg_2.data)

if __name__ == "__main__":
    main()

Expected behavior

Be able to spin essentially infinite number of threads to spin separate executors

Actual behavior

The number of threads spinning executors is weird restricted to 2, the main thread and 1 seperate thread. If I create 2 separate threads via 2 of these TakerThreaded objects I get a ValueError: generator already executing error. If I do 1, it works fine.

If I remove the dummy node in the main thread, it doesn't give me the ability to use 2 Taker nodes. It seems like spinning executors is restricted to the main thread + 1. That seems incredibly odd that it would work with any more threads than the main at all if it wouldn't be able to work with many more.

Additional information

Full trace:

Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1009, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.10/threading.py", line 946, in run
    self._target(*self._args, **self._kwargs)
  File "/home/steve/Documents/samsung/test.py", line 31, in _thread_fn
    rclpy.spin(self, executor=self.executor)
  File "/opt/ros/rolling/local/lib/python3.10/dist-packages/rclpy/__init__.py", line 222, in spin
    executor.spin_once()
  File "/opt/ros/rolling/local/lib/python3.10/dist-packages/rclpy/executors.py", line 715, in spin_once
    handler, entity, node = self.wait_for_ready_callbacks(timeout_sec=timeout_sec)
  File "/opt/ros/rolling/local/lib/python3.10/dist-packages/rclpy/executors.py", line 701, in wait_for_ready_callbacks
    return next(self._cb_iter)
ValueError: generator already executing
SteveMacenski commented 2 years ago

Ah man, this is wacky, I found out I can actually spin up many but only if you never use the rclpy.spin and only ever spin using the executor.spin() API. Something is very much so wrong with the rclpy.spin API

Example exactly analog to above that doesn't work with 2+, which works for now 3+. Only change is adding the Taker node to its executor to spin, instead of using rclpy.spin(self, executor=self.executor). I don't know how that could be true unless the global executor is always being used in spite of it being given as an argument to rclpy.spin here: https://github.com/ros2/rclpy/blob/rolling/rclpy/rclpy/__init__.py#L199. Else, the implementations of spin between them are effectively the same!

import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from rclpy.qos import QoSProfile, ReliabilityPolicy
from threading import Lock, Event, Thread
import time
from rclpy.executors import SingleThreadedExecutor

class MyNode(Node):
    def __init__(self):
        super().__init__('main_node')

class TakerThreaded(Node):
    def __init__(self, topic, message_type):
        super().__init__(topic + '_node')
        self.m = None
        self.qos_profile = QoSProfile(
            reliability=ReliabilityPolicy.BEST_EFFORT,
            depth=1
        )
        self.sub = self.create_subscription(
            message_type, topic, self._callback, self.qos_profile
        )
        executor = SingleThreadedExecutor()
        executor.add_node(self)
        self.executor = executor

        self.lock = Lock()
        self.event = Event()
        self.thread = Thread(target=self._thread_fn, daemon=True).start()

    def _thread_fn(self):
        self.executor.spin()

    def _callback(self, m):
        with self.lock:
            self.m = m
            self.event.set()

    def take(self):

        with self.lock:
            self.m = None
            self.event.clear()

        # wait
        self.event.wait()

        with self.lock:
            return self.m

def main():
    rclpy.init()
    node = MyNode()
    my_executor = SingleThreadedExecutor()
    my_executor.add_node(node)

    print('hi, I started up!')
    getter_1 = TakerThreaded('test_1', String)
    getter_2 = TakerThreaded('test_2', String)
    getter_3 = TakerThreaded('test_3', String)

    msg_1 = getter_1.take()
    print(msg_1.data)
    print('1')
    my_executor.spin_once(timeout_sec=1)
    # rclpy.spin_once(node, timeout_sec=1)
    print('2')
    msg_1 = getter_1.take()
    print(msg_1.data)

    msg_2 = getter_2.take()
    print(msg_2.data)

    msg_3 = getter_3.take()
    print(msg_3.data)

if __name__ == "__main__":
    main()
fujitatomoya commented 2 years ago

interesting... i am not sure why this is happening but it uses global executor internally, that is the reason for generator already executing.

any thoughts?

SteveMacenski commented 2 years ago

I'm definitely not very knowledgeable about python (< 1% of what I write annually is python) - I can only report the symptoms and what I found works, hopefully that helps someone that is more familiar with rclpy / Python3 :smile:

fujitatomoya commented 2 years ago

@iuhilnehc-ynos anything rings bell?

iuhilnehc-ynos commented 2 years ago

@fujitatomoya

I found that the executor is a property of Node, and the Node.executor only use weakref for it. Using a temporary variable (e.g self.executor = SingleThreadedExecutor()) is dangerous. rclpy.spin(self, executor=self.executor)(Node.executor getter) will be rclpy.spin(self, executor=None) actually in the https://github.com/ros2/rclpy/issues/1008#issue-1368414267.

It is dangerous as well in the https://github.com/ros2/rclpy/issues/1008#issuecomment-1242564908

        executor = SingleThreadedExecutor()
        executor.add_node(self)
        self.executor = executor

I don't know if this example is the correct design. If using this style, I think user should keep the lifecycle of executor. (e.g. use self.my_executor instead of self.executor in the TakerThreaded which derived from Node)

SteveMacenski commented 2 years ago

Sounds like self.executor within rclpy needs to be an internal self._executor to remove that kind of collision potential from user-space code.

You're right that if I simply rename self.executor to anything else in the TakerThreaded class, things work fine even for the first example I gave. I just audited all of node.py and it appears that they only members that don't start with an underscore are

All others do, it seems like these need to have that added. Do you think that explains https://github.com/ros2/rclpy/issues/1009 too? I do

fujitatomoya commented 2 years ago

Using a temporary variable (e.g self.executor = SingleThreadedExecutor()) is dangerous.

i see, that returns None, and that leads to use global executor which is already spinning.

SteveMacenski commented 2 years ago

So just renaming these should be a good solution?

fujitatomoya commented 2 years ago

Sounds like self.executor within rclpy needs to be an internal self._executor to remove that kind of collision potential from user-space code.

this sounds reasonable to me. but not sure, there could be something else i miss, but happy to review and discuss.

SteveMacenski commented 2 years ago

Unfortunately, I'm very much so lacking in time between now and ROSCon so I can't dedicate time right now to doing this myself, just wanted to report and find the cause.

jrutgeer commented 11 months ago

Following this RSE post, I compared the spin() implementations in rclpy and rclcpp, and they differ.

Reporting here as it seems related to this issue (sorry if isn't; I did not read through the discussion in detail as I am not using rclpy myself):

So on first sight I would assume that it is not allowed to call rclpy.spin from different threads, as this implies calling spin_once() on the same exector from different threads concurrently?

fujitatomoya commented 10 months ago

looking again at the current implementation weakref is intentionally used to avoid circular reference, probably example code is not good. any thoughts?

In rclcpp, each call of spin() instantiates a SingleThreadedExecutor, see here, But in rclpy, a global executor is retrieved, see here.

true, there is a difference. rclcpp does not have global scoped executor in the process space. i think that is because task is already in the executor in rclpy, that requires we need to have persistent executor for rclpy::spin. those were developed a long time ago, so i do not really know the history.