ros2 / rclpy

rclpy (ROS Client Library for Python)
Apache License 2.0
308 stars 227 forks source link

Runtime error when a long async callback is triggered #962

Open m2-farzan opened 2 years ago

m2-farzan commented 2 years ago

Bug report

Required Info:

Steps to reproduce issue

import asyncio
import logging
import rclpy
from std_msgs.msg import String

logging.getLogger('asyncio').setLevel(logging.DEBUG)

async def cb(msg):
    print('Received a message:')
    await asyncio.sleep(0.1)
    print(msg.data)

async def main():
    rclpy.init()
    node = rclpy.create_node('my_node')
    node.create_subscription(String, '/test', cb, 10)
    while True:
        rclpy.spin_once(node, timeout_sec=0.001)
        await asyncio.sleep(0.001)

asyncio.run(main())

Run the subscriber and then run:

ros2 topic pub -1 /test std_msgs/msg/String "data: hello"

Expected behavior

When published to /test, the node should print Received a message:, wait for 0.1 seconds, and then print the message data.

From https://github.com/ros2/rclpy/pull/166#issue-285849213:

Any callback can now be a coroutine function (async def). If the callback is a coroutine and it becomes blocked the executor will move on and resume executing it later. The details of this are handled in the class Task which is a class very similar to asyncio.Task. asyncio can't be used directly because it is not thread safe.

Actual behavior

Executing <Task pending name='Task-1' coro=<main() running at my_listener.py:19> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f2ed81ddb80>()] created at /usr/lib/python3.8/asyncio/base_events.py:422> cb=[_run_until_complete_cb() at /usr/lib/python3.8/asyncio/base_events.py:184] created at /usr/lib/python3.8/asyncio/base_events.py:595> took 0.199 seconds
Received a message:
Traceback (most recent call last):
  File "/usr/lib/python3.8/runpy.py", line 194, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/home/mostafa/.vscode/extensions/ms-python.python-2022.8.0/pythonFiles/lib/python/debugpy/__main__.py", line 45, in <module>
    cli.main()
  File "/home/mostafa/.vscode/extensions/ms-python.python-2022.8.0/pythonFiles/lib/python/debugpy/../debugpy/server/cli.py", line 444, in main
    run()
  File "/home/mostafa/.vscode/extensions/ms-python.python-2022.8.0/pythonFiles/lib/python/debugpy/../debugpy/server/cli.py", line 285, in run_file
    runpy.run_path(target_as_str, run_name=compat.force_str("__main__"))
  File "/usr/lib/python3.8/runpy.py", line 265, in run_path
    return _run_module_code(code, init_globals, run_name,
  File "/usr/lib/python3.8/runpy.py", line 97, in _run_module_code
    _run_code(code, mod_globals, init_globals,
  File "/usr/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "my_listener.py", line 21, in <module>
    asyncio.run(main())
  File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "my_listener.py", line 18, in main
    rclpy.spin_once(node, timeout_sec=0.001)
  File "/home/mostafa/dev/github/rclpy/issues/long-await/install/rclpy/lib/python3.8/site-packages/rclpy/__init__.py", line 176, in spin_once
    executor.spin_once(timeout_sec=timeout_sec)
  File "/home/mostafa/dev/github/rclpy/issues/long-await/install/rclpy/lib/python3.8/site-packages/rclpy/executors.py", line 712, in spin_once
    raise handler.exception()
  File "/home/mostafa/dev/github/rclpy/issues/long-await/install/rclpy/lib/python3.8/site-packages/rclpy/task.py", line 242, in __call__
    self._handler.send(None)
  File "/home/mostafa/dev/github/rclpy/issues/long-await/install/rclpy/lib/python3.8/site-packages/rclpy/executors.py", line 418, in handler
    await call_coroutine(entity, arg)
  File "/home/mostafa/dev/github/rclpy/issues/long-await/install/rclpy/lib/python3.8/site-packages/rclpy/executors.py", line 343, in _execute_subscription
    await await_or_execute(sub.callback, msg)
  File "/home/mostafa/dev/github/rclpy/issues/long-await/install/rclpy/lib/python3.8/site-packages/rclpy/executors.py", line 104, in await_or_execute
    return await callback(*args)
  File "my_listener.py", line 10, in cb
    await asyncio.sleep(0.1)
  File "/usr/lib/python3.8/asyncio/tasks.py", line 659, in sleep
    return await future
RuntimeError: await wasn't used with future

Additional information

Observation

Reducing the timeout in the asyncio.sleep line makes the issue disappear. In the example on provided, on my system, a timeout of ~0.002 seconds or smaller works fine.

Guess

This piece might play a role:

https://github.com/ros2/rclpy/blob/c76a6a1d98869f37155798cdb4777d9e44965743/rclpy/rclpy/task.py#L234-L256

My theory is that self._handler.send(None) is non-blocking, causing the line self._executing = False to be executed before the async callback is finished (while self.done is still False). So this causes the function to be assumed as paused (kind of like a yielded generator), so the already-running coro object gets called again by this piece:

https://github.com/ros2/rclpy/blob/c76a6a1d98869f37155798cdb4777d9e44965743/rclpy/rclpy/executors.py#L470-L482

This causes some kind of clash, hence RuntimeError: await wasn't used with future.

So as a quick test, I commented out the self._executing = False line. This caused the second call to be avoided, (so no runtime error anymore), but the callback didn't finish (the message wasn't printed out).

Another minimal example

rclpy practically does something like this:

import asyncio

async def cb():
    print('a')
    await asyncio.sleep(1)
    print('b')

async def main():
    _handler = cb()
    while True:
        _handler.send(None)

asyncio.run(main())

Which also throws RuntimeError: await wasn't used with future.

sloretz commented 2 years ago

Thanks for the bug report! I'm learning a lot from it.

RuntimeError: await wasn't used with future

This line is an exception raised by asyncio. I think the core problem is asyncio and rclpy expect different behaviors from __await__, and confusingly that doesn't seem to be a bug.

In short rclpy calls __await__ multiple times because that's how it expects awaitables to work. When asyncio.sleep is used in a subscription callback it makes rclpy's async stuff try to operate on asyncios async stuff the same way. That cause an exception because asyncio expects awaitables to only be __await__ed once before they're done. PEP-492 leaves details like this up to individual async libraries, which seems to mean coroutines meant for one async library (like asyncio) aren't expected to work with another (like rclpy).


It seems Python intends async/await to be implemented differently by different async libraries. asyncio is the built-in library, but there are other ones: twisted, curio, trio, tornado, and of course rclpy's Task and Future.

The most helpful links I found were this open cPython pull request, and the stack overflow question that motivated it. PEP-492 says what an awaitable is, but doesn't specify how it behaves. rclpy and asyncio both use An object with an __await__ method returning an iterator as an awaitable, but there are no rules on what that iterator does or how it's supposed to be used.

rclpy's Future.__await__ is a generator, which is okay because a generator is an iterator. It yields None if the future is not done, else it returns the result. Task gets the result via StopIteration.value raised from using return in a generator. It seems every time rclpy tries to execute a task it will call send(None) again and create a new generator. This seems inefficient to me. We could probably improve performance by making Task store the result of the first send(None) call and then iterate on it.

https://github.com/ros2/rclpy/blob/c76a6a1d98869f37155798cdb4777d9e44965743/rclpy/rclpy/task.py#L54-L58

Looking at asyncio, it's version of __await__ expects send(None) to be called exactly once before the future is done, and exactly once after the future is done. In cPython 3.10 it accomplishes this by making the future set an attribute _asyncio_future_blocking on the Future to True if the future isn't done yet. asyncio.Task checks for that attribute, and if it's present it prevents itself from being scheduled again until the future is done.

I don't understand trio and curio that well, but they take a different approach. trio uses a different definition of an awaitable also in PEP 492: a generator based coroutine. curio, like rclpy, seems to call send(None) multiple times until a coroutine is done, and like trio it uses a generator based coroutine as it's awaitable. tornado replaced its async stuff with asyncio, but that's not really an option for rclpy because we allow executors to use multiple threads.


Maybe we could borrow ideas from other libraries? trio provides its own trio.sleep, and it errors if it detects another library is being used with it. twisted has asFuture and fromFuture methods for converting between it's stuff and asyncio. curio has a decorator @asyncioable that it uses to make some functions on it's primitives usable in asyncio.

Another idea not in any of the other libraries would be to make Task detect if it's executing an asyncio awaitable. If the return value from co.send(None) has an attribute _asyncio_future_blocking then it could store the future and check if it's done rather than calling send() again. That might cause problems because rclpy assumes coroutines become resumable coincident with something waking the wait set, and the asyncio event loop isn't going to do that. Maybe adding a done callback to the asyncio future that woke the executor would do it?

m2-farzan commented 2 years ago

curio has a decorator @asyncioable that it uses to make some functions on it's primitives usable in asyncio.

Interestingly, I've been using a decorator that takes an asyncio function and makes it compatible with rclpy. It goes like this:

def coro_guard(function):
    if not asyncio.iscoroutinefunction(function):
        raise Exception('Expected an async function')
    async def f(*args, **kargs):
        coro = function(*args, **kargs)
        try:
            while True:
                future = coro.send(None)
                while not future.done():
                    await asyncio.sleep(0)  # yields None
                future.result()  # raises exceptions if any
        except StopIteration as e:
            return e.value
    return f

This mimics rclpy awaitable behavior:

rclpy's Future.__await__ [...] yields None if the future is not done, else it returns the result.

I was using this as a temporary workaround until an upstream fix comes out, but maybe it could be the upstream fix?

That decorator assumes we're only dealing with asyncio awaitables. To handle both rclpy and asyncio awaiables in one, we could improve the decorator to check if future is None:

# Allows calling __await__ repeatedly on awaitables that require waiting for an future before doing so (e.g. asyncio).
# This will make asyncio functions compatible with rclpy implementation of async.
# See https://github.com/ros2/rclpy/issues/962 for more info
def coro_guard(function):
    if not asyncio.iscoroutinefunction(function):
        raise Exception('Expected an async function')
    async def f(*args, **kargs):
        coro = function(*args, **kargs)
        try:
            while True:
                future = coro.send(None)
                assert asyncio.isfuture(future) or future is None, "Unexpected awaitable behavior. Only use rclpy or asyncio awaitables."
                if future is None:  # coro is rclpy-style awaitable; await is expected to be called repeatedly
                    await asyncio.sleep(0)
                    continue
                while not future.done():  # coro is asyncio-style awaitable; stop calling await until future is done.
                    await asyncio.sleep(0)  # yields None
                future.result()
        except StopIteration as e:
            return e.value
    return f

Now rclpy.Task can automatically apply this transformation to all the coroutine functions and it turns them into the rclpy-style.

What are your thoughts? Should I update the PR to use this concept?

sloretz commented 2 years ago

The decorator is a cool idea. One problem could be if if asyncio's send() expectations could vary by cPython version. They have freedom to change it since it's an implementation detail.

There's another idea worth looking at. rclpy's problem with asyncio is pretty similar to asyncio's problem with concurrent.futures.Future. They solve it by providing asyncio.wrap_future which wraps a concurrent.futures.Future object with an asyncio one. It works by adding done calbacks to make one future complete the other one. It seems like this can be implemented with only public APIs.

Say we had an api rclpy.wrap_asyncio_future() which accepted an asyncio future and returned an rclpy one. It could use asyncio.isfuture to make sure the argument is an asyncio future. Then it could create an rclpy.Future to be used as the return value. The method would then get the asyncio event loop with future.get_loop(), and then use loop.call_soon_thread_safe to call asyncio.future.add_done_callback(). The done callback would complete the rclpy.Future when the asyncio one was completed.

We could integrate it into Task by checking the return value of the first send(). If the return value is an asyncio future, then the task could wrap it with an rclpy one and await it instead.

It's still important to note somewhere that one must use a single threaded executor, and spin_once has to be called repeatedly in an asyncio event loop in a non-blocking way. We could probably make it more convenient with an awaitable spin function, but that's probably a separate discussion.

charlielito commented 1 year ago

This works out of the box:

import asyncio
import logging
import rclpy
from std_msgs.msg import String

logging.getLogger("asyncio").setLevel(logging.DEBUG)

async def async_cb(msg):
    print("Received a message:")
    await asyncio.sleep(0.1)
    print(msg.data)

def cb(msg):
    asyncio.ensure_future(async_cb(msg))

async def main():
    rclpy.init()
    node = rclpy.create_node("my_node")
    node.create_subscription(String, "/test", cb, 10)
    while True:
        rclpy.spin_once(node, timeout_sec=0.001)
        await asyncio.sleep(0.001)

asyncio.run(main())
samiamlabs commented 1 year ago

Maybe this is the wrong place to ask, but are there any plans to add equivalents to some of the asyncio primitives to rclpy if there is only going to be limited support for asyncio?

I made an attempt at an asyncio.sleep replacement for my own use, but I suspect there could be issues using it in production code.

import time

import rclpy
import rclpy.task
import rclpy.executors

from rclpy.node import Node
from rclpy.callback_groups import ReentrantCallbackGroup

async def async_sleep(node: Node, 
                      seconds: float, 
                      callback_group=ReentrantCallbackGroup()):
    future = rclpy.task.Future()
    def handler():
        future.set_result(None)
    timer = node.create_timer(seconds, handler, callback_group=callback_group)
    await future

    timer.cancel()
    timer.destroy()

    return None

class AsyncSleepNode(Node):
    def __init__(self):
        super().__init__('timer_node')
        self.cb_group = ReentrantCallbackGroup()

        self.timer = self.create_timer(0.0, self.timer_callback, callback_group=self.cb_group)  # Create a timer with 1-second interval

    async def timer_callback(self):
        self.timer.cancel()
        self.timer.destroy()

        print(f"started at {time.strftime('%X')}")

        await async_sleep(self, 1.0)
        print('hello')

        await async_sleep(self, 1.0)
        print('world')

        print(f"finished at {time.strftime('%X')}")

def main(args=None):
    rclpy.init(args=args)
    node = AsyncSleepNode()

    executor = rclpy.executors.SingleThreadedExecutor()
    executor.add_node(node)
    executor.spin()

    rclpy.shutdown()

if __name__ == '__main__':
    main()
Achllle commented 1 year ago

@samiamlabs Using a ReentrantCallbackGroup with a SingleThreadedExecutor has no effect. Using a MultiThreadedExecutor doesn't play well with asyncio. Additionally, if you recreate the callback group each time (which happens because you don't supply it explicitly to the async_sleep function), it will not behave as intended.

Running your code, the timer only fires once (even if you change the timer to 1 second, instead of 0) and fix the above suggestions.

samiamlabs commented 1 year ago

Thank you for the feedback @Achllle :)

Using ReentrantCallbackGroup with SingleThreadedExecutor for coroutines is based on this comment by @sloretz https://github.com/ros2/rclpy/issues/834#issuecomment-961331870 It is possible that I misunderstood something though.

The timer callback was only supposed to fire once in the example. I just wanted to trigger an async callback to run by the "rclpy event loop". I understand that naming it timer_callback is confusing and that you would expect it to fire multiple times though.

The comment about recreating the callback group every time is a good one! I wanted to limit the number of required input arguments to async_sleep but maybe creating temporary callback groups is too problematic. My idea was to supply a common callback group as an optional argument when doing something more intensive, like publishing at a rate.

Achllle commented 1 year ago

I was not aware that using a ReentrantCallbackGroup would work! Thanks for pointing that out. I also didn't notice the self.timer.cancel() part.

I don't think recreating the callback group here is an issue given you're trying to have it execute in parallel, but I believe if you recreate the group each time with a MutuallyExclusiveCallbackGroup as opposed to storing it as a class instance variable, you might get unexpected behaviors.

Though the PR to make ReentrantCallbackGroup didn't make it through, it would be good to have an asyncio + rclpy guide, as it seems like people are rolling their own quite frequently and there's no single agreed upon approach AFAIK. I took the liberty of modifying your example to show parallel execution:

class AsyncSleepNode(Node):
    def __init__(self):
        super().__init__('timer_node')
        self._default_callback_group = ReentrantCallbackGroup()

        self.timer1 = self.create_timer(2.0, self.timer_callback)
        self.timer2 = self.create_timer(2.0, self.timer_callback)

    async def async_sleep(self, seconds: float):
        future = rclpy.task.Future()
        def handler():
            future.set_result(None)
        timer = self.create_timer(seconds, handler)
        await future

        timer.cancel()
        timer.destroy()

        return None

    async def timer_callback(self):
        print(f"started at {time.strftime('%X')}")
        print('hello')
        await self.async_sleep(0.5)
        print('world')
        print(f"finished at {time.strftime('%X')}")

output:

started at 17:55:16
hello
started at 17:55:16
hello
world
finished at 17:55:16
world
finished at 17:55:16

@sloretz Curious if you agree with this approach or would suggest an alternative. I'd be happy to create a docs guide around this.

charlielito commented 1 year ago

Folks, how would you implement asyncio timeouts with a rclpy Future to be able to detect a timeouts if something never returns (like a service or action async call)

samiamlabs commented 1 year ago

Did you mean something like this @charlielito?

import rclpy
import rclpy.task
import rclpy.executors

from rclpy.node import Node
from rclpy.callback_groups import ReentrantCallbackGroup

import example_interfaces.srv

async def async_sleep(node: Node, seconds: float):
    future = rclpy.task.Future()
    def handler():
        future.set_result(None)
    timer = node.create_timer(seconds, handler)
    await future

    timer.cancel()
    timer.destroy()

    return None

async def future_with_timeout(node: Node, future: rclpy.task.Future, timeout_sec: float):
    first_done_future = rclpy.task.Future()

    def handler(arg=None):
        first_done_future.set_result(None) 
    timer = node.create_timer(timeout_sec, handler)
    future.add_done_callback(handler)

    await first_done_future

    if not future.done():
        raise TimeoutError(f"Future timed out after {timeout_sec} seconds")

    timer.cancel()
    timer.destroy()

    return future.result()

class AsyncFutureWithTimeoutNode(Node):
    def __init__(self):
        super().__init__('timer_node')
        self._default_callback_group = ReentrantCallbackGroup()

        self.add_two_inst_service = self.create_service(
            example_interfaces.srv.AddTwoInts,
            'test_service',
            self.test_service_callback
        )

        self.test_client = self.create_client(
            example_interfaces.srv.AddTwoInts,
            'test_service')

        self.timer = self.create_timer(0.0, self.timer_callback)  # Create a timer with 1-second interval

    async def test_service_callback(self, request, response):
        response.sum = request.a + request.b
        await async_sleep(self, 1.0) 
        return response

    async def timer_callback(self):
        self.timer.cancel()

        request = example_interfaces.srv.AddTwoInts.Request()
        request.a = 38
        request.b = 4

        try:
            future = self.test_client.call_async(request)
            response = await future_with_timeout(self, future, 0.5)
            print(f"AddTwoInts service call succeeded, the result is {response.sum}!")
        except TimeoutError as e:
            print(e)

        try:
            future = self.test_client.call_async(request)
            response = await future_with_timeout(self, future, 1.5)
            print(f"AddTwoInts service call succeeded, the result is {response.sum}!")
        except TimeoutError as e:
            print(e)

def main(args=None):
    rclpy.init(args=args)
    node = AsyncFutureWithTimeoutNode()

    executor = rclpy.executors.SingleThreadedExecutor()
    executor.add_node(node)
    executor.spin()

    rclpy.shutdown()

if __name__ == '__main__':
    main()

The output should be:

Future timed out after 0.5 seconds
AddTwoInts service call succeeded, the result is 42!
charlielito commented 1 year ago

That works! Thanks!

samiamlabs commented 7 months ago

I have been using an extended version of https://github.com/samiamlabs/ros2_vscode/blob/main/src/vscode_py/vscode_py/async_primitives.py in some of my work projects.

What I have so far is:

Are there any plans to add primitives like these to rclpy?

Do you know anything about this @sloretz?

tonygoldman commented 2 weeks ago

I have been using an extended version of https://github.com/samiamlabs/ros2_vscode/blob/main/src/vscode_py/vscode_py/async_primitives.py in some of my work projects.

What I have so far is:

  • future_with_timeout
  • wait_for_action_server
  • gather_with_timeout
  • async_sleep

Are there any plans to add primitives like these to rclpy?

Do you know anything about this @sloretz?

Cool work! We implemented similar utils for out project. When using async_sleep for short periods in a loop, we learned that creating and destroying timers rapidly causes a very high cpu load. FYI, future_with_timeout would cause problems if someone were to catch the TimeoutError, since you raise the exception before cancelling the timer.

I think more work should go into expanding async capabilities.

samiamlabs commented 2 weeks ago

FYI, future_with_timeout would cause problems if someone were to catch the TimeoutError, since you raise the exception before cancelling the timer.

Haha, I can't believe I missed that. Thanks for pointing it out. It should be easy to fix.

When using async_sleep for short periods in a loop, we learned that creating and destroying timers rapidly causes a very high cpu load.

I suspected it would not be very performant. I have only been using it to sleep for relatively long periods myself.

I think more work should go into expanding async capabilities.

That would be nice! Good to hear that more people than me see the utility of this. I brought it up in a meeting about executors at the last ROSCon in Odense. They suggested I post about it in some specific place that I promptly forgot about...

By the way. I'm concerned that something about this solution does not quite work. A colleague of mine found my code, stress-tested it, and concluded it did not work (without my knowledge). He didn't know about self._default_callback_group though, and possibly didn't set all callback groups properly.

It hasn't been locking up or anything like that in your testing?

tonygoldman commented 2 weeks ago

It hasn't been locking up or anything like that in your testing?

Callback groups in rclpy are really interesting. The documentation focuses on the use case for a multi threaded executor, but they're actually as important for running tasks in a single threaded executor.

By default, all callbacks are configured with the same mutually exclusive callback group. Thus, if any callback does not finish executing immediatly (by blocking in a multi threaded executor or awaiting in a single threaded), no other callback will get called. To solve that, you have three options:

The second option is useful for services, but I mostly use tasks these days.

Under the hood, the rclpy executor is implemented using async, and every callback is handled as a task. By calling spin(), we create a wait set and wait for ready callbacks, when entities in the wait set are ready, we handle them by creating tasks for them:

https://github.com/ros2/rclpy/blob/53d776009030bda44bdd3c36ef39ffabe1a0e14c/rclpy/rclpy/executors.py#L760 https://github.com/ros2/rclpy/blob/53d776009030bda44bdd3c36ef39ffabe1a0e14c/rclpy/rclpy/executors.py#L553-L557

Each task is yielded in a loop. When yielding a task for the first time, the callback group is entered https://github.com/ros2/rclpy/blob/53d776009030bda44bdd3c36ef39ffabe1a0e14c/rclpy/rclpy/executors.py#L525 The callback group is exited only when the callback returns https://github.com/ros2/rclpy/blob/53d776009030bda44bdd3c36ef39ffabe1a0e14c/rclpy/rclpy/executors.py#L539-L543

Awaiting in a callback requires a correct configuration of callback groups. In my opinion tasks are better suited for async.