
How can a gRPC server notice that the client has cancelled a server-side streaming call?

Tags: grpc, grpc-java

I want to use gRPC to let clients subscribe to events generated by the server. I have an RPC declared like so:

rpc Subscribe (SubscribeRequest) returns (stream SubscribeResponse);

where the returned stream is infinite. To "unsubscribe", clients cancel the RPC. (Is there a cleaner way, by the way?)

I have figured out how the client can cancel the call:

Context.CancellableContext cancellableContext =
         Context.current().withCancellation();
cancellableContext.run(() -> {
   stub.subscribe(request, callback);
});
// do other stuff / wait for reason to unsubscribe
cancellableContext.cancel(new InterruptedException());

However, the server does not seem to notice that a client has cancelled its call. I'm testing this with a dummy server implementation:

@Override
public void subscribe(SubscribeRequest request,
                      StreamObserver<SubscribeResponse> responseObserver) {
  // in real code, this will happen in a separate thread.
  while (!Thread.interrupted()) {
    responseObserver.onNext(SubscribeResponse.getDefaultInstance());
  }
}

The server will happily continue sending its messages into the ether. How can the server recognize that the call was cancelled by the client and thus stop sending responses?

asked Feb 08 '19 08:02 by Balz Guenat




1 Answer

I found the answer myself. Cast the StreamObserver passed to subscribe to a ServerCallStreamObserver, which exposes the methods isCancelled and setOnCancelHandler.

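ServerCallStreamObserver<SubscribeResponse> scso =
    (ServerCallStreamObserver<SubscribeResponse>) responseObserver;

// Either register a Runnable that is invoked when the call is cancelled ...
scso.setOnCancelHandler(handler);
// ... or poll the cancellation state, e.g. before each onNext
if (scso.isCancelled()) {
  // stop sending responses
}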

This, to me, raises the question of why subscribe isn't passed a ServerCallStreamObserver to begin with.
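As a rough illustration of the polling variant, the send loop from the question could check the cancellation flag before each message. The sketch below is deliberately independent of gRPC so it runs with the JDK alone: the class name CancellableStreamSketch, the method streamUntilCancelled, the maxMessages bound and the String messages are all hypothetical stand-ins, not part of grpc-java. In a real service one would presumably pass scso::isCancelled for the flag and a lambda calling scso.onNext for the sink.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

public class CancellableStreamSketch {

    /**
     * Sends messages until isCancelled reports true or maxMessages is reached.
     * Hypothetical helper: isCancelled stands in for
     * ServerCallStreamObserver.isCancelled(), onNext for StreamObserver.onNext().
     * Returns the number of messages sent.
     */
    public static int streamUntilCancelled(BooleanSupplier isCancelled,
                                           Consumer<String> onNext,
                                           int maxMessages) {
        int sent = 0;
        // Check the cancellation flag before every send instead of Thread.interrupted().
        while (sent < maxMessages && !isCancelled.getAsBoolean()) {
            onNext.accept("event-" + sent);
            sent++;
        }
        return sent;
    }

    public static void main(String[] args) {
        AtomicBoolean cancelled = new AtomicBoolean(false);
        AtomicInteger received = new AtomicInteger();

        // Simulated client: "cancels the call" after receiving three messages.
        int sent = streamUntilCancelled(
                cancelled::get,
                msg -> {
                    if (received.incrementAndGet() == 3) {
                        cancelled.set(true);
                    }
                },
                1000);

        System.out.println("sent " + sent + " messages before cancellation");
    }
}
```

Keeping the loop behind a BooleanSupplier and a Consumer is only one possible design; it makes the stop condition testable without a running server. The handler-based variant (setOnCancelHandler) would instead set such a flag, or stop the producing thread, from the callback.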

answered Oct 30 '22 10:10 by Balz Guenat