grpc-java: Proper handling of retry on client for service streaming call

Tags: grpc-java

I'm trying to set up a simple publish/subscribe pattern over gRPC, using server streaming together with the async stub on the client. After implementing the part that streams messages back to the client, I wanted to handle connection drops. Right now I'm working on the case where the service is, for example, shut down and the client should recover from the lost connection.

I've read and searched about the retry mechanism on Google, GitHub and SO, and finally set up a retry policy for the service method that streams messages. As far as I understand, the retry mechanism should kick in when the service returns one of the retryableStatusCodes defined in the retry policy. After introducing the retry policy on the client I wanted to test it, and the results of the following two scenarios are what confuses me.

First scenario:

  • connect procedure is called (after ~n seconds intentionally no messages are streamed back to client)
  • service is shut down
  • onError is not called on client
  • service is up again
  • connect is reached again with retry

Second scenario:

  • connect procedure is called (after ~n seconds first message arrives, message is processed in onNext handler on client)
  • service is shut down
  • onError is called on client
  • service is up again
  • connect is not reached again with retry

Overall, what confuses me is why the behavior differs between these two scenarios. Why is UNAVAILABLE detected and a retry attempted in the first scenario, while in the second, with the same status, the retry doesn't happen?

Here is the code for the connect call on the client, the connect method on the service, and the retry policy set up on the client:

client:

messageStub.withWaitForReady().connect(messagesRequest, new StreamObserver<>() {
    @Override
    public void onNext(MessageResponse messageResponse) {
        //process new message
        MessageDto message = new MessageDto();
        message.setBody(messageResponse.getBody());
        message.setTitle(messageResponse.getTitle());

        messageService.broadcastMessage(message);
    }

    @Override
    public void onError(Throwable throwable) {
        //service went down; pass the throwable itself so the logger prints the full stack trace
        LOGGER.error("Message stream terminated with an error", throwable);
    }

    @Override
    public void onCompleted() {
        //This method should be called when user logs out of the application
        LOGGER.info(String.format("Message streaming terminated for user %d", userId));
    }
});

service:

@Override
public void connect(MessageRequest request, StreamObserver<MessageResponse> responseObserver) {
    Long userId = request.getUserId();

    ServerCallStreamObserver<MessageResponse> serverCallStreamObserver =
        (ServerCallStreamObserver<MessageResponse>) responseObserver;
    serverCallStreamObserver.setOnCancelHandler(getOnCancelHandler(userId));
    registerClient(userId, serverCallStreamObserver);
    //responseObserver.onCompleted() is left out so connection is not terminated
}


@EventListener
public void listenForMessages(MessageEvent messageEvent) {
    //omitted code (just some data retrieving - populate conn and message vars)....

    MessageResponse.Builder builder = MessageResponse.newBuilder();
    StreamObserver<MessageResponse> observer = conn.getResponseObserver();
    builder.setType(message.getType());
    builder.setTitle(message.getTitle());
    builder.setBody(message.getBody());

    observer.onNext(builder.build());
}

retryPolicy:

{
  "methodConfig" : [
    {
      "name": [
        {
          "service": "ch.example.proto.MessageService",
          "method": "connect"
        }
      ],
      "retryPolicy": {
        "maxAttempts": 10,
        "initialBackoff": "5s",
        "maxBackoff": "30s",
        "backoffMultiplier": 2,
        "retryableStatusCodes": ["UNAVAILABLE"]
      }
    }
  ]
}
Mr. Brightside asked Mar 19 '20 14:03


1 Answer

The problem is that receiving a message commits the RPC. This is discussed in gRFC A6 Client Retries. It mentions the Response-Headers, which are implicitly sent when the server responds with the first message. That is the difference between your two scenarios: in the first, nothing had been received when the service went down, so the call was still eligible for retry; in the second, the first message had already committed it.

Essentially, once gRPC has passed data back to the client, there is no way to retry automatically. If gRPC retried, how should it combine a new stream with what it has already delivered? Should it skip the first N responses? But what if the responses are different now? The problem is even worse for metadata (delivered via Response-Headers), as it cannot be provided to the client a second time.

gRPC is able to replay the client's requests to multiple backends, but once it starts receiving a response from a backend it will become "fixed" to that backend and be unable to change its decision.

You will need an application-level retry to re-establish the stream. When the client re-establishes the stream, it may need to modify the request to tell the server which messages it has already received.
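
A minimal sketch of what such an application-level retry could look like, under some loud assumptions: ResubscribingClient, Observer and StreamOpener are hypothetical names, not part of grpc-java. Observer only mirrors the three callbacks of io.grpc.stub.StreamObserver so that the example compiles without gRPC on the classpath, and StreamOpener stands in for a call like messageStub.withWaitForReady().connect(request, observer). The backoff numbers are simply copied from the retry policy in the question.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical helper (not a grpc-java class) that re-opens a server stream after it fails. */
final class ResubscribingClient<T> {

    /** Mirrors the three callbacks of io.grpc.stub.StreamObserver. */
    interface Observer<M> {
        void onNext(M value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Stand-in for a stub call such as stub.connect(request, observer). */
    interface StreamOpener<M> {
        void open(Observer<M> observer);
    }

    private final StreamOpener<T> opener;
    private final Consumer<T> handler;
    private final ScheduledExecutorService scheduler;
    // Consecutive failures since the last successfully delivered message.
    private final AtomicInteger failures = new AtomicInteger();

    ResubscribingClient(StreamOpener<T> opener, Consumer<T> handler,
                        ScheduledExecutorService scheduler) {
        this.opener = opener;
        this.handler = handler;
        this.scheduler = scheduler;
    }

    /** Delay before the next attempt: initial * multiplier^attempt, capped at max. */
    static long backoffMillis(int attempt, long initialMillis, long maxMillis, double multiplier) {
        double delay = initialMillis * Math.pow(multiplier, attempt);
        return (long) Math.min(delay, maxMillis);
    }

    /** Opens the stream; on error, schedules another subscribe() after a backoff delay. */
    void subscribe() {
        opener.open(new Observer<T>() {
            @Override
            public void onNext(T message) {
                failures.set(0); // a delivered message shows the stream is healthy again
                handler.accept(message);
            }

            @Override
            public void onError(Throwable t) {
                // The RPC may already be committed, so gRPC will not retry it; start a new call.
                long delay = backoffMillis(failures.getAndIncrement(), 5_000, 30_000, 2.0);
                scheduler.schedule(ResubscribingClient.this::subscribe, delay, TimeUnit.MILLISECONDS);
            }

            @Override
            public void onCompleted() {
                // The server ended the stream deliberately; do not reconnect.
            }
        });
    }
}
```

In real code the opener would build a fresh request on every attempt, which is the natural place to include whatever the server needs to know about messages the client has already seen. You would probably also want to inspect the status in onError and only resubscribe for codes such as UNAVAILABLE.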

Eric Anderson answered Jan 01 '23 00:01