I'm trying to set up a simple publish/subscribe pattern over gRPC, using server-side streaming together with an async stub on the client. After implementing the part that streams messages back to the client, I wanted to handle connection drops. Right now I'm working on the case where the service is, for example, shut down and the client should recover from the lost connection.
I've read and searched about the retry mechanism on Google, GitHub, and SO, and finally set up a retry policy for the service method that streams messages. As far as I understand, the retry mechanism should kick in when the service returns one of the retryableStatusCodes defined in the retry policy. After adding the retry policy on the client I wanted to test it, and the results of the following two scenarios are what confuse me about retry.
First scenario:
Second scenario:
Overall, what confuses me is why the behavior differs between these two scenarios. Why, in the first scenario, is it detected that the server returned UNAVAILABLE and a retry is attempted, while in the second, even with the same status, the retry doesn't work?
Here is the code for the connect call on the client, the connect method on the service, and the retry policy setup on the client:
client:
messageStub.withWaitForReady().connect(messagesRequest, new StreamObserver<>() {
@Override
public void onNext(MessageResponse messageResponse) {
//process new message
MessageDto message = new MessageDto();
message.setBody(messageResponse.getBody());
message.setTitle(messageResponse.getTitle());
messageService.broadcastMessage(message);
}
@Override
public void onError(Throwable throwable) {
//service went down
LOGGER.error("Message stream failed", throwable);
}
@Override
public void onCompleted() {
//This method should be called when user logs out of the application
LOGGER.info(String.format("Message streaming terminated for user %d", userId));
}
});
service:
@Override
public void connect(MessageRequest request, StreamObserver<MessageResponse> responseObserver) {
Long userId = request.getUserId();
ServerCallStreamObserver<MessageResponse> serverCallStreamObserver =
(ServerCallStreamObserver<MessageResponse>) responseObserver;
serverCallStreamObserver.setOnCancelHandler(getOnCancelHandler(userId));
registerClient(userId, serverCallStreamObserver);
//responseObserver.onCompleted() is left out so connection is not terminated
}
@EventListener
public void listenForMessages(MessageEvent messageEvent) {
//omitted code (just some data retrieving - populate conn and message vars)....
MessageResponse.Builder builder = MessageResponse.newBuilder();
StreamObserver<MessageResponse> observer = conn.getResponseObserver();
builder.setType(message.getType());
builder.setTitle(message.getTitle());
builder.setBody(message.getBody());
observer.onNext(builder.build());
}
retryPolicy:
{
"methodConfig" : [
{
"name": [
{
"service": "ch.example.proto.MessageService",
"method": "connect"
}
],
"retryPolicy": {
"maxAttempts": 10,
"initialBackoff": "5s",
"maxBackoff": "30s",
"backoffMultiplier": 2,
"retryableStatusCodes": ["UNAVAILABLE"]
}
}
]
}
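To make the policy above concrete, here is a small sketch that computes the nominal wait before each retry from maxAttempts, initialBackoff, maxBackoff, and backoffMultiplier. It is plain Java with no gRPC dependency; the class and method names (BackoffSchedule, nominalDelays) are made up for illustration and are not part of grpc-java. The real client also applies random jitter to each delay, so the numbers are only the un-jittered values.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative helper (hypothetical, not part of grpc-java). */
final class BackoffSchedule {
    private BackoffSchedule() {}

    /**
     * Returns the nominal delay in seconds (before jitter) that precedes each retry.
     * maxAttempts counts the original call, so there are maxAttempts - 1 retries.
     */
    static List<Double> nominalDelays(int maxAttempts, double initialBackoff,
                                      double maxBackoff, double multiplier) {
        List<Double> delays = new ArrayList<>();
        double current = initialBackoff;
        for (int retry = 1; retry < maxAttempts; retry++) {
            // Each delay grows by the multiplier but never exceeds maxBackoff.
            delays.add(Math.min(current, maxBackoff));
            current *= multiplier;
        }
        return delays;
    }
}
```

Calling BackoffSchedule.nominalDelays(10, 5.0, 30.0, 2.0) with the values from the policy yields nine delays: 5, 10, 20, and then 30 for every remaining retry. As far as I know, gRFC A6 also says that maxAttempts values greater than 5 are treated as 5, so the effective number of attempts may be lower than configured; it is worth checking this against your grpc-java version.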
The problem is that receiving a message commits the RPC. This is discussed in gRFC A6 Client Retries. It mentions the Response-Headers, which are implicitly sent when the server responds with the first message.
Essentially, once gRPC has passed data back to the client there is no way to automatically retry. If gRPC retried, how should it combine a new stream with what it has already responded with? Should it skip the first N responses? But what if the responses are different now? The problem is even worse for metadata (delivered via Response-Headers) as those cannot be provided to the client a second time.
gRPC is able to replay the client's requests to multiple backends, but once it starts receiving a response from a backend it will become "fixed" to that backend and be unable to change its decision.
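The commit rule can be pictured as a simplified decision function. This is only a model of the behavior described in gRFC A6, not grpc-java's actual implementation; the class name, method name, and parameters are hypothetical. It may explain why the same UNAVAILABLE status is retried in one case and not in another, if the difference is whether a message had already arrived.

```java
import java.util.Set;

/** Simplified model of the retry decision (hypothetical, not grpc-java code). */
final class RetryDecision {
    private RetryDecision() {}

    static boolean mayRetry(String statusCode,
                            boolean responseHeadersReceived,
                            int attemptsSoFar,
                            int maxAttempts,
                            Set<String> retryableStatusCodes) {
        // Once Response-Headers (or the first message) arrived, the RPC is committed.
        if (responseHeadersReceived) {
            return false;
        }
        // The attempt budget from the retry policy is exhausted.
        if (attemptsSoFar >= maxAttempts) {
            return false;
        }
        // Otherwise only statuses listed in retryableStatusCodes qualify.
        return retryableStatusCodes.contains(statusCode);
    }
}
```

With retryableStatusCodes set to UNAVAILABLE, a failure before any response qualifies for a retry, while the same status after the first streamed message does not.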
You will need application-level retry to re-establish the stream. When the client reestablishes the stream it may need to modify the request to inform the server what messages the client has already received.
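A minimal sketch of such an application-level retry, written in plain Java so it runs without gRPC. Everything here is an assumption for illustration: StreamAttempt stands in for one connect call, messages are assumed to carry an increasing numeric id, and the request is assumed to have a field for the last id the client has seen (the MessageRequest in the question only has a userId).

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

/** Re-establishes a stream after failures, resuming from the last seen message id. */
final class ResumableSubscriber {
    private ResumableSubscriber() {}

    /** Hypothetical stand-in for one attempt of the streaming call. */
    interface StreamAttempt {
        /** Delivers ids greater than lastSeenId; throws if the connection drops. */
        void run(long lastSeenId, LongConsumer onMessage) throws Exception;
    }

    /** Returns the id of the last message received before completion or giving up. */
    static long subscribe(StreamAttempt attempt, int maxConsecutiveFailures,
                          long initialBackoffMs, long maxBackoffMs,
                          LongConsumer onMessage) {
        AtomicLong lastSeenId = new AtomicLong(0);
        long backoffMs = initialBackoffMs;
        int failures = 0;
        while (failures < maxConsecutiveFailures) {
            long seenBefore = lastSeenId.get();
            try {
                attempt.run(seenBefore, id -> {
                    lastSeenId.set(id);      // remember progress for the next attempt
                    onMessage.accept(id);
                });
                return lastSeenId.get();     // stream completed normally
            } catch (Exception e) {
                if (lastSeenId.get() > seenBefore) {
                    // The attempt made progress, so start the backoff over.
                    failures = 0;
                    backoffMs = initialBackoffMs;
                }
                failures++;
                if (failures >= maxConsecutiveFailures) {
                    break;
                }
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
                backoffMs = Math.min(backoffMs * 2, maxBackoffMs);
            }
        }
        return lastSeenId.get();
    }
}
```

With the async stub from the question, the same idea would more likely live in onError: schedule a new connect call after a delay instead of blocking in a loop, and pass the last received id in the request so the server can skip what the client already has.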