
How to read metadata in gRPC using Java on the client side

I am using Java and the protoc 3.0 compiler, and my proto file is shown below. https://github.com/openconfig/public/blob/master/release/models/rpc/openconfig-rpc-api.yang

syntax = "proto3";

package Telemetry;

// Interface exported by Agent
service OpenConfigTelemetry {
    // Request an inline subscription for data at the specified path.
    // The device should send telemetry data back on the same
    // connection as the subscription request.
    rpc telemetrySubscribe(SubscriptionRequest)                     returns (stream OpenConfigData) {}

    // Terminates and removes an existing telemetry subscription
    rpc cancelTelemetrySubscription(CancelSubscriptionRequest)      returns (CancelSubscriptionReply) {}

    // Get the list of current telemetry subscriptions from the
    // target. This command returns a list of existing subscriptions
    // not including those that are established via configuration.
    rpc getTelemetrySubscriptions(GetSubscriptionsRequest)          returns (GetSubscriptionsReply) {}

    // Get Telemetry Agent Operational States
    rpc getTelemetryOperationalState(GetOperationalStateRequest)    returns (GetOperationalStateReply) {}

    // Return the set of data encodings supported by the device for
    // telemetry data
    rpc getDataEncodings(DataEncodingRequest)                       returns (DataEncodingReply) {}
}

// Message sent for a telemetry subscription request
message SubscriptionRequest {
    // Data associated with a telemetry subscription
    SubscriptionInput input                                 = 1;

    // List of data models paths and filters
    // which are used in a telemetry operation.
    repeated Path path_list                                 = 2;

    // The below configuration is not defined in Openconfig RPC.
    // It is a proposed extension to configure additional
    // subscription request features.
    SubscriptionAdditionalConfig additional_config          = 3;
}

// Data associated with a telemetry subscription
message SubscriptionInput {
    // List of optional collector endpoints to send data for
    // this subscription.
    // If no collector destinations are specified, the collector
    // destination is assumed to be the requester on the rpc channel.
    repeated Collector  collector_list                      = 1;
}

// Collector endpoints to send data specified as an ip+port combination.
message Collector {
    // IP address of collector endpoint
    string address                                          = 1;

    // Transport protocol port number for the collector destination.
    uint32 port                                             = 2;
}

// Data model path
message Path {
    // Data model path of interest
    // Path specification for elements of OpenConfig data models
    string path                                             = 1;

    // Regular expression to be used in filtering state leaves
    string filter                                           = 2;

    // If this is set to true, the target device will only send
    // updates to the collector upon a change in data value
    bool suppress_unchanged                                 = 3;

    // Maximum time in ms the target device may go without sending
    // a message to the collector. If this time expires with
    // suppress-unchanged set, the target device must send an update
    // message regardless if the data values have changed.
    uint32 max_silent_interval                              = 4;

    // Time in ms between collection and transmission of the
    // specified data to the collector platform. The target device
    // will sample the corresponding data (e.g., a counter) and
    // immediately send to the collector destination.
    //
    // If sample-frequency is set to 0, then the network device
    // must emit an update upon every datum change.
    uint32 sample_frequency                                 = 5;
}

// Configure subscription request additional features.
message SubscriptionAdditionalConfig {
    // limit the number of records sent in the stream
    int32 limit_records                                     = 1;

    // limit the time the stream remains open
    int32 limit_time_seconds                                = 2;
}

// Reply to inline subscription for data at the specified path is done in
// two parts.
// 1. Reply data message sent out using out-of-band channel.
// 2. Telemetry data sent back on the same connection as the
//    subscription request.

// 1. Reply data message sent out using out-of-band channel.
message SubscriptionReply {
    // Response message to a telemetry subscription creation or
    // get request.
    SubscriptionResponse response                           = 1;

    // List of data models paths and filters
    // which are used in a telemetry operation.
    repeated Path path_list                                 = 2;
}

// Response message to a telemetry subscription creation or get request.
message SubscriptionResponse {
    // Unique id for the subscription on the device. This is
    // generated by the device and returned in a subscription
    // request or when listing existing subscriptions
    uint32 subscription_id = 1;
}

// 2. Telemetry data sent back on the same connection as the
//    subscription request.
message OpenConfigData {
    // router name:export IP address
    string system_id                                        = 1;

    // line card / RE (slot number)
    uint32 component_id                                     = 2;

    // PFE (if applicable)
    uint32 sub_component_id                                 = 3;

    // Path specification for elements of OpenConfig data models
    string path                                             = 4;

    // Sequence number, monotonically increasing for each
    // system_id, component_id, sub_component_id + path.
    uint64 sequence_number                                  = 5;

    // timestamp (milliseconds since epoch)
    uint64 timestamp                                        = 6;

    // List of key-value pairs
    repeated KeyValue kv                                    = 7;
}

// Simple Key-value, where value could be one of scalar types
message KeyValue {
    // Key
    string key                                              =  1;

    // One of possible values
    oneof value {
        double double_value                                 =  5;
        int64  int_value                                    =  6;
        uint64 uint_value                                   =  7;
        sint64 sint_value                                   =  8;
        bool   bool_value                                   =  9;
        string str_value                                    = 10;
        bytes  bytes_value                                  = 11;
    }
}

// Message sent for a telemetry subscription cancellation request
message CancelSubscriptionRequest {
    // Subscription identifier as returned by the device when
    // subscription was requested
    uint32 subscription_id                                  = 1;
}

// Reply to telemetry subscription cancellation request
message CancelSubscriptionReply {
    // Return code
    ReturnCode code                                         = 1;

    // Return code string
    string     code_str                                     = 2;
};

// Result of the operation
enum ReturnCode {
    SUCCESS                                                 = 0;
    NO_SUBSCRIPTION_ENTRY                                   = 1;
    UNKNOWN_ERROR                                           = 2;
}

// Message sent for a telemetry get request
message GetSubscriptionsRequest {
    // Subscription identifier as returned by the device when
    // subscription was requested
    // --- or ---
    // 0xFFFFFFFF for all subscription identifiers
    uint32 subscription_id                                  = 1;
}

// Reply to telemetry subscription get request
message GetSubscriptionsReply {
    // List of current telemetry subscriptions
    repeated SubscriptionReply subscription_list            = 1;
}

// Message sent for telemetry agent operational states request
message GetOperationalStateRequest {
    // Per-subscription_id level operational state can be requested.
    //
    // Subscription identifier as returned by the device when
    // subscription was requested
    // --- or ---
    // 0xFFFFFFFF for all subscription identifiers including agent-level
    // operational stats
    // --- or ---
    // If subscription_id is not present then send only agent-level
    // operational stats
    uint32 subscription_id                                  = 1;

    // Control verbosity of the output
    VerbosityLevel verbosity                                = 2;
}

// Verbosity Level
enum VerbosityLevel {
    DETAIL                                                  = 0;
    TERSE                                                   = 1;
    BRIEF                                                   = 2;
}

// Reply to telemetry agent operational states request
message GetOperationalStateReply {
    // List of key-value pairs where
    //     key      = operational state definition
    //     value    = operational state value
    repeated KeyValue kv                                    = 1;
}

// Message sent for a data encoding request
message DataEncodingRequest {
}

// Reply to data encodings supported request
message DataEncodingReply {
    repeated EncodingType  encoding_list                    = 1;
}

// Encoding Type Supported
enum EncodingType {
    UNDEFINED                                               = 0;
    XML                                                     = 1;
    JSON_IETF                                               = 2;
    PROTO3                                                  = 3;
}

To make the service call (rpc telemetrySubscribe), I first need to read a header that contains the subscription ID and then start reading messages. Using Java, I can connect to the service, and I added an interceptor, but when I print or retrieve the header it is null. The code that installs the interceptor is below:

ClientInterceptor interceptor = new HeaderClientInterceptor();
originChannel = OkHttpChannelBuilder.forAddress(host, port)
    .usePlaintext(true)
    .build();
Channel channel = ClientInterceptors.intercept(originChannel, interceptor);
telemetryStub = OpenConfigTelemetryGrpc.newStub(channel);

This is the interceptor code that reads the metadata:

  @Override
  public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
      CallOptions callOptions, Channel next) {
    return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {

      @Override
      public void start(Listener<RespT> responseListener, Metadata headers) {
        // Wrap the listener so that response headers can be inspected as they arrive.
        super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
          @Override
          public void onHeaders(Metadata headers) {
            Key<String> CUSTOM_HEADER_KEY = Metadata.Key.of("responseKEY", Metadata.ASCII_STRING_MARSHALLER);

            System.out.println("Contains Key?? " + headers.containsKey(CUSTOM_HEADER_KEY));
            // Pass the headers on to the wrapped listener.
            super.onHeaders(headers);
          }
        }, headers);
      }
    };
  }

Is there another way to read the metadata, or the first message that contains the subscription ID? All I need is to read the subscription ID from that first message and return it to the server so that streaming can start. I have equivalent Python code that uses the same proto file and communicates with the server; it is shown below for reference only:

sub_req = SubscribeRequestMsg("host", port)
data_itr = stub.telemetrySubscribe(sub_req, _TIMEOUT_SECONDS)
metadata = data_itr.initial_metadata()

if metadata[0][0] == "responseKey":
    metainfo = metadata[0][1]
    print(metainfo)

    subreply = agent_pb2.SubscriptionReply()
    subreply.SetInParent()
    google.protobuf.text_format.Merge(metainfo, subreply)

    if subreply.response.subscription_id:
        SUB_ID = subreply.response.subscription_id

With the Python code above I can easily retrieve the metadata object, but I am not sure how to do the same in Java.

After reading the metadata, all I get is: Metadata({content-type=[application/grpc], grpc-encoding=[identity], grpc-accept-encoding=[identity,deflate,gzip]})

But I know the metadata contains one more entry, which is:

response {
  subscription_id: 2
}

How can I extract that last entry, which contains the subscription ID, from the headers? I have tried many options and I am lost.

Ammad asked Apr 18 '17 18:04


2 Answers

The method you used is for request metadata, not response metadata:

public void start(Listener<RespT> responseListener, Metadata headers) {

For response metadata, you will need a ClientCall.Listener and wait for the onHeaders callback:

public void onHeaders(Metadata headers)

That said, the way you describe using metadata seems unusual. Metadata is generally for additional error details or cross-cutting features that aren't specific to the RPC method (like auth, tracing, etc.).
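One way to get a value out of the onHeaders callback and into the code that opened the stream is to complete a CompletableFuture from inside the callback. The sketch below shows only that handoff and runs without gRPC on the classpath: HeaderListener is a hypothetical stand-in for ClientCall.Listener, the Map stands in for Metadata, and all class names are made up for illustration. The key is written in lower case on the assumption that it arrives as an HTTP/2 header name.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class HeaderHandoff {

    /** Hypothetical stand-in for ClientCall.Listener#onHeaders; not a gRPC type. */
    public interface HeaderListener {
        void onHeaders(Map<String, String> headers);
    }

    /** Completes a future from the callback so the caller can wait for the header. */
    public static final class CapturingListener implements HeaderListener {
        private final String key;
        private final CompletableFuture<String> value = new CompletableFuture<>();

        public CapturingListener(String key) {
            this.key = key;
        }

        @Override
        public void onHeaders(Map<String, String> headers) {
            String found = headers.get(key);
            if (found != null) {
                value.complete(found);
            } else {
                // Fail fast instead of leaving the caller blocked forever.
                value.completeExceptionally(
                        new IllegalStateException("missing header: " + key));
            }
        }

        public CompletableFuture<String> value() {
            return value;
        }
    }

    public static void main(String[] args) throws Exception {
        CapturingListener listener = new CapturingListener("responsekey");

        // Simulate a transport thread delivering the response headers.
        Thread transport = new Thread(() -> listener.onHeaders(
                Collections.singletonMap("responsekey", "response { subscription_id: 2 }")));
        transport.start();

        // The caller blocks (with a timeout) until the callback has fired.
        String raw = listener.value().get(5, TimeUnit.SECONDS);
        System.out.println(raw);
    }
}
```

In a real interceptor the same future could be completed from the overridden onHeaders of a SimpleForwardingClientCallListener, with the caller waiting on it before processing stream messages.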

Eric Anderson answered Sep 22 '22 04:09



Oftentimes, using a ClientInterceptor is inconvenient because you need to keep a reference to it in order to pull the data back out. In your case, the data is actually Metadata. One way to get at the Metadata more easily is to put it inside the Context.

For example, you could create a Context.Key for the subscription ID. In your client interceptor, you could extract the Metadata header that you want and put it inside the Context, using Context.current().withValue(key, metadata). Inside your StreamObserver, you can then read it back by calling key.get(Context.current()). This assumes you are using the async API rather than the blocking API.
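For the extraction step, the question shows the header value as a protobuf text-format rendering of SubscriptionReply. With the generated classes available, protobuf-java's TextFormat.merge would be the direct counterpart of the Python text_format.Merge call. The sketch below is a dependency-free alternative, not the library approach: it assumes the value looks like the one in the question and pulls out only subscription_id with a regular expression. The class and method names are hypothetical.

```java
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SubscriptionIdParser {

    // Matches "subscription_id: <digits>"; a uint32 has at most 10 decimal digits.
    private static final Pattern SUBSCRIPTION_ID =
            Pattern.compile("\\bsubscription_id\\s*:\\s*(\\d{1,10})\\b");

    /** Returns the subscription id found in the text, or empty if there is none. */
    public static OptionalLong parseSubscriptionId(String textFormat) {
        if (textFormat == null) {
            return OptionalLong.empty();
        }
        Matcher m = SUBSCRIPTION_ID.matcher(textFormat);
        if (!m.find()) {
            return OptionalLong.empty();
        }
        // A long holds the full uint32 range without overflow.
        return OptionalLong.of(Long.parseLong(m.group(1)));
    }

    public static void main(String[] args) {
        String header = "response {\n  subscription_id: 2\n}";
        OptionalLong id = parseSubscriptionId(header);
        System.out.println(id.isPresent() ? id.getAsLong() : -1); // prints 2
    }
}
```

The parsed ID, rather than the whole Metadata object, is then a natural value to store under the Context.Key.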

It is more difficult because metadata is usually information about a call, not something directly related to the call itself. It is for things like tracing, encoding, stats, and cancellation. If something changes the way you handle the request, it probably needs to go directly into the request itself, rather than being on the side.

Carl Mastrangelo answered Sep 25 '22 04:09
