Closed JinraeKim closed 2 years ago
The problem is that using asyncio, you need to await
your coroutines in an event loop. Which you don't have here. So your coroutines are never actually run, and Python complains about it ("was never awaited").
I would guess you have two ways, assuming that ros2 does not provide an asyncio interface:
ThreadPoolExecutor
. I had written about it here a while ago.The problem is that using asyncio, you need to
await
your coroutines in an event loop. Which you don't have here. So your coroutines are never actually run, and Python complains about it ("was never awaited").I would guess you have two ways, assuming that ros2 does not provide an asyncio interface:
- Run your asyncio code (mavsdk) into your non-asyncio program (ros2). For that I would look into asyncio.run_coroutine_threadsafe, which will allow you to run the coroutines on your event loop from another thread (the thread of your non-asyncio code).
- Run your non-asyncio code (ros2) into your asyncio code (mavsdk). Here you build your program around mavsdk, and whenever you need to call ros functions, you do that in a
ThreadPoolExecutor
. I had written about it here a while ago.
I'll try it!
@JonasVautherin I was trying to integrate ROS2 publisher example and MAVSDK telemetry example. But it shows only the result from ROS2. Could you give me further advice?
My trial was:
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from mavsdk import System
import asyncio
class MinimalPublisher(Node):
def __init__(self):
super().__init__('minimal_publisher')
self.publisher_ = self.create_publisher(String, 'topic', 10)
timer_period = 0.5 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
self.i = 0
self.drone = System()
async def run(self):
await self.drone.connect(system_address="serial:///dev/ttyACM0")
asyncio.ensure_future(self.print_battery())
async def print_battery(self):
async for battery in self.drone.telemetry.battery():
print(f"Battery: {battery.remaining_percent}")
def timer_callback(self):
msg = String()
msg.data = 'Hello World: %d' % self.i
self.publisher_.publish(msg)
self.get_logger().info('Publishing: "%s"' % msg.data)
self.i += 1
loop = asyncio.get_event_loop()
future = asyncio.run_coroutine_threadsafe(self.run(), loop)
def main(args=None):
rclpy.init(args=args)
minimal_publisher = MinimalPublisher()
rclpy.spin(minimal_publisher)
# Destroy the node explicitly
# (optional - otherwise it will be done automatically
# when the garbage collector destroys the node object)
minimal_publisher.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
So you try to run the run()
coroutine every 0.5s, do I understand your code correctly?
Did you check if run()
is ever called? Or were you just expecting to see some "Battery:" output?
So you try to run the
run()
coroutine every 0.5s, do I understand your code correctly?Did you check if
run()
is ever called? Or were you just expecting to see some "Battery:" output?
Sorry for the confusion. I have to clarify what I desire. The code seems so, but actually it would be better to run MAVSDK independent on the time period of ROS2 and publish some data through ROS2, where the data are obtained through MAVSDK and saved as a property of the ROS2 node class instance.
For the above goal, I first tried to display battery information like "Battery: " while running ROS2 node.
Do you think it's possible? If so, could you share Your idea?
Sorry I'm confused. Is run()
ever called, or not at all?
Sorry I'm confused. Is
run()
ever called, or not at all?
Sorry for confusion. I'll reply later as I attend a conference this week. Sorry.
@JonasVautherin Sorry for being late.
In the previous trial, it seems not to be called 'cause there is no message printed something like Battery: ...
.
If it's not what you expected, please let me know how to check whether self.run
has been called.
You can probably add something like print("entering run()")
inside the run()
routine, so that if it appears in the output, you know that is has been called.
No, it seems not.
code
async def run(self):
print("entering run()")
await self.drone.connect(system_address="serial:///dev/ttyACM0")
asyncio.ensure_future(self.print_battery())
async def print_battery(self):
async for battery in self.drone.telemetry.battery():
print(f"Battery: {battery.remaining_percent}")
result
➜ ~ python3 a.py
I guess the next step is to try to write a small python example that uses future = asyncio.run_coroutine_threadsafe(self.run(), loop)
and that works, and then to try to get it called from ROS :+1:
@JonasVautherin seems not to work (I mean, at least it didn't call run
)
from mavsdk import System
import asyncio
class MyClass:
def __init__(self):
self.drone = System()
async def run(self):
print("hi")
await self.drone.connect(system_address="serial:///dev/ttyACM0")
asyncio.ensure_future(self.print_battery())
def greet(self):
loop = asyncio.get_event_loop()
future = asyncio.run_coroutine_threadsafe(self.run(), loop)
def main():
myclass = MyClass()
myclass.greet()
if __name__ == "__main__":
main()
➜ ~ python3 a.py
➜ ~
Another failure case:
from mavsdk import System
import asyncio
async def run(drone):
print("hi")
await drone.connect(system_address="serial:///dev/ttyACM0")
asyncio.ensure_future(drone.print_battery())
def greet(drone):
loop = asyncio.get_event_loop()
future = asyncio.run_coroutine_threadsafe(run(drone), loop)
def main():
drone = System()
greet(drone)
if __name__ == "__main__":
main()
import asyncio
from threading import Semaphore, Thread
from time import sleep
loop_semaphore = Semaphore(0)
def run_asyncio_loop():
global loop
loop = asyncio.new_event_loop()
loop_semaphore.release()
loop.run_forever()
async def run_asyncio():
print("running asyncio")
def run_main():
global loop
print("running main")
print("waiting for asyncio loop creation")
loop_semaphore.acquire()
print("starting asyncio coroutine")
future = asyncio.run_coroutine_threadsafe(run_asyncio(), loop=loop)
print("waiting 2 seconds")
sleep(2)
print("end of run_main()")
if __name__ == '__main__':
Thread(target=run_asyncio_loop).start()
run_main()
import asyncio from threading import Semaphore, Thread from time import sleep loop_semaphore = Semaphore(0) def run_asyncio_loop(): global loop loop = asyncio.new_event_loop() loop_semaphore.release() loop.run_forever() async def run_asyncio(): print("running asyncio") def run_main(): global loop print("running main") print("waiting for asyncio loop creation") loop_semaphore.acquire() print("starting asyncio coroutine") future = asyncio.run_coroutine_threadsafe(run_asyncio(), loop=loop) print("waiting 2 seconds") sleep(2) print("end of run_main()") if __name__ == '__main__': Thread(target=run_asyncio_loop).start() run_main()
I need to take some time to digest this (especially the coroutines).
Thanks a lot, @JonasVautherin !!
Note: the result looks like
➜ ~ vim a.py
➜ ~ python3 a.py
running main
waiting for asyncio loop creation
starting asyncio coroutine
waiting 2 seconds
running asyncio
end of run_main()
@JonasVautherin Ok, as I understood from the documentation about thread-based parallelism in Python, are the followings correct?
Thread(target=run_asyncio_loop)
constructs a thread object called by .run()
.Semaphore(0)
creates a semaphore to indicate whether run_asyncio_loop
is running. loop_semaphore.release()
will incremente its counter, and loop_semaphore.acquire()
decrement the counter; the counter never be negative (that's why the printed texts are: "waiting for asyncio loop creation").Something confusing to me:
future
? Don't we need something like future.result()
?run_coroutine_threadsafe
receive run_asyncio
instead of run_asyncio()
?EDIT: If I change future = asyncio.run_coroutine_threadsafe(run_asyncio(), loop=loop)
to be future = asyncio.run_coroutine_threadsafe(run_asyncio, loop=loop)
, I saw the following error.
This is because the output of native coroutine (async def something
) is a coroutine.
➜ ~ python3 a.py
running main
waiting for asyncio loop creation
starting asyncio coroutine
Traceback (most recent call last):
File "a.py", line 33, in <module>
run_main()
File "a.py", line 25, in run_main
future = asyncio.run_coroutine_threadsafe(run_asyncio, loop=loop)
File "/usr/lib/python3.8/asyncio/tasks.py", line 917, in run_coroutine_threadsafe
raise TypeError('A coroutine object is required')
TypeError: A coroutine object is required
^CException ignored in: <module 'threading' from '/usr/lib/python3.8/threading.py'>
Traceback (most recent call last):
File "/usr/lib/python3.8/threading.py", line 1388, in _shutdown
lock.acquire()
KeyboardInterrupt:
For people who will try to integrate MAVSDK and ROS2:
I don't still figure out how to integrate them as a hierarchical structure, MAVSDK in ROS2 or ROS2 in MAVSDK. Instead, I found that read/write data as a JSON file works at least for simple problems. More precisely, I received drone data via MAVSDK and saved it to a JSON file. Then, I ran a ROS2 node, loaded the data, and published it.
EDIT: this approach may have potential issues. I'm trying to find an another way.
After studying asynchronous programming, I found a working example to integrate ROS2 publisher node and MAVSDK.
Thanks to my friend @sangmoon, who provided me a toy example of asyncio
.
One observed drawback (?) is that getting drone data via MAVSDK-Python seems too slow. I'm not sure this happens only for getting battery status. Anyway, here's my example to run ROS2 publisher node and MAVSDK asynchronously.
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import asyncio
from mavsdk import System
async def run(node):
drone = System()
await drone.connect(system_address="serial:///dev/ttyACM0")
asyncio.ensure_future(print_battery(drone, node))
async def print_battery(drone, node):
async for battery in drone.telemetry.battery():
battery_stat = battery.remaining_percent
node.battery_percent = battery_stat # update
print(f"battery: {node.battery_percent}, i: {node.i}")
class MinimalPublisher(Node):
def __init__(self):
super().__init__('minimal_publisher')
self.publisher_ = self.create_publisher(String, 'topic', 10)
timer_period = 0.5 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
self.i = 0
self.battery_percent = None
def timer_callback(self):
msg = String()
msg.data = 'Hello World: %d' % self.i
self.publisher_.publish(msg)
self.get_logger().info('Publishing: "%s"' % msg.data)
self.i += 1
async def spin_once(minimal_publisher):
rclpy.spin_once(minimal_publisher)
async def run_ros2(minimal_publisher):
while True:
await spin_once(minimal_publisher)
await asyncio.sleep(0)
async def main(args=None):
rclpy.init(args=args)
minimal_publisher = MinimalPublisher()
task_ros2 = asyncio.create_task(run_ros2(minimal_publisher))
task_mavsdk = asyncio.create_task(run(minimal_publisher))
await asyncio.gather(task_ros2, task_mavsdk)
minimal_publisher.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
asyncio.run(main())
battery: -0.009999999776482582, i: 13
battery: -0.009999999776482582, i: 17
battery: -0.009999999776482582, i: 21
As shown in https://github.com/mavlink/MAVSDK-Python/issues/430#issuecomment-1008593103, the above way should not be the best solution.
Finally, I think I found an appropriate way to integrate MAVSDK-Python and rclpy (ROS2). Thanks to @sangmoon :) (The terminology might be wrong but the concept will make sense).
The main issue is from rclpy.
As shown here, you have to put a keyword argument timeout_sec=0
to avoid thread blocking.
Note that receiving battery status seems inherently slow. See https://github.com/mavlink/MAVSDK-Python/issues/419#issuecomment-1006623815.
So, the modified code looks like
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import asyncio from mavsdk import System
async def run(node): drone = System() await drone.connect(system_address="serial:///dev/ttyACM0") asyncio.ensure_future(print_attitude(drone, node))
async def print_attitude(drone, node): async for attitude in drone.telemetry.attitude_euler(): print(f"attitude: {attitude}")
class MinimalPublisher(Node): def init(self): super().init('minimalpublisher') self.publisher = self.create_publisher(String, 'topic', 10) timer_period = 0.5 # seconds self.timer= self.create_timer(timer_period, self.timer_callback) self.i = 0 self.battery_percent = None self._event_loop = asyncio.get_running_loop()
async def timer_callback(self):
msg = String()
msg.data = 'Hello World: %d' % self.i
self.publisher_.publish(msg)
self.get_logger().info('Publishing: "%s"' % msg.data)
self.i += 1
async def spin_once(minimal_publisher): rclpy.spin_once(minimal_publisher, timeout_sec=0)
async def run_ros2(minimal_publisher): while True: await spin_once(minimal_publisher) await asyncio.sleep(0.0)
async def main(args=None): rclpy.init(args=args) minimal_publisher = MinimalPublisher() task_ros2 = asyncio.create_task(run_ros2(minimal_publisher)) task_mavsdk = asyncio.create_task(run(minimal_publisher)) await asyncio.gather(task_ros2, task_mavsdk) minimal_publisher.destroy_node() rclpy.shutdown()
if name == 'main': asyncio.run(main())
and the output is
```python3
[INFO] [1641823598.415029942] [minimal_publisher]: Publishing: "Hello World: 5"
attitude: EulerAngle: [roll_deg: -2.652918577194214, pitch_deg: 5.962473392486572, yaw_deg: 82.83059692382812, timestamp_us: 3044210470000]
attitude: EulerAngle: [roll_deg: -2.652919054031372, pitch_deg: 5.962473392486572, yaw_deg: 82.83060455322266, timestamp_us: 3044210470000]
attitude: EulerAngle: [roll_deg: -2.653308391571045, pitch_deg: 5.963382720947266, yaw_deg: 82.82994842529297, timestamp_us: 3044210490000]
attitude: EulerAngle: [roll_deg: -2.653308629989624, pitch_deg: 5.963382720947266, yaw_deg: 82.82994842529297, timestamp_us: 3044210490000]
attitude: EulerAngle: [roll_deg: -2.6521449089050293, pitch_deg: 5.964702606201172, yaw_deg: 82.83106994628906, timestamp_us: 3044210515000]
attitude: EulerAngle: [roll_deg: -2.6521458625793457, pitch_deg: 5.964702606201172, yaw_deg: 82.83106994628906, timestamp_us: 3044210515000]
attitude: EulerAngle: [roll_deg: -2.65191388130188, pitch_deg: 5.965205192565918, yaw_deg: 82.83211517333984, timestamp_us: 3044210530000]
attitude: EulerAngle: [roll_deg: -2.65191388130188, pitch_deg: 5.965205192565918, yaw_deg: 82.83211517333984, timestamp_us: 3044210530000]
attitude: EulerAngle: [roll_deg: -2.6516993045806885, pitch_deg: 5.965878486633301, yaw_deg: 82.83292388916016, timestamp_us: 3044210555000]
attitude: EulerAngle: [roll_deg: -2.651700019836426, pitch_deg: 5.965878486633301, yaw_deg: 82.83293914794922, timestamp_us: 3044210555000]
attitude: EulerAngle: [roll_deg: -2.651632070541382, pitch_deg: 5.965549945831299, yaw_deg: 82.83326721191406, timestamp_us: 3044210570000]
attitude: EulerAngle: [roll_deg: -2.651632070541382, pitch_deg: 5.965549945831299, yaw_deg: 82.83326721191406, timestamp_us: 3044210570000]
attitude: EulerAngle: [roll_deg: -2.651362657546997, pitch_deg: 5.965458869934082, yaw_deg: 82.83262634277344, timestamp_us: 3044210595000]
attitude: EulerAngle: [roll_deg: -2.651362419128418, pitch_deg: 5.965458869934082, yaw_deg: 82.83262634277344, timestamp_us: 3044210595000]
attitude: EulerAngle: [roll_deg: -2.6509175300598145, pitch_deg: 5.965142250061035, yaw_deg: 82.83241271972656, timestamp_us: 3044210610000]
attitude: EulerAngle: [roll_deg: -2.6509177684783936, pitch_deg: 5.965142250061035, yaw_deg: 82.83241271972656, timestamp_us: 3044210610000]
attitude: EulerAngle: [roll_deg: -2.65085506439209, pitch_deg: 5.964704990386963, yaw_deg: 82.83186340332031, timestamp_us: 3044210630000]
attitude: EulerAngle: [roll_deg: -2.6507296562194824, pitch_deg: 5.96476936340332, yaw_deg: 82.8316650390625, timestamp_us: 3044210635000]
I think this is the more correct way to create a bridge between spin
and asyncio
. In comparision to the last comment, this will not busy-wait the middleware and handles cancellation properly. The guard condition is used to interrupt rmw_wait
and hence return early from spin_once
(to handle the case where nothing else is happening)
import threading
import asyncio
import rclpy
from rclpy.node import Node
async def spin(node: Node):
cancel = node.create_guard_condition(lambda: None)
def _spin(node: Node,
future: asyncio.Future,
event_loop: asyncio.AbstractEventLoop):
while not future.cancelled():
rclpy.spin_once(node)
if not future.cancelled():
event_loop.call_soon_threadsafe(future.set_result, None)
event_loop = asyncio.get_event_loop()
spin_task = event_loop.create_future()
spin_thread = threading.Thread(target=_spin, args=(node, spin_task, event_loop))
spin_thread.start()
try:
await spin_task
except asyncio.CancelledError:
cancel.trigger()
spin_thread.join()
node.destroy_guard_condition(cancel)
This code can then be used, for example, like this:
async def main():
# create a node without any work to do
node = Node('unused')
# create tasks for spinning and sleeping
spin_task = asyncio.get_event_loop().create_task(spin(node))
sleep_task = asyncio.get_event_loop().create_task(asyncio.sleep(5.0))
# concurrently execute both tasks
await asyncio.wait([spin_task, sleep_task], return_when=asyncio.FIRST_COMPLETED)
# cancel tasks
if spin_task.cancel():
await spin_task
if sleep_task.cancel():
await sleep_task
# run program
rclpy.init()
asyncio.get_event_loop().run_until_complete(main())
asyncio.get_event_loop().close()
rclpy.shutdown()
I am using Python 3.6, but I think the same principles apply to later versions of Python (some APIs are just a little bit less verbose)
Hello @allsey87 @JinraeKim, this is a very good example to explain, how to integrate MAVSDK and ROS2. As a newbie in MAVSDK i still have som questions. I use here ROS2 Humble and MAVSDK Python, both are installed in Raspberry Pi 4 and laptop. Laptop will receive the GPS Status of RPi4 and send RTCM data stream to RPi4. Here are two Ublox zed f9p used with ros-driver https://github.com/KumarRobotics/ublox/tree/ros2 (Holybro H-RTK F9P Helical, rover in RPi4 and base in laptop).
Hello @allsey87 @JinraeKim, this is a very good example to explain, how to integrate MAVSDK and ROS2. As a newbie in MAVSDK i still have som questions. I use here ROS2 Humble and MAVSDK Python, both are installed in Raspberry Pi 4 and laptop. Laptop will receive the GPS Status of RPi4 and send RTCM data stream to RPi4. Here are two Ublox zed f9p used with ros-driver https://github.com/KumarRobotics/ublox/tree/ros2 (Holybro H-RTK F9P Helical, rover in RPi4 and base in laptop).
- Instead of just simple publisher and subscriber, there are one rover node on RPi4 and one base node on laptop. Are there any ideas, how could i integrate ROS2 with MAVSDK here?
- Do i need to build a uXRCE-DDS bridge like the communication between drone FCU and CC or is it directly communicatable between CC and Laptop? Thanks a lot in advanced.
@Wenz922 Thanks for asking. Unfortunately, I kinda gave up mixing MAVSDK and ROS2 at the time. Also, I'm not studying this topic deeply now so I cannot give you any valuable feedback. Maybe others replied to this thread would have some insights, and you can ask them. Sorry for not being helpful for your problem.
I've read a related issue #247, but I cannot find out how to "run MAVSDK" in a ROS(2) node.
For example, I tried a very poor code as
which shows an error as