ros2 / launch

Tools for launching multiple processes and for writing tests involving multiple processes.
Apache License 2.0

Feature Request: LaunchService wrapper for easy non-blocking launch and shutdown #724

Open KKSTB opened 1 year ago

KKSTB commented 1 year ago

Feature request

Feature description

Many people have been puzzled about how to use LaunchService to launch a LaunchDescription without blocking the main thread. See, for example:

https://github.com/ros2/launch/pull/210
https://answers.ros.org/question/321118/ros2-nodes-occasionally-dying-using-launchservice-in-a-subprocess/
https://github.com/ros2/launch/issues/126

So I propose either adding a start() function to LaunchService, or a new wrapper class, that spawns a daemon process to run the async launch loop, like this:

import asyncio
import multiprocessing

from launch import LaunchDescription, LaunchService

class Ros2LaunchParent:
    def start(self, launch_description: LaunchDescription):
        self._stop_event = multiprocessing.Event()
        self._process = multiprocessing.Process(target=self._run_process, args=(self._stop_event, launch_description), daemon=True)
        self._process.start()

    def shutdown(self):
        self._stop_event.set()
        self._process.join()

    def _run_process(self, stop_event, launch_description):
        loop = asyncio.get_event_loop()
        launch_service = LaunchService()
        launch_service.include_launch_description(launch_description)
        launch_task = loop.create_task(launch_service.run_async())
        loop.run_until_complete(loop.run_in_executor(None, stop_event.wait))
        if not launch_task.done():
            asyncio.ensure_future(launch_service.shutdown(), loop=loop)
            loop.run_until_complete(launch_task)

Implementation considerations

Besides launching a LaunchDescription, it would be useful to have another mode that launches an individual node and returns its PID so the caller can operate on the process, just like ROS1 does.

Rezenders commented 9 months ago

Hi @KKSTB, I am trying to replicate what you described in this issue, but for me this code still blocks. Can you help me out?

This is what I have so far:

    def start_node_process(self, launch_description: LaunchDescription):
        self._stop_event = multiprocessing.Event()
        self._process = multiprocessing.Process(
            target=self._run_process,
            args=(self._stop_event, launch_description),
            daemon=True
        )
        self._process.start()

    def _run_process(self, stop_event, launch_description):
        loop = asyncio.get_event_loop()
        launch_service = LaunchService()
        launch_service.include_launch_description(launch_description)
        launch_task = loop.create_task(launch_service.run_async())
        loop.run_until_complete(loop.run_in_executor(None, stop_event.wait))
        if not launch_task.done():
            asyncio.ensure_future(launch_service.shutdown(), loop=loop)
            loop.run_until_complete(launch_task)

    def start_ros_node(self, node_dict):
        node = launch_ros.actions.Node(**node_dict)
        self.start_node_process(LaunchDescription([node]))

However, when I call start_ros_node, it still blocks and never returns.

Thanks in advance!

KKSTB commented 9 months ago

Hi @Rezenders. I used it in a large project to launch many other Python launch files without problems. You could try the following to see what's wrong:

  1. As I do, use a launch description that includes a Python launch file that launches something, e.g. a talker node.
  2. As you do, use a launch description to launch a node directly, but launch a talker node.

KKSTB commented 9 months ago

I did run into a problem once my project grew to use several event loops, so a slight change to the code in the OP is needed:

From:

loop = asyncio.get_event_loop()

To:

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
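
For illustration, the same pattern with a process-owned event loop can be sketched using only the standard library. This is a hedged sketch, not the launch API: `fake_run_async` is a hypothetical stand-in for `LaunchService.run_async()`, setting `shutdown_requested` stands in for `LaunchService.shutdown()`, and the explicit "fork" context is an assumption made only so the sketch runs as a plain script on POSIX.

```python
import asyncio
import multiprocessing


async def fake_run_async(shutdown_requested: asyncio.Event) -> int:
    """Hypothetical stand-in for LaunchService.run_async(): runs until asked to stop."""
    await shutdown_requested.wait()
    return 0


def run_in_child(stop_event) -> None:
    # Create a loop owned by this process instead of relying on get_event_loop().
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        shutdown_requested = asyncio.Event()
        task = loop.create_task(fake_run_async(shutdown_requested))
        # Wait for the cross-process event in a worker thread so the loop stays free
        # to drive the task in the meantime.
        loop.run_until_complete(loop.run_in_executor(None, stop_event.wait))
        if not task.done():
            # Stand-in for scheduling LaunchService.shutdown().
            shutdown_requested.set()
            loop.run_until_complete(task)
    finally:
        loop.close()


def start_and_stop() -> int:
    # Assumption: POSIX "fork" start method, chosen to keep the sketch script-friendly.
    ctx = multiprocessing.get_context("fork")
    stop_event = ctx.Event()
    proc = ctx.Process(target=run_in_child, args=(stop_event,), daemon=True)
    proc.start()          # returns immediately; the caller is not blocked
    stop_event.set()      # request shutdown from the parent
    proc.join(timeout=10)
    return proc.exitcode
```

Calling `start_and_stop()` starts the child, requests shutdown, and returns the child's exit code once it has left its loop.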

Ryanf55 commented 6 months ago

I would have expected something like this to work, but it doesn't:

from launch import LaunchService, LaunchDescription
from launch.actions import IncludeLaunchDescription
from ament_index_python.packages import get_package_share_directory
from launch.launch_description_sources import AnyLaunchDescriptionSource
import pathlib
from multiprocessing import Process
import time

def get_launch_file(package_name, launch_file_name):
    path = pathlib.Path(get_package_share_directory(package_name), "launch", launch_file_name)
    assert path.exists()
    return path

def app_srv_via_launch_xml(app):
    """Provides a launch service for an application in the foreground.

    Assumes it has an XML launch file.

    Args:
        app (str): The application name, i.e. "demo_pkg".
    """
    launch_file = get_launch_file(app, f"{app}.launch.xml")

    ld = LaunchDescription(
        [
            IncludeLaunchDescription(AnyLaunchDescriptionSource([str(launch_file)])),
        ]
    )

    service = LaunchService()
    service.include_launch_description(ld)
    return service

def main():
    app_name = "demo_pkg"
    service = app_srv_via_launch_xml(app_name)
    demo_pkg_process = Process(target=service.run, daemon=True)
    demo_pkg_process.start()
    time.sleep(10)
    demo_pkg_process.terminate()
    print("Done")

if __name__ == "__main__":
    main()

The process exits cleanly, but the demo_pkg node is still running.

KKSTB commented 6 months ago

The process exits cleanly, but the demo_pkg node is still running.

This is because using SIGTERM can result in orphaned processes (https://github.com/ros2/launch/blob/d9ffd805e3d9ca42fe4dd0019ae095e9eb0d4d72/launch/launch/launch_service.py#L209C53-L209C85).

To shut down child processes, LaunchService.shutdown() should be called. Another method is to send SIGINT.

And since there is no way to send SIGINT to the LaunchService in your example, this run method probably cannot shut down child processes once started.
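
The difference between the two signals can be illustrated with a generic, standard-library-only sketch. It does not use launch, and whether `LaunchService.run()` in a multiprocessing child reacts to a signal delivered this way is not confirmed in this thread. `child_main` is a hypothetical stand-in for a blocking run method with a cleanup step, and the "fork" context is an assumption for POSIX systems.

```python
import multiprocessing
import os
import signal
import time


def child_main(ready, cleaned_up) -> None:
    # Make sure SIGINT raises KeyboardInterrupt even if the parent ignored it.
    signal.signal(signal.SIGINT, signal.default_int_handler)
    try:
        ready.set()
        while True:  # hypothetical stand-in for a blocking run() call
            time.sleep(0.1)
    except KeyboardInterrupt:
        # Cleanup runs here; a launcher would stop its own children at this point.
        cleaned_up.set()


def stop_child(use_sigint: bool) -> bool:
    """Stop the child and report whether its cleanup step ran."""
    ctx = multiprocessing.get_context("fork")  # assumption: POSIX
    ready, cleaned_up = ctx.Event(), ctx.Event()
    proc = ctx.Process(target=child_main, args=(ready, cleaned_up))
    proc.start()
    ready.wait(timeout=10)
    if use_sigint:
        os.kill(proc.pid, signal.SIGINT)  # the child can catch this and clean up
    else:
        proc.terminate()                  # SIGTERM: the child dies without cleanup here
    proc.join(timeout=10)
    return cleaned_up.is_set()
```

In this sketch the cleanup branch runs only for the SIGINT path, which mirrors why terminating the launcher process with SIGTERM can leave its children behind.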