
python - detect client drop during gRPC infinite stream

I'm implementing an infinite stream response, like a pub/sub pattern, using the gRPC architecture.

There is an endpoint that opens a response stream and keeps it open until the client drops. To do so, I'm storing a key-value hash where the keys are gRPC contexts and the values are queues that I poll for messages to send.

My endpoint code looks like:

def StreamTrades(self, request, context):
    self.feeds[context] = queue.Queue()

    callback_queue = queue.Queue()

    def remove_feed():
        if self.feeds.get(context) is not None:
            del self.feeds[context]

    def stop_stream():
        remove_feed()

        def raise_stop_stream_exception():
            raise StopStream('stopping stream')

        callback_queue.put(raise_stop_stream_exception)

    context.add_callback(stop_stream)

    def output_generator():
        while True:
            try:
                try:
                    callback = callback_queue.get(False)
                    callback()
                except queue.Empty:
                    pass
                if self.feeds.get(context) is not None:
                    trade = self.feeds[context].get()
                    if isinstance(trade, trades_pb2.Trade):
                        yield trade
                else:
                    raise StopStream('stopping stream')
            except IndexError:
                pass
            except StopStream:
                return

    return output_generator()

This code works fine for subscribing and publishing changes to clients, but there is a problem with unsubscribing. What is a good way to detect a client drop? Using Context.add_callback(callBack) does not seem to work, since the callback is only called when the server finishes and closes the stream. And generators do not raise any kind of Status when the client is no longer there. I saw that in Java, when onNext is called on the StreamObserver and there is no client, a StatusRuntimeException with Status.CANCELLED is thrown; that allows a lazy unsubscription, which would already be enough for me.

Is there any way to detect clients dropping the connection during a response stream?

Felipe Jun asked Mar 09 '23 23:03



1 Answer

The callback that you register with ServicerContext.add_callback should be called when the client drops the connection; that it is not being called indicates that you're suffering from this bug. It is not the case that "the callback is only called when the server finishes and closes the stream".
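As a rough illustration of relying on that callback, here is a minimal sketch, not the asker's exact code. It assumes `context` is the `grpc.ServicerContext` passed to the handler (only its `add_callback` and `is_active` methods are used), and `stream_feed` and `poll_interval` are hypothetical names introduced for this example. The queue is polled with a timeout rather than a blocking `get()`, so the generator can notice the termination flag even when no messages arrive.

```python
import queue
import threading


def stream_feed(context, feed, poll_interval=0.5):
    """Yield items from `feed` until the RPC terminates.

    `context` is assumed to behave like grpc.ServicerContext;
    `feed` is a queue.Queue filled by the publisher side.
    """
    terminated = threading.Event()

    # add_callback returns False when the RPC has already terminated,
    # in which case the callback would never be invoked.
    if not context.add_callback(terminated.set):
        return

    while not terminated.is_set() and context.is_active():
        try:
            # Wake up periodically so termination is noticed
            # even when the feed is idle.
            item = feed.get(timeout=poll_interval)
        except queue.Empty:
            continue
        yield item
```

In a handler like `StreamTrades`, the generator returned to gRPC could then be something like `stream_feed(context, self.feeds[context])`, with the entry in `self.feeds` removed once the generator finishes. The `is_active()` check is a second line of defence in case the callback is not delivered.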

Nathaniel Manista At Google answered Mar 24 '23 20:03
