ros2 / rclpy

rclpy (ROS Client Library for Python)
Apache License 2.0
309 stars 227 forks source link

Timer created during async service handler never fires #834

Closed jtbandes closed 3 years ago

jtbandes commented 3 years ago

Bug report

Required Info:

Steps to reproduce issue

Using an async function as a service handler seems to be supported, but if a timer is added during the service handler (e.g. to respond to the service after an artificial delay), the timer never fires. I also tried using a MultiThreadedExecutor(2) with the same results.

from example_interfaces.srv import AddTwoInts
import rclpy
from rclpy.node import Node

class MinimalService(Node):
    def __init__(self):
        super().__init__('minimal_service')
        self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.add_two_ints_callback)

    async def add_two_ints_callback(self, request, response):
        self.get_logger().info(f'Executor: {str(self.executor)} ', )
        self.get_logger().info('Incoming request... a: %d b: %d' % (request.a, request.b))
        response.sum = request.a + request.b

        future = rclpy.task.Future()
        def handler():
            self.get_logger().info('Timer fired')
            future.set_result(None)
        timer = self.create_timer(2, handler)
        await future
        timer.destroy()
        self.get_logger().info('Finished request!  a: %d b: %d' % (request.a, request.b))
        return response

def main(args=None):
    rclpy.init(args=args)
    minimal_service = MinimalService()
    rclpy.spin(minimal_service)
    rclpy.shutdown()

if __name__ == '__main__':
    main()
ros2 service call /add_two_ints example_interfaces/srv/AddTwoInts "{a: 1, b: 2}"

Expected behavior

The service call should receive a response after a delay. While the executor is waiting for the timer, it should be possible for more services to be handled.

Actual behavior

The timer never fires and new incoming service requests are not logged.

Barry-Xu-2018 commented 3 years ago

In callback, the timer and future are all delt with in _wait_for_ready_callbacks of class Executor like service.
https://github.com/ros2/rclpy/blob/6f4e42cb4b409cbd79273dd9dd12cc35198ff138/rclpy/rclpy/executors.py#L564-L599

While handling an event (e.g. service), it will do
e.g.
https://github.com/ros2/rclpy/blob/6f4e42cb4b409cbd79273dd9dd12cc35198ff138/rclpy/rclpy/executors.py#L653-L659

for SingleThreadedExecutor https://github.com/ros2/rclpy/blob/6f4e42cb4b409cbd79273dd9dd12cc35198ff138/rclpy/rclpy/executors.py#L704-L710

for MultiThreadedExecutor https://github.com/ros2/rclpy/blob/6f4e42cb4b409cbd79273dd9dd12cc35198ff138/rclpy/rclpy/executors.py#L735 https://github.com/ros2/rclpy/blob/6f4e42cb4b409cbd79273dd9dd12cc35198ff138/rclpy/rclpy/executors.py#L743-L754

So multi-thread way only affects callback execution.
The core logic is in _wait_for_ready_callbacks.
Once you block callback function, other events (e.g. timer) cannot be dealt with.
This leads that the callback of timer is never called.

I think you can directly use threading.Timer and asyncio.Future instead.
Please correct me if I'm wrong

jtbandes commented 3 years ago

Once you block callback function, other events (e.g. timer) cannot be dealt with.

IMO, await is not “blocking” per se; it’s more like yielding control back to the executor.

I think you can directly use threading.Timer and asyncio.Future instead.

If you have some ideas, I’d be interested to see an example. My impression was that asyncio futures are not compatible with rclpy futures: https://github.com/ros2/rclpy/issues/279#issuecomment-473002645

Barry-Xu-2018 commented 3 years ago

IMO, await is not “blocking” per se; it’s more like yielding control back to the executor.

Yes.
My understanding is

https://github.com/ros2/rclpy/blob/6f4e42cb4b409cbd79273dd9dd12cc35198ff138/rclpy/rclpy/executors.py#L704-L710

handler is class Task (refer to task.py). This class executes a coroutine to completion. At line 239, run handler() defined in _make_handler() of class Executor https://github.com/ros2/rclpy/blob/6f4e42cb4b409cbd79273dd9dd12cc35198ff138/rclpy/rclpy/task.py#L220-L240

handler(). https://github.com/ros2/rclpy/blob/6f4e42cb4b409cbd79273dd9dd12cc35198ff138/rclpy/rclpy/executors.py#L404-L426

await call_coroutine()
   --> await await_or_execute()
        --> await callback()  <== This callback is `async def add_two_ints_callback`

So blocking occurs.

If you have some ideas, I’d be interested to see an example. My impression was that asyncio futures are not compatible with rclpy futures: #279 (comment)

I am not familiar with asyncio. So I use condition instead of asyncio. I update add_two_ints_callback() as below, and it works in my environment.

    async def add_two_ints_callback(self, request, response):
        self.get_logger().info(f'Executor: {str(self.executor)} ', )
        self.get_logger().info('Incoming request... a: %d b: %d' % (request.a, request.b))
        response.sum = request.a + request.b

        condition = threading.Condition()

        def handler():            
            self.get_logger().info('Timer fired')
            condition.acquire()
            condition.notify()
            condition.release()

        timer = threading.Timer(2, handler)
        timer.start()

        condition.acquire()
        condition.wait()
        condition.release()

        self.get_logger().info('Finished request!  a: %d b: %d' % (request.a, request.b))
        return response
jtbandes commented 3 years ago

Thanks, but that seems to defeat the purpose of my example because I am hoping for the event loop to be able to do other work while my service handler is waiting for the timer.

@sloretz - you gave a bunch of input on #279; do you have any ideas that could help with this issue?

Barry-Xu-2018 commented 3 years ago

but that seems to defeat the purpose of my example because I am hoping for the event loop to be able to do other work while my service handler is waiting for the timer.

Okay. From my point of view, I don't think we should wait long in a callback function for another task to complete. If possible, add an event to event queue in callback and another thread continue to deal with events in event queue.

asyncio forces everything being done in a single thread. Given the desire to let users have rclpy call callbacks in multiple threads, I don't think it can ever be made to fit.

According to sloretz's said in #279, current implementation limits the use of asyncio in callback.

jtbandes commented 3 years ago

I don't think we should wait long in a callback function for another task to complete. If possible, add an event to event queue in callback…

That's what I'm doing with create_timer().

…and another thread continue to deal with events in event queue.

This is where I disagree – as soon as my callback suspends at the await, it should be possible for the same thread to continue handling events. This is how other languages' single threaded event loops work and what users will expect when writing code with async/await.

As an example, this JavaScript code behaves the way I expect, although it runs in a single thread. As soon as the first function suspends execution at await, the second function can begin executing. The await frees up the runtime to process other functions from the event loop.

async function handler(n) {
  console.log("before", n);
  await new Promise((resolve) => setTimeout(resolve, 1000));
  console.log("after", n);
}

handler(1);
handler(2);

// Prints:
// before 1
// before 2
// after 1
// after 2

asyncio forces everything being done in a single thread. Given the desire to let users have rclpy call callbacks in multiple threads, I don't think it can ever be made to fit.

According to sloretz's said in #279, current implementation limits the use of asyncio in callback.

I don't see this as a reason why my "expected behavior" above is impossible... First, my bug report doesn't mention asyncio at all (I don't care if I need to use the asyncio library or not, I just want this await+timer example to work within rclpy). Second, just because some users might want rclpy to call callbacks in multiple threads, doesn't mean this should always be the norm. I would expect that code written in a non-blocking style (using async/await instead of thread-blocking primitives like time.sleep() or condition.wait()) should work the same way with a SingleThreadedExecutor as it does with a MultiThreadedExecutor. (Also note that MultiThreadedExecutor(2) still didn't give me the expected behavior.)

iuhilnehc-ynos commented 3 years ago

@jtbandes

Can you try using a temporary MutuallyExclusiveCallbackGroup(or ReentrantCallbackGroup ) instead of the default_callback_group while creating a Timer?

        from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
        timer = self.create_timer(2, handler, MutuallyExclusiveCallbackGroup())

and then use MultiThreadedExecutor(2).

Barry-Xu-2018 commented 3 years ago

@jtbandes Thank you for your detailed explanation.
I get what you want.
iuhilnehc-ynos' way is correct solution.

jtbandes commented 3 years ago

Thanks for the suggestions. I am guessing this solution fixes the problem with 1 async service call with a timer (like my example), but does not extend to allow more than 1 or 2 service calls to be executing simultaneously, if using a MultiThreadedExecutor(2). One of my requirements for a proper async solution is that an unlimited number of async service calls can be started at once (within memory and file descriptor limits, of course), regardless of the number of threads I use in my choice of executor.

sloretz commented 3 years ago

@jtbandes Thanks for the easy to reproduce example! @iuhilnehc-ynos is mostly correct, but there's no need for the MultiThreadedExecutor.

The trouble comes from the default callback group being a MutuallyExclusiveCallbackGroup. Here are the changes I made to make the example work.

diff --git a/rclpy_834.py b/rclpy_834.py
index 537825d..b102ace 100755
--- a/rclpy_834.py
+++ b/rclpy_834.py
@@ -1,12 +1,14 @@
 from example_interfaces.srv import AddTwoInts
 import rclpy
+from rclpy.callback_groups import ReentrantCallbackGroup
 from rclpy.node import Node

 class MinimalService(Node):
     def __init__(self):
         super().__init__('minimal_service')
-        self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.add_two_ints_callback)
+        self.cb_group = ReentrantCallbackGroup()
+        self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.add_two_ints_callback, callback_group=self.cb_group)

     async def add_two_ints_callback(self, request, response):
         self.get_logger().info(f'Executor: {str(self.executor)} ', )
@@ -17,7 +19,7 @@ class MinimalService(Node):
         def handler():
             self.get_logger().info('Timer fired')
             future.set_result(None)
-        timer = self.create_timer(2, handler)
+        timer = self.create_timer(2, handler, callback_group=self.cb_group)
         await future
         timer.destroy()
         self.get_logger().info('Finished request!  a: %d b: %d' % (request.a, request.b))

MutuallyExclusiveCallbackGroups tell the executor two or more callbacks should not be executed in parallel because they could conflict with each other. Callback groups have an advantage over locks in that thread doesn't get tied up by a callback blocking on acquiring a lock. In this case the executor sees that the timer callback is ready, but the timer's callback group tells the executor to wait because the service callback in the same group is has not finished executing. Changing their groups to a ReentrantCallbackGroup (or putting them into separate callback groups) allows the executor to schedule the timer callback while the service callback is awaiting.

Inspired by this year's ROS World executor group, I'm now of the opinion callback groups aren't needed in rclpy because Python coroutines can wait for resources without blocking a thread. That said, in the short term I would support making the default callback group a ReentrantCallbackGroup in order to make using coroutines easier. Would you be willing to open a PR?

jtbandes commented 3 years ago

Thanks a lot! The explanation makes sense — I will give this a try and see if it can work for us in rosbridge_server. I can open a PR to change the default callback group, although it seems like that might be a bit of a risky change?

On another note, I think it would be great for more documentation to include async examples and explanations of how to use futures. When I was poking around to see that using async functions was possible, I didn't really find any clear guidance on whether async/await is a well-supported thing in ROS 2, or just experimental. I learned from a comment that mixing with asyncio doesn't really work, and I had to figure out some of my own stuff around adding executor.wake() calls in the right places when I was working with other threads managed by twisted/tornado.

jtbandes commented 3 years ago

Looks like using ReentrantCallbackGroup works for me in https://github.com/RobotWebTools/rosbridge_suite/pull/666. Thanks!

jtbandes commented 3 years ago

Submitted a PR to change the default callback group: https://github.com/ros2/rclpy/pull/847