Closing all open streams in GRPC-Java from client end cleanly

I am using GRPC-Java 1.1.2. In an active GRPC session, I have a few bidirectional streams open. Is there a way to close them cleanly from the client end when the client is disconnecting? When I try to disconnect, I run the following loop a fixed number of times and then disconnect, but I can see the following error on the server side (not sure if it's caused by another issue though):

disconnect from client

while (!channel.awaitTermination(3, TimeUnit.SECONDS)) {
// check for upper bound and break if so
}
channel.shutdown().awaitTermination(3, TimeUnit.SECONDS);

error on server

E0414 11:26:48.787276000 140735121084416 ssl_transport_security.c:439] SSL_read returned 0 unexpectedly.
E0414 11:26:48.787345000 140735121084416 secure_endpoint.c:185]        Decryption error: TSI_INTERNAL_ERROR
ali haider asked Apr 14 '17 15:04


2 Answers

If you want to close gRPC streams (server-streaming or bi-di) from the client end, you will have to run the RPC call inside a Context.CancellableContext, found in package io.grpc.

Suppose you have an rpc:

service Messaging {
    rpc Listen (ListenRequest) returns (stream Message) {}
}

On the client side, you will handle it like this:

public class Messaging {
    private Context.CancellableContext mListenContext;

    private MessagingGrpc.MessagingStub getMessagingAsyncStub() {
    /* return your async stub */
    }

    public void listen(final ListenRequest listenRequest, final StreamObserver<Message> messageStream) {

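        // Issue the RPC from inside the cancellable context so that
        // cancelling the context also cancels the call.
        Runnable listenRunnable = new Runnable() {
            @Override
            public void run() {
                Messaging.this.getMessagingAsyncStub().listen(listenRequest, messageStream);
            }
        };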

        if (mListenContext != null && !mListenContext.isCancelled()) {
            Log.d(TAG, "listen: already listening");
            return;
        }

        mListenContext = Context.current().withCancellation();
        mListenContext.run(listenRunnable);
    }

    public void cancelListen() {
        if (mListenContext != null) {
            mListenContext.cancel(null);
            mListenContext = null;
        }
    }
}

Calling cancelListen() cancels the RPC with status 'CANCELLED': the stream is closed (the channel itself stays open), and onError of your StreamObserver<Message> messageStream will be invoked with a throwable whose status is 'CANCELLED'.

Abdul Wasae answered Sep 23 '22 04:09


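If you use shutdownNow(), it will shut down your open RPC streams more aggressively by cancelling them instead of letting them finish. Also, you need to call shutdown() or shutdownNow() before calling awaitTermination(); waiting on a channel that was never asked to shut down will only ever time out.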
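
As a rough sketch of that ordering (shutdown() first, then awaitTermination(), with shutdownNow() only as a fallback): the Channel interface below is a hypothetical stand-in for the three lifecycle methods of io.grpc.ManagedChannel, declared only so the example compiles without the gRPC dependency. The class name ChannelShutdown and the method close are likewise made up for illustration. With the real library you would call the same three methods directly on your ManagedChannel.

```java
import java.util.concurrent.TimeUnit;

public class ChannelShutdown {

    /**
     * Hypothetical stand-in for io.grpc.ManagedChannel's lifecycle methods
     * (the real shutdown()/shutdownNow() return the channel rather than void).
     */
    public interface Channel {
        void shutdown();      // refuse new RPCs, let in-flight ones finish
        void shutdownNow();   // additionally cancel in-flight RPCs
        boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
    }

    /** Returns true if the channel terminated without needing shutdownNow(). */
    public static boolean close(Channel channel, long timeout, TimeUnit unit) {
        channel.shutdown(); // must come before awaitTermination()
        try {
            if (channel.awaitTermination(timeout, unit)) {
                return true;
            }
            // Graceful shutdown timed out: cancel whatever is still open.
            channel.shutdownNow();
            channel.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            channel.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
        return false;
    }
}
```

This replaces an open-ended retry loop with one bounded wait followed by one forced shutdown, so the caller always returns in at most roughly twice the timeout.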

That said, a better solution would be to end all your RPCs gracefully before closing the channel.
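
One possible way to end the streams gracefully is to keep track of the request observers returned by your bidirectional stub calls and call onCompleted() on each of them before shutting the channel down. The sketch below assumes that approach; OpenStreams and RequestStream are hypothetical names, and RequestStream stands in for the onCompleted() half of io.grpc.stub.StreamObserver so the example compiles without gRPC on the classpath.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class OpenStreams {

    /** Hypothetical stand-in for the client's request-side StreamObserver. */
    public interface RequestStream {
        void onCompleted(); // half-closes the stream from the client side
    }

    // Thread-safe set, since streams may be opened and closed on different threads.
    private final Set<RequestStream> open = ConcurrentHashMap.newKeySet();

    /** Call when a stream is opened. */
    public void register(RequestStream stream) {
        open.add(stream);
    }

    /** Call when a stream finishes on its own (onCompleted or onError). */
    public void unregister(RequestStream stream) {
        open.remove(stream);
    }

    /** Completes every tracked stream once; returns how many were completed. */
    public int completeAll() {
        int completed = 0;
        for (RequestStream stream : open) {
            // remove() returns true for exactly one caller per stream.
            if (open.remove(stream)) {
                stream.onCompleted();
                completed++;
            }
        }
        return completed;
    }
}
```

In use, you would call completeAll() first and only then shut down the channel and wait for termination, so that the server sees each stream end normally rather than seeing the transport disappear.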

Carl Mastrangelo answered Sep 26 '22 04:09