I'm trying to implement the pub/sub pattern using gRPC, but I'm a bit confused about how to do it properly.
my proto: rpc call (google.protobuf.Empty) returns (stream Data);
client:
try {
    asynStub.call(Empty.getDefaultInstance(), new StreamObserver<Data>() {
        @Override
        public void onNext(Data value) {
            // process a data
        }

        @Override
        public void onError(Throwable t) {
        }

        @Override
        public void onCompleted() {
        }
    });
} catch (StatusRuntimeException e) {
    LOG.warn("RPC failed: {}", e.getStatus());
}
// block the calling thread so the process stays alive while the stream is open
Thread.currentThread().join();
server service:
public class Sender extends DataServiceGrpc.DataServiceImplBase implements Runnable {
    private final BlockingQueue<Data> queue;
    private static final HashSet<StreamObserver<Data>> observers = new LinkedHashSet<>();

    public Sender(BlockingQueue<Data> queue) {
        this.queue = queue;
    }

    @Override
    public void data(Empty request, StreamObserver<Data> responseObserver) {
        observers.add(responseObserver);
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // waiting for first element
                Data data = queue.take();
                // send head element
                observers.forEach(o -> o.onNext(data));
            } catch (InterruptedException e) {
                LOG.error("error: ", e);
                Thread.currentThread().interrupt();
            }
        }
    }
}
How do I remove clients from the global observers set properly? How can I receive some sort of signal when a connection drops?
How do I manage client-server reconnections? How can I force the client to reconnect when the connection drops?
Thanks in advance!
In the implementation of your service:
@Override
public void data(Empty request, StreamObserver<Data> responseObserver) {
    observers.add(responseObserver);
}
You need to get the Context of the current request and listen for cancellation. For single-request, multi-response calls (a.k.a. server streaming), the gRPC generated code is simplified to pass in the request directly. This means that you don't have direct access to the underlying ServerCall.Listener, which is how you would normally listen for clients disconnecting and cancelling.
Instead, every gRPC call has a Context associated with it, which carries the cancellation and other request-scoped signals. For your case, you just need to listen for cancellation by adding your own listener, which then safely removes the response observer from your linked hash set.
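Here is a rough sketch of the "safely removes" part, written against plain Java so it does not depend on your generated classes. `SubscriberRegistry` is a made-up helper name, and I'm assuming you are willing to swap the plain LinkedHashSet for a CopyOnWriteArraySet, because the sender thread iterates the set while gRPC threads add to and remove from it (a plain LinkedHashSet can throw ConcurrentModificationException in that situation). Each subscriber would wrap one `StreamObserver<Data>`, and the returned `Runnable` is what the cancellation listener would run.

```java
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;

/** Hypothetical helper (not part of gRPC): a thread-safe set of subscribers. */
final class SubscriberRegistry<T> {
    // Iteration works on a snapshot, so add/remove from other threads is safe.
    private final Set<Consumer<T>> subscribers = new CopyOnWriteArraySet<>();

    /** Registers a subscriber and returns the hook that unregisters it. */
    Runnable add(Consumer<T> subscriber) {
        subscribers.add(subscriber);
        return () -> subscribers.remove(subscriber);
    }

    /** Delivers the message to every subscriber and drops any that throws. */
    void publish(T message) {
        for (Consumer<T> subscriber : subscribers) {
            try {
                subscriber.accept(message);
            } catch (RuntimeException e) {
                subscribers.remove(subscriber);
            }
        }
    }

    int size() {
        return subscribers.size();
    }
}

// Assumed wiring inside data(Empty request, StreamObserver<Data> responseObserver):
//   Runnable unsubscribe = registry.add(responseObserver::onNext);
//   Context.current().addListener(context -> unsubscribe.run(), Runnable::run);
// Runnable::run is used as the Executor here, so the listener runs inline.
```

The sender loop then becomes `registry.publish(queue.take())` instead of iterating the set directly.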
As for reconnects: gRPC clients will automatically reconnect if the connection is broken, but usually will not retry the RPC unless it is safe to do so. In the case of server streaming RPCs, it is usually not safe to do, so you'll need to retry the RPC on your client directly.
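For the client-side retry, one common approach (my assumption, not something gRPC does for you here) is to start the streaming call again from `onError`, with a growing delay so a dead server is not hammered. The `Backoff` class below is a hypothetical helper that only computes the delays; the scheduling and the actual `asynStub.call(...)` stay in your client.

```java
/** Hypothetical helper: exponential backoff between resubscribe attempts. */
final class Backoff {
    private final long initialMillis;
    private final long maxMillis;
    private long nextMillis;

    Backoff(long initialMillis, long maxMillis) {
        this.initialMillis = initialMillis;
        this.maxMillis = maxMillis;
        this.nextMillis = initialMillis;
    }

    /** Returns the delay to wait now and doubles the following one, up to the cap. */
    synchronized long nextDelayMillis() {
        long current = nextMillis;
        nextMillis = Math.min(nextMillis * 2, maxMillis);
        return current;
    }

    /** Call when a message arrives, so the next failure starts from the initial delay. */
    synchronized void reset() {
        nextMillis = initialMillis;
    }
}

// Assumed usage in the client's StreamObserver<Data>:
//   onNext(value):  backoff.reset(); then process the value
//   onError(t):     scheduler.schedule(this::subscribe,
//                       backoff.nextDelayMillis(), TimeUnit.MILLISECONDS);
// where subscribe() issues asynStub.call(...) again and scheduler is a
// ScheduledExecutorService owned by the client.
```

With `new Backoff(100, 500)` the successive delays are 100, 200, 400, 500, 500 ms until `reset()` is called.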