Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to intercept the headers from in call to one service and insert it to another request in gRPC-java?

Tags:

grpc

grpc-java

I have two servers - HelloServer and WorldServer.

Both implement the same proto file:

// The greeting service definition.
service GreeterService {
    // Sends a greeting
    rpc GreetWithHelloOrWorld (GreeterRequest) returns (GreeterReply) {}
    rpc GreetWithHelloWorld (GreeterRequest) returns (GreeterReply) {}
}

message GreeterRequest {
    string id = 1;
}

// The response message containing the greetings
message GreeterReply {
    string message = 1;
    string id = 2;
}

I want to add traceIds to the requests. As far as I understand, this is achieved through adding the traceId in the Metadata object.

Here is the test I am using to check that the traceIds are passed along. The request is made to the HelloServer which in turns calls the WorldServer and then finally returns the response.

@Test
public void greetHelloWorld() {
    String traceId = UUID.randomUUID().toString();
    Metadata metadata = new Metadata();
    metadata.put(MetadataKeys.TRACE_ID_METADATA_KEY, traceId);

    Greeter.GreeterRequest greeterRequest = Greeter.GreeterRequest.newBuilder().setId(traceId).build();

    ManagedChannel channel = ManagedChannelBuilder
        .forAddress("localhost", 8080)
        .usePlaintext(true)
        .build();

    AtomicReference<Metadata> trailersCapture = new AtomicReference<>();
    AtomicReference<Metadata> headersCapture = new AtomicReference<>();
    ClientInterceptor clientInterceptor = MetadataUtils.newAttachHeadersInterceptor(metadata);
    ClientInterceptor metadataCapturingClientInterceptor = MetadataUtils.newCaptureMetadataInterceptor(headersCapture, trailersCapture);

    GreeterServiceBlockingStub blockingStub = GreeterServiceGrpc.newBlockingStub(ClientInterceptors.intercept(channel, clientInterceptor, metadataCapturingClientInterceptor));
    GreeterServiceStub asyncStub = GreeterServiceGrpc.newStub(channel);

    try {
        Greeter.GreeterReply greeterReply = blockingStub.greetWithHelloWorld(greeterRequest);
        String idInResponse = greeterReply.getId();
        String idInHeaders = headersCapture.get().get(MetadataKeys.TRACE_ID_METADATA_KEY);
        logger.info("Response from HelloService and WorldService  -- , id = {}, headers = {}", greeterReply.getMessage(), idInResponse, idInHeaders);
        assertEquals("Ids in response and header did not match", idInResponse, idInHeaders);
    } catch (StatusRuntimeException e) {
        logger.warn("Exception when calling HelloService and WorldService\n" +  e);
        fail();
    } finally {
        channel.shutdown();
    }
}

Implementation of ServerInterceptor:

@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {

    String traceId = headers.get(MetadataKeys.TRACE_ID_METADATA_KEY);
    logger.info("objId=" + this.toString().substring(this.toString().lastIndexOf('@')) + " Trace id -- 1=" + headers.get(MetadataKeys.TRACE_ID_METADATA_KEY));
    return next.startCall(new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
        @Override
        public void sendHeaders(Metadata headers) {
            headers.put(MetadataKeys.TRACE_ID_METADATA_KEY, traceId);
            logger.info("objId=" + this.toString().substring(this.toString().lastIndexOf('@')) + " Trace id -- 2  " + headers.get(MetadataKeys.TRACE_ID_METADATA_KEY));
            super.sendHeaders(headers);
        }

        @Override
        public void sendMessage(RespT message) {
           logger.info("objId=" + this.toString().substring(this.toString().lastIndexOf('@')) + " message=" + message.toString());
            super.sendMessage(message);
        }
    }, headers);

Here is the implementation of the greetWithHelloWorld() method:

public void greetWithHelloWorld(com.comcast.manitoba.world.hello.Greeter.GreeterRequest request,
                                        io.grpc.stub.StreamObserver<com.comcast.manitoba.world.hello.Greeter.GreeterReply> responseObserver) {
        Greeter.GreeterRequest greeterRequest = Greeter.GreeterRequest.newBuilder().setId(request.getId()).build();
    
        Metadata metadata = new Metadata();
        metadata.put(MetadataKeys.TRACE_ID_METADATA_KEY, "");
    
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8081)
                .usePlaintext(true).build();
    
        AtomicReference<Metadata> trailersCapture = new AtomicReference<>();
        AtomicReference<Metadata> headersCapture = new AtomicReference<>();
        ClientInterceptor clientInterceptor = MetadataUtils.newAttachHeadersInterceptor(metadata);
        ClientInterceptor metadataCapturingClientInterceptor = MetadataUtils.newCaptureMetadataInterceptor(headersCapture, trailersCapture);
    
        GreeterServiceGrpc.GreeterServiceBlockingStub blockingStub = GreeterServiceGrpc.newBlockingStub(ClientInterceptors.intercept(channel, clientInterceptor, metadataCapturingClientInterceptor));
    
        String messageFromWorldService = "";
        String replyIdFromWorldService = "";
        try {
            Greeter.GreeterReply greeterReply = blockingStub.greetWithHelloOrWorld(greeterRequest);
            messageFromWorldService = greeterReply.getMessage();
            replyIdFromWorldService = greeterReply.getId();
            logger.info("Response from WorldService  -- {}, id = {}", messageFromWorldService, replyIdFromWorldService);
        } catch (StatusRuntimeException e) {
            logger.warn("Exception when calling HelloService\n" +  e);
        }
    
        Greeter.GreeterReply greeterReply = Greeter.GreeterReply.newBuilder().setMessage("Hello" + messageFromWorldService).setId(replyIdFromWorldService).build();
        responseObserver.onNext(greeterReply);
        responseObserver.onCompleted();
    }

The problem is in the greetWithHelloWorld() method, I don't have access to Metadata, so I cannot extract the traceId from the header and attach it to the request to the World server. However, if I put a breakpoint in that method, I can see that request object does have the traceId in it which is private to it and unaccessible.

Any ideas how can I achieve this? Also, is this the best way to do pass traceIds around? I found some references to using Context. What is the difference between Context and Metadata?

like image 815
user2237511 Avatar asked Jan 04 '23 06:01

user2237511


1 Answers

The expected approach is to use a ClientInterceptor and ServerInterceptor. The client interceptor would copy from Context into Metadata. The server interceptor would copy from Metadata to Context. Use Contexts.interceptCall in the server interceptor to apply the Context all callbacks.

Metadata is for wire-level propagation. Context is for in-process propagation. Generally the application should not need to interact directly with Metadata (in Java).

like image 184
Eric Anderson Avatar answered Apr 27 '23 17:04

Eric Anderson