agronholm / anyio

High level asynchronous concurrency and networking framework that works on top of either trio or asyncio
MIT License

Retrieve the stack trace from a worker process of to_process.run_sync() when an exception is raised #587

Open gwerbin opened 1 year ago

gwerbin commented 1 year ago

AnyIO version

3.7.1

Python version

3.10.11

What happened?

When a function run via to_process.run_sync raises an exception, I expected to be able to access or view the original traceback from the worker process somehow. Without it, debugging is difficult.

This is supported in stdlib multiprocessing in a roundabout and hacky but effective way:

when the exception is unpickled in the main process it gets a secondary exception chained to it using __cause__ ... whose stringification contains the stringification of the original traceback.
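To make that mechanism concrete, here is a minimal sketch that reproduces it in a single process, with no worker involved. It assumes the undocumented helper classes `ExceptionWithTraceback` and `RemoteTraceback` in `multiprocessing.pool`, which are CPython internals and may change; `simulate_worker_failure` is a hypothetical name used only for illustration.

```python
import pickle

# Assumption: these are undocumented CPython internals, not a public API.
from multiprocessing.pool import ExceptionWithTraceback, RemoteTraceback


def oops() -> None:
    raise RuntimeError("oops...")


def simulate_worker_failure() -> bytes:
    """Mimic a pool worker: catch the error, wrap it, and pickle the wrapper."""
    try:
        oops()
    except Exception as exc:
        # The wrapper formats the traceback to a string while it still exists.
        return pickle.dumps(ExceptionWithTraceback(exc, exc.__traceback__))
    raise AssertionError("oops() was expected to raise")


# Unpickling plays the role of the main process receiving the result.
# The wrapper's __reduce__ hands back the original exception with a
# RemoteTraceback attached as __cause__.
received = pickle.loads(simulate_worker_failure())

print(type(received).__name__)
print(received.__cause__)
```

The traceback object itself is never pickled; only its formatted text travels, which is why it shows up as the message of the chained exception.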

How can we reproduce the bug?

import asyncio

import anyio.to_process

def oops():
    raise RuntimeError("oops...")

def another_func():
    oops()

async def main():
    await anyio.to_process.run_sync(another_func)

if __name__ == '__main__':
    asyncio.run(main())

I realize now that this might be as much a feature request as it is a bug. Please feel free to re-label as needed.

gwerbin commented 1 year ago

If it's any help, here's something I threw together that seems to work in my current project:

import traceback
from collections.abc import Callable
from types import TracebackType
from typing import ParamSpec, TypeVar

from anyio import to_process

Ex = TypeVar("Ex", bound=BaseException)
P = ParamSpec("P")
R = TypeVar("R")

class RemoteTraceback(BaseException):
    tb_str: str

    def __init__(self, tb_str: str) -> None:
        self.tb_str = tb_str

    def __str__(self) -> str:
        return f"\n\n{self.tb_str}"

def _rebuild_exc(exc: Ex, tb_str: str) -> Ex:
    exc.__cause__ = RemoteTraceback(tb_str)
    return exc

class ExceptionWithTraceback(BaseException):
    exc: BaseException
    tb_str: str

    def __init__(self, exc: BaseException, tb: TracebackType | None) -> None:
        tb_fmt = traceback.format_exception(type(exc), exc, tb)
        self.exc = exc
        self.tb_str = "".join(tb_fmt)

    def __reduce__(self) -> tuple[Callable[[BaseException, str], BaseException], tuple[BaseException, str]]:
        return _rebuild_exc, (self.exc, self.tb_str)

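def _traceback_wrapper(f: Callable[P, R], *args: P.args, **kwargs: P.kwargs) -> R:
    # Runs in the worker process: call f and, on failure, wrap the exception so
    # that its formatted traceback survives pickling back to the parent.
    try:
        return f(*args, **kwargs)
    except Exception as exc:
        raise ExceptionWithTraceback(exc, exc.__traceback__)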

# Without the "valid-type" ignore, Mypy complains that `**kwargs: P.kwargs` is missing
# from function signatures that use ParamSpec.
# We can't use `**kwargs` here because Anyio doesn't support it.

async def run_in_process(f: Callable[P, R], *args: P.args) -> R:  # type:ignore[valid-type]
    return await to_process.run_sync(_traceback_wrapper, f, *args)

monchin commented 3 months ago

How is this going now? It would be a really helpful feature. For example, if I run a CPU-intensive task from an async function in FastAPI and it fails, this feature would let me see the reason.

richardsheridan commented 2 months ago

FWIW this is how I implemented it: https://github.com/richardsheridan/trio-parallel/blob/7b136a80a342518d5d1b62d64447bff6f130fadb/_trio_parallel_workers/__init__.py#L19-L39

Whether to use tblib and accept another dependency or vendor the classes from Dask like gwerbin suggested is up to you I suppose!