
How to define a global error handler in gRPC python

I'm trying to catch any exception raised in any servicer so that I propagate only known exceptions and not unexpected ones such as ValueError, TypeError, etc.

I'd like to be able to catch any raised error and format it or convert it to another error, to better control the information that is exposed.

I don't want to have to enclose every servicer method with try/except.

I've tried with an interceptor, but I'm not able to catch the errors there.

Is there a way of specifying an error handler for the gRPC server, like you can with Flask or any other HTTP server?

Sebastian asked Oct 02 '18 12:10



3 Answers

gRPC Python does not currently support a server-side global error handler. The interceptor does not execute the server handler inside the intercept_service function, so there is no way to wrap the call in try/except there.

Also, I found that the gRPC Python server interceptor implementation differs from what was originally proposed in L13-Python-Interceptors.md#server-interceptors. Had the implementation stuck to the original design, we could easily use an interceptor as a global error handler, since it would receive the handler and the request/request_iterator.

# Current Implementation
intercept_service(self, continuation, handler_call_details)

# Original Design
intercept_unary_unary_handler(self, handler, method, request, servicer_context)
intercept_unary_stream_handler(self, handler, method, request, servicer_context)
intercept_stream_unary_handler(self, handler, method, request_iterator, servicer_context)
intercept_stream_stream_handler(self, handler, method, request_iterator, servicer_context)

Please submit a feature request at https://github.com/grpc/grpc/issues.

Lidi Zheng answered Nov 05 '22 14:11


Maybe this will help you :)

import logging

import grpc

logger = logging.getLogger(__name__)


def _wrap_rpc_behavior(handler, fn):
    # continuation() returns None when no handler is registered for the method.
    if handler is None:
        return None

    # Pick the behavior and the matching handler factory for the RPC type.
    if handler.request_streaming and handler.response_streaming:
        behavior_fn = handler.stream_stream
        handler_factory = grpc.stream_stream_rpc_method_handler
    elif handler.request_streaming and not handler.response_streaming:
        behavior_fn = handler.stream_unary
        handler_factory = grpc.stream_unary_rpc_method_handler
    elif not handler.request_streaming and handler.response_streaming:
        behavior_fn = handler.unary_stream
        handler_factory = grpc.unary_stream_rpc_method_handler
    else:
        behavior_fn = handler.unary_unary
        handler_factory = grpc.unary_unary_rpc_method_handler

    # Rebuild the handler around the wrapped behavior, keeping the (de)serializers.
    return handler_factory(fn(behavior_fn,
                              handler.request_streaming,
                              handler.response_streaming),
                           request_deserializer=handler.request_deserializer,
                           response_serializer=handler.response_serializer)


class TracebackLoggerInterceptor(grpc.ServerInterceptor):

    def intercept_service(self, continuation, handler_call_details):
        def error_wrapper(behavior, request_streaming, response_streaming):

            def new_behavior(request_or_iterator, servicer_context):
                # Note: for response-streaming methods, errors raised while the
                # returned generator is iterated are not caught here.
                try:
                    return behavior(request_or_iterator, servicer_context)
                except Exception:
                    logger.exception("Unhandled exception in %s",
                                     handler_call_details.method)
                    # Re-raise so gRPC still fails the RPC instead of returning
                    # None as the response; a call such as
                    # servicer_context.abort(...) could go here instead to
                    # return a sanitized status.
                    raise
            return new_behavior

        return _wrap_rpc_behavior(continuation(handler_call_details), error_wrapper)
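
One caveat with the interceptor above: for response-streaming methods, behavior(...) typically returns a generator, so an exception raised while the stream is being iterated happens after the try block has already exited. Below is a minimal sketch of a wrapper that covers both cases. It is not part of the answer's code: guard_behavior, on_error and the toy handlers are hypothetical names chosen for illustration, and the sketch deliberately avoids importing grpc so the effect can be seen in isolation.

```python
import logging

logger = logging.getLogger(__name__)


def guard_behavior(behavior, response_streaming, on_error):
    """Wrap an RPC behavior so on_error sees every unexpected exception.

    on_error(exc, context) is a hypothetical hook; in a real server it might
    call context.abort(...) with a sanitized status.
    """
    if response_streaming:
        def guarded_stream(request_or_iterator, context):
            try:
                # Iterating inside the try block also catches errors raised
                # lazily by a generator-based handler.
                yield from behavior(request_or_iterator, context)
            except Exception as exc:
                logger.exception("Unhandled error in streaming handler")
                on_error(exc, context)
        return guarded_stream

    def guarded_unary(request_or_iterator, context):
        try:
            return behavior(request_or_iterator, context)
        except Exception as exc:
            logger.exception("Unhandled error in unary handler")
            return on_error(exc, context)
    return guarded_unary


# Toy handlers standing in for servicer methods (hypothetical).
def broken_stream(request, context):
    yield "first"
    raise ValueError("boom")


def broken_unary(request, context):
    raise TypeError("bad input")


seen = []


def record_error(exc, context):
    # Record only the exception type; a real hook would translate the error.
    seen.append(type(exc).__name__)


stream_results = list(guard_behavior(broken_stream, True, record_error)(None, None))
unary_result = guard_behavior(broken_unary, False, record_error)(None, None)
```

In the interceptor, a function like this could take the place of new_behavior, using the response_streaming flag that _wrap_rpc_behavior already passes to the wrapper.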
Snitkoff answered Nov 05 '22 16:11


As some of the previous comments suggested, I tried the metaclass approach, which works quite well.

Attached is a simple example to demonstrate how to intercept the grpc calls. You could extend this by providing the metaclass a list of decorators which you could apply on each function.

Also, it would be wise to be more selective regarding the methods you apply the wrapper to. A good option would be to list the methods of the autogenerated base class and only wrap those.

from types import FunctionType
from functools import wraps


def wrapper(method):
    @wraps(method)
    def wrapped(*args, **kwargs):
        # do stuff here
        return method(*args, **kwargs)

    return wrapped


class ServicerMiddlewareClass(type):
    def __new__(meta, classname, bases, class_dict):
        new_class_dict = {}

        for attribute_name, attribute in class_dict.items():
            if isinstance(attribute, FunctionType):
                # replace it with a wrapped version
                attribute = wrapper(attribute)

            new_class_dict[attribute_name] = attribute

        return type.__new__(meta, classname, bases, new_class_dict)


# To use it (`grpc` here stands for your generated *_pb2_grpc module,
# which defines the MyGrpcServicer base class)
class MyGrpcService(grpc.MyGrpcServicer, metaclass=ServicerMiddlewareClass):
   ...
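
The more selective variant mentioned above could be sketched roughly as follows: the metaclass wraps only methods whose names are declared as public callables on a base class. GeneratedServicerBase, SayHello, translate_errors, SelectiveServicerMeta and FakeContext are hypothetical stand-ins so that the example runs without generated gRPC code; a real service would use the autogenerated servicer class and the real servicer context.

```python
from functools import wraps
from types import FunctionType


class GeneratedServicerBase:
    """Hypothetical stand-in for an autogenerated *Servicer base class."""

    def SayHello(self, request, context):
        raise NotImplementedError("Method not implemented!")


class FakeContext:
    """Hypothetical stand-in that records what a handler reports."""

    def __init__(self):
        self.details = None

    def set_details(self, details):
        self.details = details


def translate_errors(method):
    @wraps(method)
    def wrapped(self, request, context):
        try:
            return method(self, request, context)
        except Exception:
            # Expose a generic message instead of the original exception text.
            context.set_details("Internal server error")
            return None
    return wrapped


class SelectiveServicerMeta(type):
    def __new__(meta, classname, bases, class_dict):
        # Public callables declared directly on the base classes are
        # treated as the RPC methods.
        rpc_names = {
            name
            for base in bases
            for name, attr in vars(base).items()
            if callable(attr) and not name.startswith("_")
        }
        new_class_dict = {
            name: (translate_errors(attr)
                   if name in rpc_names and isinstance(attr, FunctionType)
                   else attr)
            for name, attr in class_dict.items()
        }
        return super().__new__(meta, classname, bases, new_class_dict)


class MyService(GeneratedServicerBase, metaclass=SelectiveServicerMeta):
    def SayHello(self, request, context):
        raise ValueError("secret internal detail")

    def helper(self):
        # Not an RPC method on the base class, so it is left unwrapped.
        raise RuntimeError("not wrapped")


ctx = FakeContext()
result = MyService().SayHello("request", ctx)
```

Returning None only keeps the sketch small; in a real server, calling context.abort(...) with a sanitized status code and message is probably the better fit.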
odedfos answered Nov 05 '22 15:11