
Handling async streaming requests in gRPC Python

I am trying to understand how to handle a gRPC API with bidirectional streaming (using the Python API).

Say I have the following simple server definition:

syntax = "proto3";
package simple;

service TestService {
  rpc Translate(stream Msg) returns (stream Msg){}
}

message Msg
{
 string msg = 1;
}
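
For reference, the Python stub modules (translate_pb2, translate_pb2_grpc) can be generated with grpcio-tools, assuming this definition is saved as translate.proto:

python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. translate.proto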

Say that the messages sent from the client arrive asynchronously (as a consequence of the user selecting some UI elements).

The generated Python stub for the client will contain a Translate method that accepts an iterator of requests (e.g. a generator) and returns an iterator of responses.

What is not clear to me is how I would write a generator that yields messages as they are created by the user. Sleeping on the thread while waiting for messages doesn't sound like the best solution.
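
To illustrate, the only shape I can think of is a generator blocking on a shared queue fed by the UI callbacks; a sketch (the queue wiring is hypothetical, and stub is an already-created TestServiceStub):

import queue

ui_messages = queue.Queue()  # hypothetical: UI callbacks put Msg objects here

def request_generator():
    # Blocks gRPC's sender thread until the UI produces the next message;
    # a None sentinel ends the request stream.
    while True:
        msg = ui_messages.get()
        if msg is None:
            return
        yield msg

responses = stub.Translate(request_generator())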

asked Mar 06 '19 by raduw


1 Answer

This is a bit clunky right now, but you can accomplish your use case as follows:

#!/usr/bin/env python

from __future__ import print_function

import collections
import threading

from concurrent import futures
import grpc

from translate_pb2 import Msg
from translate_pb2_grpc import TestServiceStub
from translate_pb2_grpc import TestServiceServicer
from translate_pb2_grpc import add_TestServiceServicer_to_server


def translate_next(msg):
    # Dummy "translation": just reverse the string.
    return ''.join(reversed(msg))


class Translator(TestServiceServicer):
  """Servicer: streams back one translated response per incoming request."""

  def Translate(self, request_iterator, context):
    for req in request_iterator:
      print("Translating message: {}".format(req.msg))
      yield Msg(msg=translate_next(req.msg))

class TranslatorClient(object):
  """Correlates requests with responses across threads.

  Implements the iterator protocol so it can be passed directly to the
  stub's Translate call as the request stream; assumes responses come back
  in the same order the requests were sent.
  """

  def __init__(self):
    self._stop_event = threading.Event()
    self._request_condition = threading.Condition()
    self._response_condition = threading.Condition()
    self._requests = collections.deque()
    self._expected_responses = collections.deque()
    self._responses = {}

  def _next(self):
    # Called by gRPC from its sender thread: block until a request is
    # queued or close() has been called.
    with self._request_condition:
      while not self._requests and not self._stop_event.is_set():
        self._request_condition.wait()
      if self._requests:
        return self._requests.popleft()
      else:
        raise StopIteration()

  # Iterator protocol for Python 2 (next) and Python 3 (__next__).
  def next(self):
    return self._next()

  def __next__(self):
    return self._next()

  def add_response(self, response):
    # Called from the client thread: pair the response with the oldest
    # outstanding request (responses are assumed to arrive in order).
    with self._response_condition:
      request = self._expected_responses.popleft()
      self._responses[request] = response
      self._response_condition.notify_all()

  def add_request(self, request):
    # Called from UI/worker threads: queue the outbound request and record
    # that a response for it is expected.
    with self._request_condition:
      self._requests.append(request)
      with self._response_condition:
        self._expected_responses.append(request.msg)
      self._request_condition.notify()

  def close(self):
    # Wake _next so it can terminate the request stream.
    self._stop_event.set()
    with self._request_condition:
      self._request_condition.notify()

  def translate(self, to_translate):
    # Synchronous round trip: queue the request, then block until its
    # response arrives. Check the condition before waiting so a response
    # that landed in the meantime isn't missed.
    self.add_request(to_translate)
    with self._response_condition:
      while to_translate.msg not in self._responses:
        self._response_condition.wait()
      return self._responses[to_translate.msg]


def _run_client(address, translator_client):
  # Drive the streaming RPC: translator_client supplies the requests, and
  # each response is handed back to it for correlation.
  with grpc.insecure_channel(address) as channel:
    stub = TestServiceStub(channel)
    responses = stub.Translate(translator_client)
    for resp in responses:
      translator_client.add_response(resp)

def main():
  server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
  add_TestServiceServicer_to_server(Translator(), server)
  server.add_insecure_port('[::]:50054')
  server.start()
  translator_client = TranslatorClient()
  client_thread = threading.Thread(
      target=_run_client, args=('localhost:50054', translator_client))
  client_thread.start()

  def _translate(to_translate):
    return translator_client.translate(Msg(msg=to_translate)).msg

  # Fan translations out over a pool to show multiple in-flight requests.
  translator_pool = futures.ThreadPoolExecutor(max_workers=4)
  to_translate = ("hello", "goodbye", "I", "don't", "know", "why",)
  translations = translator_pool.map(_translate, to_translate)
  print("Translations: {}".format(list(zip(to_translate, translations))))

  translator_client.close()
  client_thread.join()
  server.stop(None)


if __name__ == "__main__":
  main()

The basic idea is to have an object, TranslatorClient, shared across threads, that correlates requests with responses. It expects responses to come back in the order the requests were sent out. It also implements the iterator protocol, so you can pass it directly to the invocation of the Translate method on your stub.

We spin up a thread running _run_client, which pulls requests out of TranslatorClient, sends them over the stream, and feeds the responses back in at the other end with add_response.

The main function I included here is really just a strawman, since I don't have the particulars of your UI code. I'm running _translate in a ThreadPoolExecutor to demonstrate that, even though translator_client.translate is synchronous, it only blocks its own worker thread, so you can have multiple requests in flight at once (see the sketch below).
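
For instance, in a real UI you might submit each selection event to the same pool instead of blocking the event thread; a sketch reusing the names above (on_user_selection is hypothetical):

# Hypothetical UI callback: run the round trip on the pool so the
# UI/event thread never blocks on the network.
def on_user_selection(text):
    future = translator_pool.submit(_translate, text)
    future.add_done_callback(lambda f: print("Translated: {}".format(f.result())))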

We recognize that this is a lot of code to write for such a simple use case. Ultimately, the answer will be asyncio support. We have plans for this in the not-too-distant future. But for the moment, this sort of solution should keep you going whether you're running Python 2 or Python 3.
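
For reference, that asyncio support has since shipped as grpc.aio. A minimal sketch of the same bidirectional call, assuming grpcio 1.32+ and a server like the one above already listening on localhost:50054:

import asyncio
import grpc

from translate_pb2 import Msg
from translate_pb2_grpc import TestServiceStub

async def translate_one(text):
    async with grpc.aio.insecure_channel('localhost:50054') as channel:
        stub = TestServiceStub(channel)
        call = stub.Translate()          # open the bidirectional stream
        await call.write(Msg(msg=text))  # write whenever the UI produces input
        response = await call.read()     # await the matching response
        await call.done_writing()        # close the request side of the stream
        return response.msg

print(asyncio.run(translate_one("hello")))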

answered Oct 07 '22 by Richard Belleville