I am trying to understand how to handle a grpc api with bidirectional streaming (using the Python API).
Say I have the following simple server definition:
syntax = "proto3";
package simple;
service TestService {
rpc Translate(stream Msg) returns (stream Msg){}
}
message Msg
{
string msg = 1;
}
Say that the messages that will be sent from the client come asynchronously ( as a consequence of user selecting some ui elements).
The generated python stub for the client will contain a method Translate
that will accept a generator function and will return an iterator.
What is not clear to me is how would I write the generator function that will return messages as they are created by the user. Sleeping on the thread while waiting for messages doesn't sound like the best solution.
It can be done asynchronously if your call to res. get can be done asynchronously (if it is defined with the async keyword). While grpc. server says it requires a futures.
The gRPC supports both unary RPC and streaming RPC. In streaming RPC, a client sends a single request and receives a bunch of messages as the response. A streaming RPC in gRPC can be synchronous or asynchronous. In synchronous RPC, a client call waits for the server to respond.
gRPC Python wraps gRPC core, which uses multithreading for performance, and hence doesn't support fork() .
Multiple gRPC clients can be created from a channel, including different types of clients. A channel and clients created from the channel can safely be used by multiple threads. Clients created from the channel can make multiple simultaneous calls.
This is a bit clunky right now, but you can accomplish your use case as follows:
#!/usr/bin/env python
from __future__ import print_function
import time
import random
import collections
import threading
from concurrent import futures
from concurrent.futures import ThreadPoolExecutor
import grpc
from translate_pb2 import Msg
from translate_pb2_grpc import TestServiceStub
from translate_pb2_grpc import TestServiceServicer
from translate_pb2_grpc import add_TestServiceServicer_to_server
def translate_next(msg):
return ''.join(reversed(msg))
class Translator(TestServiceServicer):
def Translate(self, request_iterator, context):
for req in request_iterator:
print("Translating message: {}".format(req.msg))
yield Msg(msg=translate_next(req.msg))
class TranslatorClient(object):
def __init__(self):
self._stop_event = threading.Event()
self._request_condition = threading.Condition()
self._response_condition = threading.Condition()
self._requests = collections.deque()
self._last_request = None
self._expected_responses = collections.deque()
self._responses = {}
def _next(self):
with self._request_condition:
while not self._requests and not self._stop_event.is_set():
self._request_condition.wait()
if len(self._requests) > 0:
return self._requests.popleft()
else:
raise StopIteration()
def next(self):
return self._next()
def __next__(self):
return self._next()
def add_response(self, response):
with self._response_condition:
request = self._expected_responses.popleft()
self._responses[request] = response
self._response_condition.notify_all()
def add_request(self, request):
with self._request_condition:
self._requests.append(request)
with self._response_condition:
self._expected_responses.append(request.msg)
self._request_condition.notify()
def close(self):
self._stop_event.set()
with self._request_condition:
self._request_condition.notify()
def translate(self, to_translate):
self.add_request(to_translate)
with self._response_condition:
while True:
self._response_condition.wait()
if to_translate.msg in self._responses:
return self._responses[to_translate.msg]
def _run_client(address, translator_client):
with grpc.insecure_channel('localhost:50054') as channel:
stub = TestServiceStub(channel)
responses = stub.Translate(translator_client)
for resp in responses:
translator_client.add_response(resp)
def main():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
add_TestServiceServicer_to_server(Translator(), server)
server.add_insecure_port('[::]:50054')
server.start()
translator_client = TranslatorClient()
client_thread = threading.Thread(
target=_run_client, args=('localhost:50054', translator_client))
client_thread.start()
def _translate(to_translate):
return translator_client.translate(Msg(msg=to_translate)).msg
translator_pool = futures.ThreadPoolExecutor(max_workers=4)
to_translate = ("hello", "goodbye", "I", "don't", "know", "why",)
translations = translator_pool.map(_translate, to_translate)
print("Translations: {}".format(zip(to_translate, translations)))
translator_client.close()
client_thread.join()
server.stop(None)
if __name__ == "__main__":
main()
The basic idea is to have an object called TranslatorClient
running on a separate thread, correlating requests and responses. It expects that responses will return in the order that requests were sent out. It also implements the iterator interface so that you can pass it directly to an invocation of the Translate
method on your stub.
We spin up a thread running _run_client
which pulls responses out of TranslatorClient
and feeds them back in the other end with add_response
.
The main
function I included here is really just a strawman since I don't have the particulars of your UI code. I'm running _translate
in a ThreadPoolExecutor
to demonstrate that, even though translator_client.translate
is synchronous, it yields, allowing you to have multiple in-flight requests at once.
We recognize that this is a lot of code to write for such a simple use case. Ultimately, the answer will be asyncio
support. We have plans for this in the not-too-distant future. But for the moment, this sort of solution should keep you going whether you're running python 2 or python 3.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With