Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Intercepting/Logging requests and Responses in GRPC

I'm developing a chat app using GRPC in which the server receives information from the client and sends it back out to all the clients connected to it. For this, I've used Saturnism's chat-example as a reference. I've replicated the code, the code compiles and runs but the server supposedly never receives any requests from client.

My question is:

  1. Is there a way to enable verbos server side and client side logging in GRPC to see what requests and responses are going in and out & what might be failing?
  2. I'm using the following code for server and client. What might be missing/wrong in the following code that's resulting in no communication between client and server.

WingokuServer.java

public class WingokuServer {
    public static void main(String[] args) throws IOException, InterruptedException {
        Server server = ServerBuilder.forPort(8091)
                .intercept(recordRequestHeadersInterceptor())
                .addService(new WingokuServiceImpl())
                .build();

        System.out.println("Starting server...");
        server.start();
        System.out.println("Server started!");
        server.awaitTermination();
    }

WingokuServerSideServiceImplementation:

public class WingokuServiceImpl extends WingokuServiceGrpc.WingokuServiceImplBase {
    private static Set<StreamObserver<Response>> observers =
            Collections.newSetFromMap(new ConcurrentHashMap<>());

    public WingokuServiceImpl() {
        System.out.println("WingokuServiceImp");
    }

    @Override
    public StreamObserver<Request> messages(StreamObserver<Response> responseObserver) {
        System.out.println("messages");
        observers.add(responseObserver);
        return new StreamObserver<Request>() {
            @Override
            public void onNext(Request request) {
                System.out.println("Server onNext: ");
                System.out.println("request from client is: "+ request.getRequestMessage());
                Response response = Response.newBuilder().setResponseMessage("new Message From server at time: "+ System.nanoTime()).build();
                for (StreamObserver<Response> observer : observers) {
                    observer.onNext(response);
                }
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("Server onError: ");
                throwable.printStackTrace();
            }

            @Override
            public void onCompleted() {
                observers.remove(responseObserver);
                System.out.println("Server onCompleted ");
            }
        };
    }
}

WingokuClient:

public class WingokuClient {
    public static void main(String[] args) {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8091).usePlaintext(true).build();
        WingokuServiceGrpc.WingokuServiceStub asyncStub = WingokuServiceGrpc.newStub(channel);
        StreamObserver<Request> requestStreamObserver = asyncStub.messages(new StreamObserver<Response>() {
            @Override
            public void onNext(Response response) {
                System.out.println("Client onNext");
                System.out.println("REsponse from server is: "+ response.getResponseMessage());
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("Client onError");
                throwable.printStackTrace();
            }

            @Override
            public void onCompleted() {
                System.out.println("Client OnComplete");
            }
        });

        requestStreamObserver.onNext(Request.newBuilder().setRequestMessage("Message From Client").build());
        requestStreamObserver.onCompleted();
        channel.shutdown();
        System.out.println("exiting client");
    }
}

Edit:

There is nothing wrong with the code. It works. I just needed to add awaitTermination to the client's channel because without it just closes the connection between client and server instantly, probably even before the requests go out of the client onto the network. That's why the server never received any requests.

However my question for enabling verbose logging and/or adding some sort of interceptor to the server side still remains unanswered. So I'm looking forward to getting some pointers from experts here.

like image 939
Umer Farooq Avatar asked Nov 07 '17 10:11

Umer Farooq


3 Answers

I found a way to log request and response in both server and client sides by using the Interceptor, it makes the code cleaner. It is also possible to use sleuth for the tracing as well.

Please use spring:

implementation 'io.github.lognet:grpc-spring-boot-starter'

Server part

You can then use the GRpcGlobalInterceptor annotation

import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import org.lognet.springboot.grpc.GRpcGlobalInterceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;

@GRpcGlobalInterceptor
public class GrpcInterceptor implements ServerInterceptor {
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    public static final Metadata.Key<String> TRACE_ID_KEY = Metadata.Key.of("traceId", ASCII_STRING_MARSHALLER);

    @Override
    public <M, R> ServerCall.Listener<M> interceptCall(
            ServerCall<M, R> call, Metadata headers, ServerCallHandler<M, R> next) {
        String traceId = headers.get(TRACE_ID_KEY);
        // TODO: Add traceId to sleuth
        logger.warn("traceId from client: {}. TODO: Add traceId to sleuth", traceId);

        GrpcServerCall grpcServerCall = new GrpcServerCall(call);

        ServerCall.Listener listener = next.startCall(grpcServerCall, headers);

        return new GrpcForwardingServerCallListener<M>(call.getMethodDescriptor(), listener) {
            @Override
            public void onMessage(M message) {
                logger.info("Method: {}, Message: {}", methodName, message);
                super.onMessage(message);
            }
        };
    }

    private class GrpcServerCall<M, R> extends ServerCall<M, R> {

        ServerCall<M, R> serverCall;

        protected GrpcServerCall(ServerCall<M, R> serverCall) {
            this.serverCall = serverCall;
        }

        @Override
        public void request(int numMessages) {
            serverCall.request(numMessages);
        }

        @Override
        public void sendHeaders(Metadata headers) {
            serverCall.sendHeaders(headers);
        }

        @Override
        public void sendMessage(R message) {
            logger.info("Method: {}, Response: {}", serverCall.getMethodDescriptor().getFullMethodName(), message);
            serverCall.sendMessage(message);
        }

        @Override
        public void close(Status status, Metadata trailers) {
            serverCall.close(status, trailers);
        }

        @Override
        public boolean isCancelled() {
            return serverCall.isCancelled();
        }

        @Override
        public MethodDescriptor<M, R> getMethodDescriptor() {
            return serverCall.getMethodDescriptor();
        }
    }

    private class GrpcForwardingServerCallListener<M> extends io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener<M> {

        String methodName;

        protected GrpcForwardingServerCallListener(MethodDescriptor method, ServerCall.Listener<M> listener) {
            super(listener);
            methodName = method.getFullMethodName();
        }
    }
}

Client part

Interceptor:

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;

@Component
public class BackendInterceptor implements ClientInterceptor {
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    public static final Metadata.Key<String> TRACE_ID_KEY = Metadata.Key.of("traceId", ASCII_STRING_MARSHALLER);

    @Override
    public <M, R> ClientCall<M, R> interceptCall(
            final MethodDescriptor<M, R> method, CallOptions callOptions, Channel next) {
        return new BackendForwardingClientCall<M, R>(method,
                next.newCall(method, callOptions.withDeadlineAfter(10000, TimeUnit.MILLISECONDS))) {

            @Override
            public void sendMessage(M message) {
                logger.info("Method: {}, Message: {}", methodName, message);
                super.sendMessage(message);
            }

            @Override
            public void start(Listener<R> responseListener, Metadata headers) {
                // TODO: Use the sleuth traceId instead of 999
                headers.put(TRACE_ID_KEY, "999");

                BackendListener<R> backendListener = new BackendListener<>(methodName, responseListener);
                super.start(backendListener, headers);
            }
        };
    }

    private class BackendListener<R> extends ClientCall.Listener<R> {

        String methodName;
        ClientCall.Listener<R> responseListener;

        protected BackendListener(String methodName, ClientCall.Listener<R> responseListener) {
            super();
            this.methodName = methodName;
            this.responseListener = responseListener;
        }

        @Override
        public void onMessage(R message) {
            logger.info("Method: {}, Response: {}", methodName, message);
            responseListener.onMessage(message);
        }

        @Override
        public void onHeaders(Metadata headers) {
            responseListener.onHeaders(headers);
        }

        @Override
        public void onClose(Status status, Metadata trailers) {
            responseListener.onClose(status, trailers);
        }

        @Override
        public void onReady() {
            responseListener.onReady();
        }
    }

    private class BackendForwardingClientCall<M, R> extends io.grpc.ForwardingClientCall.SimpleForwardingClientCall<M, R> {

        String methodName;

        protected BackendForwardingClientCall(MethodDescriptor<M, R> method, ClientCall delegate) {
            super(delegate);
            methodName = method.getFullMethodName();
        }
    }
}

Add the interceptor to the channel:

ManagedChannel managedChannel = ManagedChannelBuilder
                .forAddress(_URL_, _PORT_).usePlaintext().intercept(backendInterceptor).build();
like image 196
Shoohei Avatar answered Oct 12 '22 17:10

Shoohei


after many years let me also respond to this question (hoping to be useful for who will have the same problem). I basically solved by taking as example the response of Shoohei and trying to compress it as much as possible.

Server Interceptor

public class ServerLogInterceptor implements ServerInterceptor {

@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {

    ServerCall<ReqT, RespT> listener = new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {

        @Override
        public void sendMessage(RespT message) {
            log.debug("Sending message to cliens: {}",  message);
            super.sendMessage(message);
        }
    };

    return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(next.startCall(listener, headers)) {

        @Override
        public void onMessage(ReqT message) {
            log.debug("Received message from cliens: {}", message);
            super.onMessage(message);
        }

    };
}}

Client Interceptor

    public class ClientLogInterceptor  implements ClientInterceptor {

    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
        MethodDescriptor<ReqT, RespT> method,
        CallOptions callOptions,
        Channel next
    ) {
        return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
            @Override
            public void sendMessage(ReqT message) {
                log.debug("Sending message to modules: {}", message);
                super.sendMessage(message);
            }

            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
                super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {

                    @Override
                    public void onMessage(RespT message) {
                        log.debug("Received message from modules: {}", message);
                        super.onMessage(message);
                    }

                }, headers);
            }

        };
    }

}

(I'm not sure if I pasted correctly the code, in case just add or remove some parenthesis)

like image 20
Antonio Gianola Avatar answered Oct 12 '22 15:10

Antonio Gianola


You can turn on the frame logging in the Netty transport. First, create a file called logging.properties. In the file put the following contents:

handlers=java.util.logging.ConsoleHandler
io.grpc.netty.level=FINE
java.util.logging.ConsoleHandler.level=FINE
java.util.logging.ConsoleHandler.formatter=java.util.logging.SimpleFormatter

Then start up the Java binary with the jvm flag -Djava.util.logging.config.file=logging.properties

like image 27
Kun Zhang Avatar answered Oct 12 '22 17:10

Kun Zhang