d5h-foss / grpc-interceptor

Simplified Python gRPC interceptors
MIT License

Client interceptor to handle exceptions doesn't enter `except` block #22

Closed: CaueP closed this issue 1 year ago

CaueP commented 1 year ago

Hi there!

I'm creating an interceptor to improve exception logging in my gRPC client, but the interceptor never enters the `except` block when an `RpcError` is raised. I added log calls in the `try` block, where `method` is called, and in the `except` block, and only the first one runs. Is this expected behavior, or am I doing something wrong?

The library versions I'm using:

Below is my interceptor implementation, following the example of client interceptors:

```python
import logging

from grpc_interceptor import ClientInterceptor

logger = logging.getLogger(__name__)

class ErrorLogClientInterceptor(ClientInterceptor):
    """Interceptor to generate error logs on client errors"""

    def intercept(self, method, request, context):
        try:
            logger.info(
                f"[ErrorLogClientInterceptor] {context.method} will be executed")
            return method(request, context)
        except Exception as exc:
            logger.info(f"[ErrorLogClientInterceptor] Exception: {context.method} raised exception {exc}")
            self.log_error(exc)
            raise

    def log_error(self, exc: Exception) -> None:
        logger.exception(str(exc), exc_info=exc, stack_info=True)
```

My GrpcClient:

```python
import logging
from contextlib import contextmanager
from typing import Any, Dict, Iterator, Optional, Type

import grpc

from .interceptor import (
    ErrorLogClientInterceptor,
)

logger = logging.getLogger(__name__)

class GrpcClient:
    """Class to instantiate gRPC stubs properly.

    Here is an example of usage:
        with GrpcClient.stub(MyAPIStub, target) as stub:
            stub.MyMethod(request)
            ...
    """

    _channels = {}  # type: Dict[str, grpc.Channel]
    """Holds the singleton channel for each target."""

    DEFAULT_OPTIONS = {
        # Send keepalive ping every 30 seconds, default is 2 hours.
        "grpc.keepalive_time_ms": 30000,
        # Keepalive pings time out after 5 seconds, default is 20 seconds.
        "grpc.keepalive_timeout_ms": 5000,
        # Allow keepalive pings when there are no active gRPC calls.
        "grpc.keepalive_permit_without_calls": True,
        # Allow an unlimited number of keepalive pings without data.
        "grpc.http2.max_pings_without_data": 0,
        # Allow gRPC pings from the client every 30 seconds.
        "grpc.http2.min_time_between_pings_ms": 30000,
        # Allow gRPC pings from the client without data every 30 seconds.
        "grpc.http2.min_ping_interval_without_data_ms": 30000,
    }  # type: Dict[str, Any]

    @classmethod
    def _interceptors(cls):
        """Returns all interceptors to be added to the channel, in order."""
        return [
            ErrorLogClientInterceptor(),
        ]

    @classmethod
    @contextmanager
    def stub(cls, stub_class: Type, target: str, custom_options: Optional[Dict[str, Any]] = None) -> Iterator[Any]:
        """Context manager that returns the desired stub."""
        # The target is used to refer to the singleton channel.
        # If the channel does not exist yet, create it and index it.
        if target not in cls._channels:
            options = cls.DEFAULT_OPTIONS.copy()
            if custom_options:
                options.update(custom_options)
            options = [(k, v) for k, v in options.items()]

            channel = grpc.insecure_channel(target, options=options)

            # Add channel interceptors.
            for interceptor in cls._interceptors():
                channel = grpc.intercept_channel(channel, interceptor)

            cls._channels[target] = channel

        stub = stub_class(cls._channels[target])
        yield stub

    @classmethod
    def _close_all_channels(cls):
        """Close all channels"""
        for channel in cls._channels.values():
            channel.close()

        cls._channels = {}

    def __del__(self):
        # Only runs when a GrpcClient instance is garbage-collected; stub() is a
        # classmethod, so callers that never instantiate the class never trigger it.
        GrpcClient._close_all_channels()
```

A client call example:

```python
from grpc_client.client import GrpcClient
from django.conf import settings

from proto.celery_task_api_pb2 import ExecuteTaskRequest
from proto.celery_task_api_pb2_grpc import CeleryTaskAPIStub

def call_execute_task_service(task_name: str, task_id: str) -> tuple:
    request = ExecuteTaskRequest(
        task_name=task_name,
        task_id=task_id,
    )

    with GrpcClient.stub(CeleryTaskAPIStub, settings.GRPC_SERVER_ENDPOINT) as stub:
        response, _ = stub.ExecuteTask.with_call(
            request,
            60,
        )

    result = response.output

    return result, response.raised_exception
```

Here is my log output with the gRPC server down, which causes an UNAVAILABLE error on the client. Note that only the log line emitted before the RPC executes appears, and then the exception is raised:

```
dev [1]: from config.celery.execute_task_client_2 import call_execute_task_service

dev [2]: call_execute_task_service("sms.send", "FAKEID")
[ErrorLogClientInterceptor] /proto.celery.CeleryTaskAPI/ExecuteTask will be executed
---------------------------------------------------------------------------
_InactiveRpcError                         Traceback (most recent call last)
/opt/loggi/loggi/const.py in <module>
----> 1 call_execute_task_service("sms.send", "FAKEID")

/opt/loggi/loggi/config/celery/execute_task_client_2.py in call_execute_task_service(task_name, task_id)
     16         response, _ = stub.ExecuteTask.with_call(
     17             request,
---> 18             60,
     19         )
     20 

/usr/local/lib/python3.7/site-packages/grpc/_interceptor.py in with_call(self, request, timeout, metadata, credentials, wait_for_ready, compression)
    269                                credentials=credentials,
    270                                wait_for_ready=wait_for_ready,
--> 271                                compression=compression)
    272 
    273     def future(self,

/usr/local/lib/python3.7/site-packages/grpc/_interceptor.py in _with_call(self, request, timeout, metadata, credentials, wait_for_ready, compression)
    255                                                        client_call_details,
    256                                                        request)
--> 257         return call.result(), call
    258 
    259     def with_call(self,

/usr/local/lib/python3.7/site-packages/grpc/_channel.py in result(self, timeout)
    331     def result(self, timeout=None):  # pylint: disable=unused-argument
    332         """See grpc.Future.result."""
--> 333         raise self
    334 
    335     def exception(self, timeout=None):  # pylint: disable=unused-argument

/usr/local/lib/python3.7/site-packages/grpc/_interceptor.py in continuation(new_details, request)
    245                     credentials=new_credentials,
    246                     wait_for_ready=new_wait_for_ready,
--> 247                     compression=new_compression)
    248                 return _UnaryOutcome(response, call)
    249             except grpc.RpcError as rpc_error:

/usr/local/lib/python3.7/site-packages/grpc/_channel.py in with_call(self, request, timeout, metadata, credentials, wait_for_ready, compression)
    835         state, call, = self._blocking(request, timeout, metadata, credentials,
    836                                       wait_for_ready, compression)
--> 837         return _end_unary_response_blocking(state, call, True, None)
    838 
    839     def future(self,

/usr/local/lib/python3.7/site-packages/grpc/_channel.py in _end_unary_response_blocking(state, call, with_call, deadline)
    727             return state.response
    728     else:
--> 729         raise _InactiveRpcError(state)
    730 
    731 

_InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.UNAVAILABLE
    details = "Connection reset by peer"
    debug_error_string = "{"created":"@1666794956.925240886","description":"Error received from peer ipv4:172.17.0.1:50051","file":"src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"Connection reset by peer","grpc_status":14}"
```

CaueP commented 1 year ago

While stepping through the method call (`return method(request, context)`) in the `try` block, I noticed that the exception is returned instead of raised, which is why the `except` block never catches it. I'll check whether this is the default behavior.
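
The observation above can be reproduced without a server. This is a minimal sketch that assumes, as the `continuation` and `raise self` frames in the traceback suggest, that the continuation hands back an object holding the error rather than raising it. `fake_continuation` and `FakeRpcError` are hypothetical stand-ins built on `concurrent.futures.Future`, not gRPC classes.

```python
from concurrent.futures import Future
from typing import Any


class FakeRpcError(Exception):
    """Hypothetical stand-in for grpc.RpcError (not the real class)."""


def fake_continuation(request: Any, context: Any) -> Future:
    """Hypothetical continuation: the failure is stored in the returned
    future instead of being raised to the caller."""
    future: Future = Future()
    future.set_exception(FakeRpcError("StatusCode.UNAVAILABLE"))
    return future


except_entered = False


def intercept(method: Any, request: Any, context: Any) -> Any:
    """Same try/except shape as ErrorLogClientInterceptor.intercept."""
    global except_entered
    try:
        return method(request, context)
    except Exception:
        except_entered = True  # never reached: method() returns normally
        raise


outcome = intercept(fake_continuation, "request", "context")
print(except_entered)          # False
print(type(outcome).__name__)  # Future
```

The `try` block succeeds because calling the continuation does not fail; the error only surfaces later, when someone calls `.result()` on the returned object.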

dan-hipschman commented 1 year ago

Hi @CaueP,

First off, thanks for the detailed report. The issue is that `method` in the interceptor is a "continuation", as described in the client interceptor section of the gRPC docs. When you invoke the continuation, you get a future back, which resolves to either the result or an exception. I acknowledge this is confusing because it's a different API from the stub, but that's how the Python gRPC package is implemented.
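
How such a future resolves can be sketched with `concurrent.futures.Future` as a stand-in (an assumption for illustration; the real object is a gRPC call that also implements `grpc.Future`). `exception()` returns `None` or the stored error, while `result()` returns the response or re-raises the error, which matches the `call.result()` frame followed by `raise self` in the traceback above. `FakeRpcError` and `resolved` are hypothetical names.

```python
from concurrent.futures import Future
from typing import Any, Optional


class FakeRpcError(Exception):
    """Hypothetical stand-in for grpc.RpcError (not the real class)."""


def resolved(value: Any = None, error: Optional[BaseException] = None) -> Future:
    """Hypothetical helper: an already-completed future, roughly like the
    object a blocking unary continuation hands back."""
    future: Future = Future()
    if error is not None:
        future.set_exception(error)
    else:
        future.set_result(value)
    return future


ok = resolved(value="response")
failed = resolved(error=FakeRpcError("StatusCode.UNAVAILABLE"))

print(ok.exception())      # None: the call succeeded
print(ok.result())         # response
print(failed.exception())  # StatusCode.UNAVAILABLE: returned, not raised
try:
    failed.result()        # result() re-raises the stored error
except FakeRpcError as exc:
    print("raised:", exc)  # raised: StatusCode.UNAVAILABLE
```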

Luckily it's not hard to do what you want. Check out this test as an example. I think the docs could be improved to describe the continuation better. I can update them when I have more time.
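
A minimal sketch of one possible fix, assuming the continuation's return value behaves like a `concurrent.futures.Future` (the gRPC docs describe it as an object that is both a `grpc.Call` and a `grpc.Future`). Instead of wrapping the call in `try`/`except`, it attaches a done-callback that logs any error and returns the future unchanged, so the caller still gets the original exception from `.result()`. `ErrorLogInterceptorSketch`, `failing_continuation`, and `FakeRpcError` are hypothetical stand-ins so the snippet runs without a server; in a real interceptor the body of `intercept` would go in the `intercept(self, method, request, context)` method of the `ClientInterceptor` subclass from the issue.

```python
import logging
from concurrent.futures import Future
from types import SimpleNamespace
from typing import Any, Callable

logger = logging.getLogger("error_log_interceptor_sketch")


class FakeRpcError(Exception):
    """Hypothetical stand-in for grpc.RpcError (not the real class)."""


class ErrorLogInterceptorSketch:
    """Hypothetical stand-in for a grpc_interceptor.ClientInterceptor subclass.

    It returns the continuation's future unchanged and only attaches a
    callback, so it never swallows the error and never blocks the caller.
    """

    def intercept(self, method: Callable, request: Any, context: Any) -> Future:
        logger.info("%s will be executed", context.method)
        future = method(request, context)

        def log_if_failed(done: Future) -> None:
            # exception() raises on a cancelled future, so skip those.
            if done.cancelled():
                return
            exc = done.exception()  # None when the call succeeded
            if exc is not None:
                logger.error("%s raised %r", context.method, exc)

        # Runs immediately if the call has already finished, or later,
        # when a still-running call completes.
        future.add_done_callback(log_if_failed)
        return future


def failing_continuation(request: Any, context: Any) -> Future:
    """Hypothetical continuation whose call failed with UNAVAILABLE."""
    future: Future = Future()
    future.set_exception(FakeRpcError("StatusCode.UNAVAILABLE"))
    return future


details = SimpleNamespace(method="/proto.celery.CeleryTaskAPI/ExecuteTask")
outcome = ErrorLogInterceptorSketch().intercept(failing_continuation, "request", details)
```

Calling `future.exception()` directly inside `intercept` should also work for a blocking `with_call`, since that call has already finished by the time the continuation returns, but it would wait for completion on a call started with `.future()`; the callback avoids that wait.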

CaueP commented 1 year ago

Hey @dan-hipschman-od, thanks a lot for the explanation and the code example! I was able to solve my issue. 😃