mavlink / MAVSDK-Python

MAVSDK client for Python.
https://mavsdk.mavlink.io
BSD 3-Clause "New" or "Revised" License
326 stars 221 forks source link

RuntimeError: Task pending got Future attached to a different loop during Offboard control with FastAPI and MAVSDK #720

Open alireza787b opened 4 months ago

alireza787b commented 4 months ago

RuntimeError: Task pending got Future attached to a different loop during Offboard control with FastAPI and MAVSDK

Description

I am working on a Python application that integrates FastAPI for web-based control and MAVSDK for controlling a PX4 drone. The application handles video processing with OpenCV and allows for user input to control drone tracking and segmentation. However, I encountered a RuntimeError related to asynchronous programming and event loop management when trying to send velocity commands to the drone in offboard mode. But when starting the mavsdk offboard commands from the terminal (instead of the web app) wi...

Repo URL: https://github.com/alireza787b/PixEagle

Implementation

main.py:

import asyncio
import logging
import signal
import cv2
import threading
from uvicorn import Config, Server
from classes.app_controller import AppController
from classes.fastapi_handler import FastAPIHandler
from classes.parameters import Parameters

def start_fastapi_server(controller):
    logging.debug("Initializing FastAPI server...")
    fastapi_handler = FastAPIHandler(controller.video_handler, controller.telemetry_handler, controller)
    app = fastapi_handler.app

    config = Config(app=app, host=Parameters.HTTP_STREAM_HOST, port=Parameters.HTTP_STREAM_PORT, log_level="info")
    server = Server(config)

    server_thread = threading.Thread(target=server.run, daemon=True)
    server_thread.start()
    fastapi_handler.server = server
    logging.debug("FastAPI server started.")

    return server, server_thread

async def main():
    logging.basicConfig(level=logging.DEBUG)
    logging.debug("Starting main application...")

    controller = AppController()
    server, server_thread = start_fastapi_server(controller)

    def shutdown_handler(signum, frame):
        logging.info("Shutting down...")
        asyncio.create_task(controller.shutdown())
        server.should_exit = True
        controller.shutdown_flag = True

    signal.signal(signal.SIGINT, shutdown_handler)
    signal.signal(signal.SIGTERM, shutdown_handler)

    controller.shutdown_flag = False

    while not controller.shutdown_flag:
        frame = controller.video_handler.get_frame()
        if frame is None:
            break

        frame = await controller.update_loop(frame)
        controller.show_current_frame()

        key = cv2.waitKey(controller.video_handler.delay_frame) & 0xFF
        if key == ord('q'):
            logging.info("Quitting...")
            controller.shutdown_flag = True
        else:
            await controller.handle_key_input_async(key, frame)

    await controller.shutdown()
    server.should_exit = True
    server_thread.join()
    cv2.destroyAllWindows()
    logging.debug("Application shutdown complete.")

if __name__ == "__main__":
    asyncio.run(main())

px4_controller.py:

import asyncio
import math
import logging
from mavsdk import System
from classes.parameters import Parameters
from mavsdk.offboard import OffboardError, VelocityNedYaw, VelocityBodyYawspeed

# Configure logging
logger = logging.getLogger(__name__)

class PX4Controller:
    def __init__(self):
        if Parameters.EXTERNAL_MAVSDK_SERVER:
            self.drone = System(mavsdk_server_address='localhost', port=50051)
        else:
            self.drone = System()
        self.current_yaw = 0.0
        self.current_pitch = 0.0
        self.current_roll = 0.0
        self.current_altitude = 0.0
        self.camera_yaw_offset = Parameters.CAMERA_YAW_OFFSET
        self.update_task = None
        self.last_command = (0, 0, 0)
        self.active_mode = False

    async def connect(self):
        await self.drone.connect(system_address=Parameters.SYSTEM_ADDRESS)
        self.active_mode = True
        logger.info("Connected to the drone.")
        self.update_task = asyncio.create_task(self.update_drone_data())

    async def update_drone_data(self):
        while self.active_mode:
            try:
                async for position in self.drone.telemetry.position():
                    self.current_altitude = position.relative_altitude_m
                async for attitude in self.drone.telemetry.attitude_euler():
                    self.current_yaw = attitude.yaw + self.camera_yaw_offset
                    self.current_pitch = attitude.pitch
                    self.current_roll = attitude.roll
            except asyncio.CancelledError:
                logger.warning("Telemetry update task was cancelled.")
                break
            except Exception as e:
                logger.error(f"Error updating telemetry: {e}")
                await asyncio.sleep(1)

    def get_orientation(self):
        return self.current_yaw, self.current_pitch, self.current_roll

    async def send_body_velocity_commands(self, setpoint):
        vx, vy, vz = setpoint
        yaw_rate = 0
        try:
            logger.debug(f"Setting VELOCITY_BODY setpoint: Vx={vx}, Vy={vy}, Vz={vz}, Yaw rate={yaw_rate}")
            next_setpoint = VelocityBodyYawspeed(vx, vy, vz, yaw_rate)
            await self.drone.offboard.set_velocity_body(next_setpoint)
        except OffboardError as e:
            logger.error(f"Failed to send offboard velocity command: {e}")

    async def start_offboard_mode(self):
        result = {"steps": [], "errors": []}
        try:
            await self.drone.offboard.start()
            result["steps"].append("Offboard mode started.")
            logger.info("Offboard mode started.")
        except Exception as e:
            result["errors"].append(f"Failed to start offboard mode: {e}")
            logger.error(f"Failed to start offboard mode: {e}")
        return result

    async def stop_offboard_mode(self):
        logger.info("Stopping offboard mode...")
        await self.drone.offboard.stop()

    async def stop(self):
        if self.update_task:
            self.update_task.cancel()
            await self.update_task
        await self.stop_offboard_mode()
        self.active_mode = False
        logger.info("Disconnected from the drone.")

    async def send_initial_setpoint(self):
        await self.send_body_velocity_commands((0, 0, 0))

    def update_setpoint(self, setpoint):
        self.last_command = setpoint

Problem Encountered

When attempting to send velocity commands to the drone in offboard mode, I encountered the following RuntimeError:

RuntimeError: Task <Task pending name='Task-1' coro=<main() running at c:\Users\Alireza\source\repos\PixEagle\src\main.py:68> cb=[_run_until_complete_cb() at C:\Users\Alireza\AppData\Local\Programs\Python\Python310\lib\asyncio\base_events.py:184]> got Future <Future pending cb=[wrap_future.<locals>._call_check_cancel() at C:\Users\Alireza\source\repos\PixEagle\.venv\lib\site-packages\aiogrpc\utils.py:52]> attached to a different loop
ERROR:classes.px4_controller:Error updating telemetry: cannot schedule new futures after shutdown

This error indicates that the coroutine is trying to await a Future that is attached to a different event loop. This might be caused by the threading used to run the FastAPI server or some issue with the integration between FastAPI and MAVSDK's asyncio operations.

Steps Taken and Debugging Ideas

  1. Unified Event Loop: Ensured that the main application and FastAPI server use the same event loop by starting the FastAPI server within the main event loop.
  2. Task Management: Checked for any inconsistencies in task management and event loop usage in both main.py and px4_controller.py.
  3. The follow command start/stop successfully from the app itself with key press activating. Suggesting that part is working. Only not working when running the same function (connect_px4) from the FastAPI endpoint.

Possible Problems

  1. Threading Conflicts: Running the FastAPI server in a separate thread might be causing conflicts with the asyncio event loop used by MAVSDK.
  2. Event Loop Handling: There might be issues with how the event loops are being managed, particularly with the telemetry update task in the PX4Controller.
  3. Shutdown Sequence: The shutdown sequence might be prematurely stopping the event loop, causing new futures to be attached to a closed loop.

Request for Assistance

I would appreciate any recommendations or solutions to handle this situation effectively. Specifically:

Thank you for your assistance.

julianoes commented 4 months ago

Unified Event Loop: Ensured that the main application and FastAPI server use the same event loop by starting the FastAPI server within the main event loop.

That would have been my guess as well. You made sure that's the case?

alireza787b commented 3 months ago

I tried that.. and push to latest commit you can check that out

It works... but now it became so slow and laggy ... the video is no longer real time over http ... not applicable... Do you think, is there a way we can have mavsdk and fastapi on different thread and run a command of mavsdk from fast api routes?

Unified Event Loop: Ensured that the main application and FastAPI server use the same event loop by starting the FastAPI server within the main event loop.

That would have been my guess as well. You made sure that's the case?