I'm implementing an infinite response stream, similar to a pub/sub pattern, using the gRPC architecture.
There is an endpoint that opens a response stream and keeps it open until the client drops. To do so, I'm storing a key-value hash where the keys are gRPC contexts and the values are queues that I poll for messages to send.
My endpoint code looks like:
    def StreamTrades(self, request, context):
        self.feeds[context] = queue.Queue()
        callback_queue = queue.Queue()

        def remove_feed():
            if self.feeds.get(context) is not None:
                del self.feeds[context]

        def stop_stream():
            remove_feed()

            def raise_stop_stream_exception():
                raise StopStream('stopping stream')

            callback_queue.put(raise_stop_stream_exception)

        context.add_callback(stop_stream)

        def output_generator():
            while True:
                try:
                    try:
                        callback = callback_queue.get(False)
                        callback()
                    except queue.Empty:
                        pass
                    if self.feeds.get(context) is not None:
                        trade = self.feeds[context].get()
                        if isinstance(trade, trades_pb2.Trade):
                            yield trade
                    else:
                        raise StopStream('stopping stream')
                except IndexError:
                    pass
                except StopStream:
                    return

        return output_generator()
This code works fine for subscribing and publishing changes to clients, but there is a problem with unsubscribing. What is a good way to detect a client drop? Using Context.add_callback(callBack) does not seem to work, since the callback is only called when the server finishes and closes the stream. And generators do not raise any kind of Status when the client is no longer there. In Java, when onNext is called on the StreamObserver and there is no client, a StatusRuntimeException with Status.CANCELLED is thrown. That allows a lazy unsubscription, which would already be enough for me.
Is there any way to detect clients dropping the connection during a response stream?
The callback that you register with ServicerContext.add_callback should be called when the client drops the connection; that it is not being called indicates that you're suffering from this bug. It is not the case that "the callback is only called when the server finishes and closes the stream".
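Once the callback fires as expected, the generator still has to notice it, which a blocking `get()` on an idle feed never does. A minimal sketch of one way to handle that, assuming a gRPC version where the callback behaves as described above: `stream_feed`, `feed`, and `poll_interval` are hypothetical names chosen for illustration, while `add_callback` and `is_active` are the `ServicerContext` methods.

```python
import queue
import threading


def stream_feed(feed, context, poll_interval=0.1):
    """Yield items from `feed` until the RPC behind `context` terminates.

    `feed` is a queue.Queue and `context` is expected to behave like
    grpc.ServicerContext (hypothetical helper, not part of gRPC).
    """
    stopped = threading.Event()
    # add_callback returns False when the RPC has already terminated,
    # in which case the callback will never be called.
    if not context.add_callback(stopped.set):
        return
    while not stopped.is_set() and context.is_active():
        try:
            # The timeout keeps an idle feed from blocking the loop forever,
            # so the termination flag is re-checked regularly.
            item = feed.get(timeout=poll_interval)
        except queue.Empty:
            continue
        yield item
```

Under these assumptions, the endpoint could return `stream_feed(self.feeds[context], context)` and remove the entry from `self.feeds` in the same callback that sets the event.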