temporalio / sdk-python

Temporal Python SDK
MIT License

Question: Logging of uncaught exceptions when running under a threadpool executor #677

Closed: sarnikowski closed this issue 3 weeks ago

sarnikowski commented 4 weeks ago

Normally, when you want to override how uncaught exceptions are written in Python, you can set sys.excepthook. Below is an example of how to do this with Temporal:

import asyncio
import datetime as dt
import logging
import sys
import traceback

from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.common import RetryPolicy
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

logging.basicConfig(
    level=logging.ERROR,
    format="%(asctime)s - %(levelname)s - %(message)s",
)

logger = logging.getLogger(__name__)

def log_exception(exc_type, exc_value, exc_traceback):
    tb_lines = traceback.format_exception(exc_type, exc_value, exc_traceback)
    tb_text = "".join(tb_lines)
    logger.error("Uncaught exception:\n%s", tb_text)

@activity.defn
async def say_hello(name: str) -> str:
    raise Exception("ohh no, I could not say hello")

@workflow.defn
class GreetingWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            say_hello,
            name,
            start_to_close_timeout=dt.timedelta(seconds=10),
            retry_policy=RetryPolicy(
                initial_interval=dt.timedelta(seconds=0),
                maximum_attempts=1,
            ),
        )

async def start_temporal_server():
    return await WorkflowEnvironment.start_local(namespace="default", dev_server_log_level="error")

async def run_workflow(client: Client):
    async with Worker(
        client,
        task_queue="hello-task-queue",
        workflows=[GreetingWorkflow],
        activities=[say_hello],
    ):
        result = await client.execute_workflow(
            GreetingWorkflow.run,
            "World",
            id="greeting-workflow",
            task_queue="hello-task-queue",
        )
        print(f"Workflow result: {result}")

async def main():
    sys.excepthook = log_exception
    server = await start_temporal_server()
    try:
        await run_workflow(server.client)
    finally:
        # Stop the local dev server even if the workflow fails.
        await server.shutdown()

if __name__ == "__main__":
    asyncio.run(main())

However, if the workers run activities in, for example, a thread pool executor, this does not work, because sys.excepthook is not invoked for exceptions raised in those threads (ref: https://github.com/python/cpython/pull/13515).

In my application we do not want exceptions and their stack traces printed to stderr; instead, we want all uncaught exceptions to be logged. For threads, the usual approach is simply to catch the exception and log it instead of re-raising. However, if I want the Temporal workflow to pick up the fact that an activity has failed, I believe I cannot do this? So, in essence, I would like to:

  1. Log all uncaught exceptions instead of printing them to stderr.
  2. Have Temporal record the exceptions, stack traces, etc., so that they are readable on the workflow, for example in the UI.
  3. Have activities and workflows behave as usual, with errors triggering retries etc.

Is there a way to achieve this?

cretz commented 4 weeks ago

Yes, you should look into interceptors. For instance, this sample sends errors to Sentry. You can make an activity interceptor that logs and re-raises.

Also, feel free to join us on our forums or #python-sdk on Slack with general questions.

sarnikowski commented 3 weeks ago

For posterity, I ended up with the following solution:

I added a structlog interceptor, as suggested by @cretz:

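# typing.override was added in Python 3.12; on older versions,
# import it from typing_extensions instead.
from typing import Any, override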

import structlog
from temporalio.worker import (
    ActivityInboundInterceptor,
    ExecuteActivityInput,
    ExecuteWorkflowInput,
    Interceptor,
    WorkflowInboundInterceptor,
    WorkflowInterceptorClassInput,
)

logger = structlog.get_logger()

class _StructlogActivityInboundInterceptor(ActivityInboundInterceptor):
    @override
    async def execute_activity(self, input: ExecuteActivityInput) -> Any:
        try:
            return await super().execute_activity(input)
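        except Exception:
            # Log with the traceback attached, then re-raise the same exception so
            # Temporal still records the activity failure and applies the retry policy.
            logger.exception("uncaught exception in activity")
            raise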

class _StructlogWorkflowInboundInterceptor(WorkflowInboundInterceptor):
    @override
    async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any:
        try:
            return await super().execute_workflow(input)
        except Exception:
            # Log, then re-raise unchanged so the workflow behaves as it would
            # without this interceptor.
            logger.exception("uncaught exception in workflow")
            raise

class StructlogInterceptor(Interceptor):
    """Temporal Interceptor class which will log workflow & activity exceptions to stdout."""

    @override
    def intercept_activity(self, next: ActivityInboundInterceptor) -> ActivityInboundInterceptor:
        return _StructlogActivityInboundInterceptor(super().intercept_activity(next))

    @override
    def workflow_interceptor_class(
        self, input: WorkflowInterceptorClassInput
    ) -> type[WorkflowInboundInterceptor] | None:
        return _StructlogWorkflowInboundInterceptor
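Both interceptors rely on a log-and-re-raise pattern. The stdlib-only sketch below isolates that pattern outside Temporal, using a hypothetical decorator log_and_reraise, the standard logging module in place of structlog, and a synchronous stand-in for the say_hello activity. It shows that the caller still receives the original exception object, which is what lets Temporal see the failure.

```python
import functools
import logging
from typing import Any, Callable, TypeVar

F = TypeVar("F", bound=Callable[..., Any])

logger = logging.getLogger("log_and_reraise_demo")


def log_and_reraise(message: str) -> Callable[[F], F]:
    """Hypothetical decorator: log any exception with its traceback, then re-raise it."""

    def decorator(func: F) -> F:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            try:
                return func(*args, **kwargs)
            except Exception:
                # logger.exception logs at ERROR level and attaches the current exc_info.
                logger.exception(message)
                # A bare raise re-raises the same exception object with its traceback.
                raise

        return wrapper  # type: ignore[return-value]

    return decorator


@log_and_reraise("uncaught exception in activity")
def say_hello(name: str) -> str:
    # Synchronous stand-in for the activity from the original example.
    raise RuntimeError("ohh no, I could not say hello")
```

Calling say_hello("World") both emits one ERROR record and raises the RuntimeError, so logging the error does not swallow the failure.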

However, this only ensured that uncaught exceptions were also written to stdout via structlog. To stop the exceptions from being written to stderr as well, I ended up redirecting stderr to /dev/null, which I don't think is an ideal solution. To limit the scope of the redirect, I wrote a context manager that I use with my thread pool executor. The manager looks like this:

import contextlib
import os
from types import TracebackType
from typing import TextIO


class RedirectStdErrToDevNull:
    """A context manager that redirects stderr output to /dev/null.

    Note: contextlib.redirect_stderr replaces sys.stderr for the whole
    interpreter, so output from other threads is silenced while it is active.

    Example:
        >>> import sys
        >>> with RedirectStdErrToDevNull():
        ...     print("quack", file=sys.stderr)
        ...     # No quacking will be written to stderr.
    """

    def __init__(self) -> None:
        self._devnull: TextIO | None = None
        self._stderr_redirector: contextlib.redirect_stderr[TextIO] | None = None

    def __enter__(self) -> "RedirectStdErrToDevNull":
        # Open the null device and swap it in for sys.stderr.
        self._devnull = open(os.devnull, "w")
        self._stderr_redirector = contextlib.redirect_stderr(self._devnull)
        self._stderr_redirector.__enter__()
        return self

    def __exit__(
        self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
    ) -> None:
        # Restore the original sys.stderr first, then close the null device.
        if self._stderr_redirector is not None:
            self._stderr_redirector.__exit__(exc_type, exc_val, exc_tb)
        if self._devnull is not None:
            self._devnull.close()
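The same behaviour can be sketched more compactly with contextlib.contextmanager; the name stderr_to_devnull is hypothetical. The nested with blocks guarantee that sys.stderr is restored and the null device is closed even if the body raises. The same limitations apply as for the class: the swap of sys.stderr is visible to every thread, it does not affect anything writing directly to file descriptor 2 (for example C extensions or subprocesses), and a logging.StreamHandler created before the redirect keeps the stream it captured at construction time.

```python
import contextlib
import os
import sys
from typing import Iterator, TextIO


@contextlib.contextmanager
def stderr_to_devnull() -> Iterator[TextIO]:
    """Hypothetical generator-based equivalent of RedirectStdErrToDevNull."""
    # The outer `with` closes the null device on exit, even on error.
    with open(os.devnull, "w") as devnull:
        # redirect_stderr swaps sys.stderr and restores the previous value on exit.
        with contextlib.redirect_stderr(devnull):
            yield devnull


if __name__ == "__main__":
    with stderr_to_devnull():
        print("quack", file=sys.stderr)  # Silently discarded.
    print("back to normal", file=sys.stderr)
```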