mavlink / MAVSDK-Python

MAVSDK client for Python.
https://mavsdk.mavlink.io
BSD 3-Clause "New" or "Revised" License

`aiogrpc` RuntimeError #698

Open ShafSpecs opened 1 month ago

ShafSpecs commented 1 month ago

Hello, I am building a simple GCS: a simple UI on the front end and a Python server on the back end.

It uses WebSockets to handle commands and Server-Sent Events (SSE) for unidirectional information (telemetry). Whilst building out the telemetry aspect, I noticed that a RuntimeError randomly gets thrown:

Exception ignored in: <function WrappedIterator.__del__ at 0x00000202DCC768E0>
Traceback (most recent call last):
  File "C:\Users\user\app\.venv\Lib\site-packages\aiogrpc\utils.py", line 168, in __del__
    self._stream_executor.shutdown()
  File "C:\Python312\Lib\concurrent\futures\thread.py", line 238, in shutdown
    t.join()
  File "C:\Python312\Lib\threading.py", line 1144, in join
    raise RuntimeError("cannot join current thread")
RuntimeError: cannot join current thread

Depending on how I structured the SSE route, it got thrown frequently or once in a while. A basic structure:

# sse route
    async def event_generator():
        while True:
            if await request.is_disconnected():
                print("Gracefully stopping stream...")
                break

            # uncommenting these lines instead gets the error thrown more frequently
            #
            # battery = ensure_future(get_battery())
            # gps = ensure_future(get_gps())
            # position = ensure_future(get_position())
            # heading = ensure_future(get_heading())
            # flight_mode = ensure_future(get_flight_mode())
            # fw_metrics = ensure_future(get_fw_metrics())
            # # is_in_air = ensure_future(get_is_in_air())
            # # landed_state = ensure_future(get_landed_state())
            # # mission_progress = ensure_future(get_mission_progress())
            # home = ensure_future(get_home())
            # health = ensure_future(get_health())
            # # status = ensure_future(get_status_text())
            # rc_status = ensure_future(get_rc_status())

            battery = await get_battery()
            gps = await get_gps()
            position = await get_position()
            heading = await get_heading()
            flight_mode = await get_flight_mode()
            fw_metrics = await get_fw_metrics()
            # is_in_air = ensure_future(get_is_in_air())
            # landed_state = ensure_future(get_landed_state())
            # mission_progress = ensure_future(get_mission_progress())
            home = await get_home()
            health = await get_health()
            # status = ensure_future(get_status_text())
            rc_status = await get_rc_status()

            yield encode({
                "event": "telemetry",
                "data": {
                    "battery": battery,
                    "gps": gps,
                    "position": position,
                    "heading": heading,
                    "flight_mode": flight_mode,
                    "fw_metrics": fw_metrics,
                    # "is_in_air": await is_in_air,
                    # "landed_state": await landed_state,
                    # "mission_progress": await mission_progress,
                    "home": home,
                    "health": health,
                    # "info": info,
                    # "status": await status,
                    "rc_status": rc_status,
                }
            }).decode("utf-8")

            await sleep(.5)

    res = EventSourceResponse(
        content=event_generator(),
        status_code=200,
        media_type="text/event-stream",
        headers={
            "Content-Type": "text/event-stream",
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Credentials": "true",
        }
    )
    return res
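
For comparison, the commented-out `ensure_future` variant can also be written with `asyncio.gather`. This is only a minimal sketch: `get_battery` and `get_heading` below are hypothetical stand-ins for the telemetry helpers used in the route, and running the calls concurrently does not address the aiogrpc error itself.

```python
import asyncio


async def get_battery():
    # Hypothetical stand-in for a telemetry helper; returns canned data.
    await asyncio.sleep(0.01)
    return {"remaining_percent": 87}


async def get_heading():
    # Hypothetical stand-in for a telemetry helper; returns canned data.
    await asyncio.sleep(0.01)
    return {"heading_deg": 123.0}


async def collect_telemetry():
    names = ("battery", "heading")
    # Run all helpers concurrently; a failing helper yields its exception
    # object instead of cancelling the whole snapshot.
    results = await asyncio.gather(
        get_battery(), get_heading(), return_exceptions=True
    )
    # Replace failed entries with None so the payload stays serialisable.
    return {
        name: (None if isinstance(result, Exception) else result)
        for name, result in zip(names, results)
    }


snapshot = asyncio.run(collect_telemetry())
print(snapshot)
```

With `return_exceptions=True`, one slow or failing stream only blanks its own field rather than aborting the whole SSE event.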

The error comes from a mavsdk dependency: aiogrpc. I am not well-versed in multi-threading in Python, but I assume the issue stems from aiogrpc creating its own threads and attempting to join them. Is there any way I can disable that? Possibly use asyncio fully for running tasks on a separate thread? Or maybe there is a fix I am overlooking. Thanks

ShafSpecs commented 1 month ago

Digging further, it seems the error is thrown (raised) internally as wrapping the whole code in a try...except block doesn't catch it.
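
That matches how Python treats exceptions raised inside `__del__`: they are not propagated to the calling code but are reported through `sys.unraisablehook` (the source of the "Exception ignored in" text), so a surrounding try...except never sees them. A small sketch of that behaviour, where `FailingFinalizer` is a hypothetical stand-in and not aiogrpc code:

```python
import gc
import sys


class FailingFinalizer:
    """Hypothetical object whose finalizer raises, like the traceback above."""

    def __del__(self):
        raise RuntimeError("cannot join current thread")


captured = []


def record_unraisable(unraisable):
    # Called by the interpreter instead of propagating the exception.
    captured.append(
        (type(unraisable.exc_value).__name__, str(unraisable.exc_value))
    )


def demo():
    previous_hook = sys.unraisablehook
    sys.unraisablehook = record_unraisable
    caught = False
    try:
        try:
            obj = FailingFinalizer()
            del obj       # the finalizer runs here on CPython
            gc.collect()  # nudge other interpreters to finalize as well
        except RuntimeError:
            caught = True  # never reached: __del__ errors do not propagate
    finally:
        sys.unraisablehook = previous_hook
    return caught


caught_by_except = demo()
print(caught_by_except, captured)
```

Installing a custom `sys.unraisablehook` is therefore one way to log or count these errors, although it does not fix the underlying join.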

julianoes commented 1 month ago

I have seen similar annoying exceptions when using it more recently.

Can you run mavsdk_server separately and check whether it crashes or locks up?

julianoes commented 1 month ago

I ended up doing something like this, not sure if that would help your case:

# When mavsdk_server, which runs separately, crashes, we need to redo all
# mavsdk subscriptions. To do so, it's easiest just to restart this app
# entirely. We can detect that mavsdk_server has restarted by reading through
# the aiogrpc.utils logs. Unfortunately, I didn't find a better way to catch
# this case.
class ErrorLogHandler(logging.Handler):
    def emit(self, record):
        log_entry = self.format(record)
        if "grpc exception" in log_entry and "StatusCode.UNAVAILABLE" in log_entry:
            print("Triggering restart due to gRPC failure.")
            loop = asyncio.get_event_loop()
            loop.stop()

aio_logger = logging.getLogger("aiogrpc.utils")
aio_logger.addHandler(ErrorLogHandler())

ShafSpecs commented 1 month ago

I have seen similar annoying exceptions when using it more recently.

Can you run mavsdk_server separately and check whether it crashes or locks up?

That's what I am doing. I am using Windows, which means I have to start the server process right before each connection:

async def start_mavsdk_server(connection_url: str, port: int):
    cmd = [resource_path('include/mavsdk_server.exe'), "-p", str(port), connection_url]

    mavsdk_server_process = Popen(
        cmd,
        stdout=PIPE,
        stderr=PIPE,
        universal_newlines=True,
        start_new_session=True,
    )

    return mavsdk_server_process

Still raises the exception
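
One thing to watch with `stdout=PIPE` and `stderr=PIPE`: if nothing reads the pipes, the child process can block once the OS pipe buffer fills up, and its log output stays unread either way. A minimal sketch of draining the output with asyncio, using the current Python interpreter as a stand-in for `mavsdk_server.exe` (the command line and the `run_and_collect` helper are illustrative assumptions):

```python
import asyncio
import sys


async def run_and_collect(cmd):
    # Start the child with stderr merged into stdout so one reader suffices.
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,
    )
    lines = []
    # Reading line by line keeps the pipe from filling up.
    async for raw in proc.stdout:
        lines.append(raw.decode(errors="replace").rstrip())
    return_code = await proc.wait()
    return return_code, lines


# Stand-in command; a real setup would pass the server binary and its arguments.
cmd = [sys.executable, "-c", "print('server started'); print('listening')"]
return_code, lines = asyncio.run(run_and_collect(cmd))
print(return_code, lines)
```

For a long-running server, the read loop would run as a background task and forward each line to the application log instead of collecting it in a list.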

ShafSpecs commented 1 month ago

I ended up doing something like this, not sure if that would help your case:

# When mavsdk_server, which runs separately, crashes, we need to redo all
# mavsdk subscriptions. To do so, it's easiest just to restart this app
# entirely. We can detect that mavsdk_server has restarted by reading through
# the aiogrpc.utils logs. Unfortunately, I didn't find a better way to catch
# this case.
class ErrorLogHandler(logging.Handler):
    def emit(self, record):
        log_entry = self.format(record)
        if "grpc exception" in log_entry and "StatusCode.UNAVAILABLE" in log_entry:
            print("Triggering restart due to gRPC failure.")
            loop = asyncio.get_event_loop()
            loop.stop()

aio_logger = logging.getLogger("aiogrpc.utils")
aio_logger.addHandler(ErrorLogHandler())

I will try adding this to my event generator and see if it helps :+1:. Does this restart the entire app or just the mavsdk server? Restarting the entire app would be a problem for me because of how the socket clients behave.

Would I also need to restart mavsdk server manually after stopping the event loop?

julianoes commented 1 month ago

For me this restarts the full app which is a quart server in my case.
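
If stopping the whole loop is too drastic, a variation of the handler above could call a callback instead, so the application decides what to restart. A minimal sketch, assuming the same log text is what signals the failure; `NotifyOnGrpcFailure` and `on_failure` are hypothetical names:

```python
import logging


class NotifyOnGrpcFailure(logging.Handler):
    """Calls `on_failure` when a log entry looks like a lost gRPC connection."""

    def __init__(self, on_failure):
        super().__init__()
        self._on_failure = on_failure

    def emit(self, record):
        # format() includes the traceback text, where the status code appears.
        log_entry = self.format(record)
        if "grpc exception" in log_entry and "StatusCode.UNAVAILABLE" in log_entry:
            self._on_failure()


triggered = []
demo_logger = logging.getLogger("aiogrpc.utils")
demo_logger.propagate = False  # keep the demo output quiet
handler = NotifyOnGrpcFailure(lambda: triggered.append(True))
demo_logger.addHandler(handler)

# Simulated log entries; only the first one matches both substrings.
demo_logger.error("__anext__ grpc exception: StatusCode.UNAVAILABLE")
demo_logger.error("some unrelated message")

demo_logger.removeHandler(handler)
print(triggered)
```

In an asyncio application the callback could set an `asyncio.Event` through `loop.call_soon_threadsafe`, since log records may be emitted from a worker thread.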

dp1140a commented 1 month ago

I get a similar error when running the examples. However, it seems to be intermittent (the worst kind of error :) ). This is the Takeoff and Land example:

#!/usr/bin/env python3

import asyncio
from mavsdk import System

async def run():

    drone = System()
    await drone.connect(system_address="udp://:14540")

    status_text_task = asyncio.ensure_future(print_status_text(drone))

    print("Waiting for drone to connect...")
    async for state in drone.core.connection_state():
        if state.is_connected:
            print("-- Connected to drone!")
            break

    print("Waiting for drone to have a global position estimate...")
    async for health in drone.telemetry.health():
        if health.is_global_position_ok and health.is_home_position_ok:
            print("-- Global position estimate OK")
            break

    print("-- Arming")
    await drone.action.arm()

    print("-- Taking off")
    await drone.action.takeoff()

    await asyncio.sleep(10)

    print("-- Landing")
    await drone.action.land()

    #status_text_task.cancel()

async def print_status_text(drone):
    try:
        async for status_text in drone.telemetry.status_text():
            print(f"Status: {status_text.type}: {status_text.text}")
    except asyncio.CancelledError:
        return

if __name__ == "__main__":
    # Run the asyncio loop
    asyncio.run(run())

It seemingly runs OK: the drone takes off and lands, but then it throws the error:

python test1.py 
Waiting for drone to connect...
-- Connected to drone!
Waiting for drone to have a global position estimate...
-- Global position estimate OK
-- Arming
-- Taking off
Status: INFO: Armed by external command 
Status: INFO: Using default takeoff altitude: 10.0 m    
Status: INFO: [logger] ./log/2024-05-20/23_47_00.ulg    
Status: INFO: Using default takeoff altitude: 10.0 m    
Status: INFO: Takeoff detected  
-- Landing
Exception ignored in: <function WrappedIterator.__del__ at 0x710c78765750>
Traceback (most recent call last):
  File "/home/dave/.local/lib/python3.10/site-packages/aiogrpc/utils.py", line 168, in __del__
    self._stream_executor.shutdown()
  File "/usr/lib/python3.10/concurrent/futures/thread.py", line 235, in shutdown
    t.join()
  File "/usr/lib/python3.10/threading.py", line 1093, in join
    raise RuntimeError("cannot join current thread")
RuntimeError: cannot join current thread

julianoes commented 1 month ago

@JonasVautherin any ideas?

ShafSpecs commented 1 month ago

Getting a new error, still stemming from aiogrpc:

__anext__ grpc exception
Traceback (most recent call last):
  File "C:\Users\user\project\.venv\Lib\site-packages\aiogrpc\utils.py", line 145, in cb
    fut.result()
  File "C:\Python312\Lib\concurrent\futures\thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\user\project\.venv\Lib\site-packages\aiogrpc\utils.py", line 131, in _next
    return next(self._iterator)
           ^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\user\project\.venv\Lib\site-packages\grpc\_channel.py", line 542, in __next__
    return self._next()
           ^^^^^^^^^^^^
  File "C:\Users\user\project\.venv\Lib\site-packages\grpc\_channel.py", line 968, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.UNKNOWN
    details = "Unexpected error in RPC handling"
    debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2024-05-28T11:11:28.0840526+00:00", grpc_status:2, grpc_message:"Unexpected error in RPC handling"}"
>

This occurs more rarely, but it gives my systems a bit of unpredictability when trying to read telemetry consistently

Any ideas? cc @JonasVautherin

ShafSpecs commented 1 month ago

Overall, I am wondering whether it's possible to drop aiogrpc (which hasn't been updated in 4 years) and go with something like grpclib, which still receives updates and works natively with asyncio. I am not sure how tightly coupled this project is with aiogrpc. I will take a look at the feasibility of such a conversion 👍

ShafSpecs commented 1 month ago

Overall, I am wondering whether it's possible to drop aiogrpc (which hasn't been updated in 4 years) and go with something like grpclib, which still receives updates and works natively with asyncio. I am not sure how tightly coupled this project is with aiogrpc. I will take a look at the feasibility of such a conversion 👍

It doesn't look too complicated to facilitate a switch, but I do not know what I am doing yet. aiogrpc is used in just the async plugin manager. Not sure about the pbs files generation though, as grpclib does have a protoc plugin: https://github.com/vmagamedov/grpclib#protoc-plugin

julianoes commented 1 month ago

I didn't realize we could swap out aiogrpc. That sounds reasonable.

ShafSpecs commented 1 month ago

I didn't realize we could swap out aiogrpc. That sounds reasonable.

Is this something you intend on working on in the near future?

julianoes commented 1 month ago

Probably not in the next couple of weeks. Is this urgent for you, or are you asking to try yourself?

ShafSpecs commented 1 month ago

Probably not in the next couple of weeks. Is this urgent for you, or are you asking to try yourself?

Both. It's a bit urgent, and I am wondering if I can do it. Aside from changing imports in the async plugin manager, I am not sure whether I would need to regenerate any files or whether the currently generated proto files are good to go.

JonasVautherin commented 1 month ago

If it has a similar API to aiogrpc, maybe it's actually easy to swap :+1:. I would say just try it and see where it goes :blush:.

julianoes commented 1 month ago

I wouldn't know but all I find is this: https://github.com/search?q=repo%3Amavlink%2FMAVSDK-Python%20aiogrpc&type=code Does that make sense @JonasVautherin? I don't actually know all the pieces.

But @ShafSpecs, I assume you're aware of how to do the generation as outlined here: https://github.com/mavlink/MAVSDK-Python?tab=readme-ov-file#generate-the-code

ShafSpecs commented 3 weeks ago

But @ShafSpecs, I assume you're aware of how to do the generation as outlined here: https://github.com/mavlink/MAVSDK-Python?tab=readme-ov-file#generate-the-code

I am on Windows, and it just hit me that I won't be able to run the necessary scripts. Maybe I could convert them to PowerShell scripts? I don't know, and the bash scripts seem to use native Unix tools. I might just wait the few weeks until you can implement it.

julianoes commented 3 weeks ago

I would just do it in WSL2 or docker if I was you, but up to you.

ShafSpecs commented 2 weeks ago

Apparently, the official grpc package supports asyncio now, so I didn't even need an external dependency or to rebuild the protoc files. Ran a few examples locally and most work (though a few examples, like calibration, freeze indefinitely for some reason - seems out of scope for this issue tho)! Is there a way I can move this local mavsdk into my original project to compare it against the current release?

I also noticed that when I set up the package for local development, it correctly links to mavsdk_server, meaning I don't need to start it up myself. But in the actual package, it needs to be started manually on Windows. Do you know the reason why? I suppose I can try to tackle that issue too.

ShafSpecs commented 2 weeks ago

Ran a few examples locally and most work (though a few examples, like calibration, freeze indefinitely for some reason - seems out of scope for this issue tho)!

Checking the git history, most of the grpc files were generated ~5 yrs ago (as well as outdated dependencies). I am trying to regenerate it on my end but running into an issue with protoc-gen-mavsdk. Could regenerating all the files possibly fix the issue? I mean a few things might have changed between now and then. Not sure though. The freezing is very worrying

JonasVautherin commented 2 weeks ago

Not sure what you mean. Most of these files are autogenerated. Some haven't changed in years, but that means that the API is stable :blush:.

Don't forget that MAVSDK-Python just relays the API to MAVSDK-C++. E.g. the API of the camera plugin hasn't changed much (that's API stability), but the implementation (in C++) has changed.

ShafSpecs commented 2 weeks ago

Not sure what you mean. Most of these files are autogenerated. Some haven't changed in years, but that means that the API is stable 😊.

Oh okay. There is currently a dependency conflict in the project's dev requirements, caused by protoc-gen-mavsdk, and that was making me wonder. The outdated dependencies made me wonder if perhaps it's not the mavlink APIs but rather the Python APIs that are causing most of the issues.

Note: I tried varying the versions, but protoc-gen-mavsdk depends on a very old version of the protobuf package.

grpcio and grpcio-tools seem to have changed a lot of their APIs, and Python itself has changed too (distutils, for example, was deprecated in Python 3.10 and removed in 3.12). I am wondering whether those upgrades and deprecations are what is leading to random behaviour from the generated Python files. I had to change setup.py and a few build files just to make the build work. I am not sure if this would fix things, but I suggest updating protoc-gen-mavsdk generally and re-running run_protoc.sh.

Edit: You worked on the protoc-gen-mavsdk pb plugin quite a lot, Jonas 😄. Impressive! I have no idea what I am looking at. But the protobuf version seems to be limited on purpose:

protobuf>=3.13,<=3.20.1
jinja2>=2.11

ShafSpecs commented 2 weeks ago

Attempted to fix the issues I noticed in the pb_plugin in this PR: https://github.com/mavlink/MAVSDK-Proto/pull/347. It should fix the dependency conflict. I am not sure whether it introduces any breaking changes, but all the tests pass now.

ShafSpecs commented 2 weeks ago

I was able to regenerate all files as well as remove aiogrpc completely. In spite of all that, a few examples still hang indefinitely (e.g. calibration).

In light of this, I have decided to open a PR (soon) in this repo to address the aiogrpc issue and leave the rest for another time. The freezing is a baffling issue with no apparent cause, and it also means that some features of mavsdk are unusable (e.g. mission tracking). Thank you

ShafSpecs commented 2 weeks ago

@JonasVautherin you work at Auterion, right? Love the company btw, a favorite of mine.

How do you guys handle the MAVLink part of communication? Do you use the CPP version of MAVSDK straight up? Or mavlink? I want to work with mavsdk without having to pre-check working/non-working plugins. Which would you advise? Thanks for the help

julianoes commented 2 weeks ago

How do you guys handle the MAVLink part of communication? Do you use the CPP version of MAVSDK straight up?

What exactly do you mean? I mostly use MAVSDK C++ unless I specifically need Python. Use cases for that for me are quick scripting, or recently a quart webserver backend.

ShafSpecs commented 2 weeks ago

What exactly do you mean? I mostly use MAVSDK C++ unless I specifically need Python. Use cases for that for me are quick scripting, or recently a quart webserver backend.

Ah okay. I was taking a look at the Python and C++ versions of mavsdk, and the C++ one seems more fleshed out. I was previously using Electron and Python for my simple GCS, but I am considering Qt instead, thanks to the C++ backend with mavsdk-cpp.

Plus, it allows me to learn C++. I'm much more familiar with Rust than C++.

julianoes commented 1 week ago

Also see https://github.com/mavlink/MAVSDK-Rust/

ShafSpecs commented 1 week ago

Also see https://github.com/mavlink/MAVSDK-Rust/

I checked it out. Doesn't seem fully fleshed out like python or cpp, especially for a well rounded gcs.

JonasVautherin commented 1 week ago

@JonasVautherin you work at Auterion, right?

No, not anymore. I just try to help @julianoes with MAVSDK when I can, in my free time :blush:.

ShafSpecs commented 1 week ago

No, not anymore. I just try to help @julianoes with MAVSDK when I can, in my free time 😊.

Cool. Hopefully, I can learn enough to help consistently as well :rocket:.

ShafSpecs commented 1 week ago

Using my own local MAVSDK (with aiogrpc swapped out), I no longer get runtime errors, but the program tends to freeze whenever I use async generators:

# for example, invoking this just hangs the entire app
async def get_status_text(vehicle: System):
    async for status in vehicle.telemetry.status_text():
        return {
            "text": status.text,
            "type": str(status.type)
        }
# or
async def get_battery(vehicle: System):
    async for battery in vehicle.telemetry.battery():
        return {
            "voltage_v": battery.voltage_v,
            "current_battery_a": battery.current_battery_a,
            "remaining_percent": int(battery.remaining_percent),
            "capacity_consumed_ah": battery.capacity_consumed_ah,
            "temperature_degc": battery.temperature_degc,
            "id": battery.id
        }

I get a weird error from grpc.aio:

    |   File "C:\Users\user\project\routes\index.py", line 91, in event_generator
    |     "battery": await battery,
    |                ^^^^^^^^^^^^^
    |   File "C:\Users\user\project\uav\telemetry.py", line 101, in get_battery
    |     async for battery in vehicle.telemetry.battery():
    |   File "C:\Users\user\MAVSDK-Python\mavsdk\telemetry.py", line 4182, in battery
    |     async for response in battery_stream:
    |   File "C:\Users\user\project\.venv\Lib\site-packages\grpc\aio\_call.py", line 356, in _fetch_stream_responses
    |     await self._raise_for_status()
    |   File "C:\Users\user\project\.venv\Lib\site-packages\grpc\aio\_call.py", line 263, in _raise_for_status
    |     raise _create_rpc_error(
    | grpc.aio._call.AioRpcError: <AioRpcError of RPC that terminated with:
    |   status = StatusCode.UNAVAILABLE
    |   details = "Connection reset"
    |   debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2024-06-24T11:58:48.0307323+00:00", grpc_status:14, grpc_message:"Connection reset"}"
    | >
    +------------------------------------

Does this indicate an error on the mavsdk server side? Hope not 🫣
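
To keep a stalled stream from hanging the whole app, the "return the first item" pattern above could be bounded with a timeout. A minimal sketch, assuming a missing sample should simply become `None`; `first_item`, `fake_battery`, and `silent_stream` are hypothetical names, and the two generators only simulate a telemetry stream:

```python
import asyncio


async def first_item(stream, timeout_s):
    """Return the first item of an async iterable, or None on timeout/end."""
    iterator = stream.__aiter__()
    try:
        return await asyncio.wait_for(iterator.__anext__(), timeout_s)
    except (asyncio.TimeoutError, StopAsyncIteration):
        return None
    finally:
        # Close async generators explicitly so cleanup runs right away.
        aclose = getattr(iterator, "aclose", None)
        if aclose is not None:
            await aclose()


async def fake_battery():
    # Simulated stream that produces a sample quickly.
    while True:
        await asyncio.sleep(0.01)
        yield {"remaining_percent": 80}


async def silent_stream():
    # Simulated stream that never produces anything in time.
    await asyncio.sleep(3600)
    yield None


async def main():
    got = await first_item(fake_battery(), timeout_s=1.0)
    missing = await first_item(silent_stream(), timeout_s=0.05)
    return got, missing


got, missing = asyncio.run(main())
print(got, missing)
```

This only limits how long a caller waits; it does not explain why the server side stops sending or resets the connection.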

JonasVautherin commented 1 week ago

It feels like the connection between the gRPC client and the gRPC server broke. Could it be that mavsdk_server crashed?

ShafSpecs commented 1 week ago

It feels like the connection between the gRPC client and the gRPC server broke. Could it be that mavsdk_server crashed?

I am assuming so. I will try to run mavsdk_server on its own so I can get logs from it and check the source of the error 👍