I am using Java and Protoc 3.0 compiler and my proto file is mention below. https://github.com/openconfig/public/blob/master/release/models/rpc/openconfig-rpc-api.yang
syntax = "proto3";
package Telemetry;
// Interface exported by Agent
service OpenConfigTelemetry {
// Request an inline subscription for data at the specified path.
// The device should send telemetry data back on the same
// connection as the subscription request.
rpc telemetrySubscribe(SubscriptionRequest) returns (stream OpenConfigData) {}
// Terminates and removes an exisiting telemetry subscription
rpc cancelTelemetrySubscription(CancelSubscriptionRequest) returns (CancelSubscriptionReply) {}
// Get the list of current telemetry subscriptions from the
// target. This command returns a list of existing subscriptions
// not including those that are established via configuration.
rpc getTelemetrySubscriptions(GetSubscriptionsRequest) returns (GetSubscriptionsReply) {}
// Get Telemetry Agent Operational States
rpc getTelemetryOperationalState(GetOperationalStateRequest) returns (GetOperationalStateReply) {}
// Return the set of data encodings supported by the device for
// telemetry data
rpc getDataEncodings(DataEncodingRequest) returns (DataEncodingReply) {}
}
// Message sent for a telemetry subscription request
message SubscriptionRequest {
// Data associated with a telemetry subscription
SubscriptionInput input = 1;
// List of data models paths and filters
// which are used in a telemetry operation.
repeated Path path_list = 2;
// The below configuration is not defined in Openconfig RPC.
// It is a proposed extension to configure additional
// subscription request features.
SubscriptionAdditionalConfig additional_config = 3;
}
// Data associated with a telemetry subscription
message SubscriptionInput {
// List of optional collector endpoints to send data for
// this subscription.
// If no collector destinations are specified, the collector
// destination is assumed to be the requester on the rpc channel.
repeated Collector collector_list = 1;
}
// Collector endpoints to send data specified as an ip+port combination.
message Collector {
// IP address of collector endpoint
string address = 1;
// Transport protocol port number for the collector destination.
uint32 port = 2;
}
// Data model path
message Path {
// Data model path of interest
// Path specification for elements of OpenConfig data models
string path = 1;
// Regular expression to be used in filtering state leaves
string filter = 2;
// If this is set to true, the target device will only send
// updates to the collector upon a change in data value
bool suppress_unchanged = 3;
// Maximum time in ms the target device may go without sending
// a message to the collector. If this time expires with
// suppress-unchanged set, the target device must send an update
// message regardless if the data values have changed.
uint32 max_silent_interval = 4;
// Time in ms between collection and transmission of the
// specified data to the collector platform. The target device
// will sample the corresponding data (e.g,. a counter) and
// immediately send to the collector destination.
//
// If sample-frequency is set to 0, then the network device
// must emit an update upon every datum change.
uint32 sample_frequency = 5;
}
// Configure subscription request additional features.
message SubscriptionAdditionalConfig {
// limit the number of records sent in the stream
int32 limit_records = 1;
// limit the time the stream remains open
int32 limit_time_seconds = 2;
}
// Reply to inline subscription for data at the specified path is done in
// two-folds.
// 1. Reply data message sent out using out-of-band channel.
// 2. Telemetry data send back on the same connection as the
// subscription request.
// 1. Reply data message sent out using out-of-band channel.
message SubscriptionReply {
// Response message to a telemetry subscription creation or
// get request.
SubscriptionResponse response = 1;
// List of data models paths and filters
// which are used in a telemetry operation.
repeated Path path_list = 2;
}
// Response message to a telemetry subscription creation or get request.
message SubscriptionResponse {
// Unique id for the subscription on the device. This is
// generated by the device and returned in a subscription
// request or when listing existing subscriptions
uint32 subscription_id = 1;
}
// 2. Telemetry data send back on the same connection as the
// subscription request.
message OpenConfigData {
// router name:export IP address
string system_id = 1;
// line card / RE (slot number)
uint32 component_id = 2;
// PFE (if applicable)
uint32 sub_component_id = 3;
// Path specification for elements of OpenConfig data models
string path = 4;
// Sequence number, monotonically increasing for each
// system_id, component_id, sub_component_id + path.
uint64 sequence_number = 5;
// timestamp (milliseconds since epoch)
uint64 timestamp = 6;
// List of key-value pairs
repeated KeyValue kv = 7;
}
// Simple Key-value, where value could be one of scalar types
message KeyValue {
// Key
string key = 1;
// One of possible values
oneof value {
double double_value = 5;
int64 int_value = 6;
uint64 uint_value = 7;
sint64 sint_value = 8;
bool bool_value = 9;
string str_value = 10;
bytes bytes_value = 11;
}
}
// Message sent for a telemetry subscription cancellation request
message CancelSubscriptionRequest {
// Subscription identifier as returned by the device when
// subscription was requested
uint32 subscription_id = 1;
}
// Reply to telemetry subscription cancellation request
message CancelSubscriptionReply {
// Return code
ReturnCode code = 1;
// Return code string
string code_str = 2;
};
// Result of the operation
enum ReturnCode {
SUCCESS = 0;
NO_SUBSCRIPTION_ENTRY = 1;
UNKNOWN_ERROR = 2;
}
// Message sent for a telemetry get request
message GetSubscriptionsRequest {
// Subscription identifier as returned by the device when
// subscription was requested
// --- or ---
// 0xFFFFFFFF for all subscription identifiers
uint32 subscription_id = 1;
}
// Reply to telemetry subscription get request
message GetSubscriptionsReply {
// List of current telemetry subscriptions
repeated SubscriptionReply subscription_list = 1;
}
// Message sent for telemetry agent operational states request
message GetOperationalStateRequest {
// Per-subscription_id level operational state can be requested.
//
// Subscription identifier as returned by the device when
// subscription was requested
// --- or ---
// 0xFFFFFFFF for all subscription identifiers including agent-level
// operational stats
// --- or ---
// If subscription_id is not present then sent only agent-level
// operational stats
uint32 subscription_id = 1;
// Control verbosity of the output
VerbosityLevel verbosity = 2;
}
// Verbosity Level
enum VerbosityLevel {
DETAIL = 0;
TERSE = 1;
BRIEF = 2;
}
// Reply to telemetry agent operational states request
message GetOperationalStateReply {
// List of key-value pairs where
// key = operational state definition
// value = operational state value
repeated KeyValue kv = 1;
}
// Message sent for a data encoding request
message DataEncodingRequest {
}
// Reply to data encodings supported request
message DataEncodingReply {
repeated EncodingType encoding_list = 1;
}
// Encoding Type Supported
enum EncodingType {
UNDEFINED = 0;
XML = 1;
JSON_IETF = 2;
PROTO3 = 3;
}
In order to do the service call (rpc TelemetrySubscribe) first i need to read header which have subscription id and then start reading messages. Now, using Java i am able to connect with the service, i did introduce the interceptor but when i print/retrieve header it is null. My code of calling interceptor is below,
ClientInterceptor interceptor = new HeaderClientInterceptor();
originChannel = OkHttpChannelBuilder.forAddress(host, port)
.usePlaintext(true)
.build();
Channel channel = ClientInterceptors.intercept(originChannel, interceptor);
telemetryStub = OpenConfigTelemetryGrpc.newStub(channel);
This is interceptor code to read meta Data.
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions, Channel next) {
return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onHeaders(Metadata headers) {
Key<String> CUSTOM_HEADER_KEY = Metadata.Key.of("responseKEY", Metadata.ASCII_STRING_MARSHALLER);
System.out.println("Contains Key?? "+headers.containsKey(CUSTOM_HEADER_KEY));
Wondering is there any other way to read meta data or first message which have subscription ID in it? All i need to read first message which have subscription Id, and return the same subscription id to server so that streaming can start I have equivalent Python code using same proto file and it is communicating with server by code mention below for reference only:
sub_req = SubscribeRequestMsg("host",port)
data_itr = stub.telemetrySubscribe(sub_req, _TIMEOUT_SECONDS)
metadata = data_itr.initial_metadata()
if metadata[0][0] == "responseKey":
metainfo = metadata[0][1]
print metainfo
subreply = agent_pb2.SubscriptionReply()
subreply.SetInParent()
google.protobuf.text_format.Merge(metainfo, subreply)
if subreply.response.subscription_id:
SUB_ID = subreply.response.subscription_id
From the python code above i can easily retrieve meta data object, not sure how to retrieve same using Java?
After reading metaData all i am getting is: Metadata({content-type=[application/grpc], grpc-encoding=[identity], grpc-accept-encoding=[identity,deflate,gzip]})
But i know there is one more line from meta data to it, which is
response {
subscription_id: 2
}
How can i extract last response from Header which have subscription id in it. I did try many options and i am lost here.
The method you used is for request metadata, not response metadata:
public void start(Listener<RespT> responseListener, Metadata headers) {
For response metadata, you will need a ClientCall.Listener
and wait for the onHeaders callback:
public void onHeaders(Metadata headers)
I do feel like the usage of metadata you mention seems strange. Metadata is generally for additional error details or cross-cutting features that aren't specific to the RPC method (like auth, tracing, etc.).
Often times using the ClientInterceptor is inconvenient because you need to maintain a reference to it in order to pull the data back out. In your case, the data is actually Metadata. One way you can get access to the Metadata easier is by putting it inside of the Context
.
For example, you could create a Context.Key
for the subscription id. In your client interceptor, you could extract the Metadata
header that you want, and put it inside the Context
, using Context.current().withValue(key, metadata)
. Inside your StreamObserver
, you can extract this This by calling key.get(Context.current())
. This assumes you are using the Async API, rather than the blocking API.
The reason it is more difficult is because usually metadata is information about a call, but not directly related to the call itself. It is for things like tracing, encoding, stats, cancellation and things like that. If something changes the way you handle the request, it probably needs to go directly into the request itself, rather than being on the side.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With