Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to design publish-subscribe pattern properly in grpc?

i'm trying to implement pub sub pattern using grpc but i'm confusing a bit about how to do it properly.

my proto: rpc call (google.protobuf.Empty) returns (stream Data);

client:

asynStub.call(Empty.getDefaultInstance(), new StreamObserver<Data>() {
         @Override
         public void onNext(Data value) {
           // process a data

         @Override
         public void onError(Throwable t) {

         }

         @Override
         public void onCompleted() {

         }
       });

   } catch (StatusRuntimeException e) {
     LOG.warn("RPC failed: {}", e.getStatus());
   }

   Thread.currentThread().join();

server service:

public class Sender extends DataServiceGrpc.DataServiceImplBase implements Runnable {
  private final BlockingQueue<Data> queue;
  private final static HashSet<StreamObserver<Data>> observers = new LinkedHashSet<>();

  public Sender(BlockingQueue<Data> queue) {
    this.queue = queue;
  }

  @Override
  public void data(Empty request, StreamObserver<Data> responseObserver) {
    observers.add(responseObserver);
  }

  @Override
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      try {
        // waiting for first element
        Data data = queue.take();
        // send head element
        observers.forEach(o -> o.onNext(data));

      } catch (InterruptedException e) {
        LOG.error("error: ", e);
        Thread.currentThread().interrupt();
      }
    }
  }
}

How to remove clients from global observers properly? How to received some sort of a signal when connection drops?
How to manage client-server reconnections? How to force client reconnect when connection drops?

Thanks in advance!

like image 907
Dmitry Zagorulkin Avatar asked Mar 01 '19 10:03

Dmitry Zagorulkin


People also ask

Which design pattern is publish subscribe pattern?

Observer pattern. The Publish-Subscribe pattern builds on the Observer pattern by decoupling subjects from observers via asynchronous messaging. Message Broker pattern. Many messaging subsystems that support a publish-subscribe model are implemented via a message broker.

How does the publish subscribe pattern work?

Publish/subscribe messaging, or pub/sub messaging, is a form of asynchronous service-to-service communication used in serverless and microservices architectures. In a pub/sub model, any message published to a topic is immediately received by all of the subscribers to the topic.

Does gRPC support pub sub?

gRPC API. You can generate your own gRPC client libraries in any gRPC-supported language for the Pub/Sub API from its .

How can I improve my gRPC performance?

Reuse gRPC channels A gRPC channel should be reused when making gRPC calls. Reusing a channel allows calls to be multiplexed through an existing HTTP/2 connection. If a new channel is created for each gRPC call then the amount of time it takes to complete can increase significantly.


1 Answers

In the implementation of your service:

  @Override
  public void data(Empty request, StreamObserver<Data> responseObserver) {
    observers.add(responseObserver);
  }

You need to get the Context of the current request, and listen for cancellation. For single-request, multi-response calls (a.k.a. Server streaming) the gRPC generated code is simplified to pass in the the request directly. This means that you con't have direct access to the underlying ServerCall.Listener, which is how you would normally listen for clients disconnecting and cancelling.

Instead, every gRPC call has a Context associated with it, which carries the cancellation and other request-scoped signals. For your case, you just need to listen for cancellation by adding your own listener, which then safely removes the response observer from your linked hash set.


As for reconnects: gRPC clients will automatically reconnect if the connection is broken, but usually will not retry the RPC unless it is safe to do so. In the case of server streaming RPCs, it is usually not safe to do, so you'll need to retry the RPC on your client directly.

like image 117
Carl Mastrangelo Avatar answered Oct 22 '22 01:10

Carl Mastrangelo