
How do I handle streaming messages with Python gRPC?

I'm following this Route_Guide sample.

The sample in question fires off and reads messages without replying to a specific message. Replying to a specific message is what I'm trying to achieve.

Here's what I have so far:

import sys

import grpc
...

channel = grpc.insecure_channel(conn_str)
try:
    grpc.channel_ready_future(channel).result(timeout=5)
except grpc.FutureTimeoutError:
    sys.exit('Error connecting to server')
else:
    stub = MyService_pb2_grpc.MyServiceStub(channel)
    print('Connected to gRPC server.')
    this_is_just_read_maybe(stub)


def this_is_just_read_maybe(stub):
    responses = stub.MyEventStream(stream())
    for response in responses:
        print(f'Received message: {response}')
        if response.something:
            # okay, now what? how do i send a message here?
            pass

def stream():
    yield my_start_stream_msg
    # this is fine, i receive this server-side
    # but i can't check for incoming messages here

I don't seem to have a read() or write() on the stub; everything seems to be implemented with iterators.

How do I send a message from this_is_just_read_maybe(stub)? Is that even the right approach?

My Proto is a bidirectional stream:

service MyService {
  rpc MyEventStream (stream StreamingMessage) returns (stream StreamingMessage) {}
}
evilSnobu asked Dec 15 '17 11:12




2 Answers

What you're trying to do is perfectly possible. It will probably involve writing your own request iterator object that can be given responses as they arrive, rather than using a simple generator as your request iterator. Perhaps something like

import threading


class MySmarterRequestIterator(object):

    def __init__(self):
        self._lock = threading.Lock()
        self._responses_so_far = []

    def __iter__(self):
        return self

    def _next(self):
        # some logic that depends upon what responses have been seen
        # before returning the next request message
        return <your message value>

    def __next__(self):  # Python 3
        return self._next()

    def next(self):  # Python 2
        return self._next()

    def add_response(self, response):
        with self._lock:
            self._responses_so_far.append(response)

that you then use like

my_smarter_request_iterator = MySmarterRequestIterator()
responses = stub.MyEventStream(my_smarter_request_iterator)
for response in responses:
    my_smarter_request_iterator.add_response(response)

There will probably be locking and blocking in your _next implementation. It has to handle the situation where gRPC Python asks your object for the next request to send and your object responds (in effect) "wait, hold on, I don't know what request I want to send until after I've seen how the next response turned out".
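A minimal sketch of that blocking behaviour, assuming a queue is an acceptable way to hold pending requests. The names ResponseDrivenRequests, send, close and fake_event_stream are made up for illustration and are not part of gRPC; fake_event_stream only stands in for stub.MyEventStream so the example runs without a server.

```python
import queue

_CLOSE = object()  # private sentinel marking the end of the request stream


class ResponseDrivenRequests:
    """Request iterator whose next message can depend on earlier responses."""

    def __init__(self, first_request):
        self._pending = queue.Queue()
        self._pending.put(first_request)

    def __iter__(self):
        return self

    def __next__(self):
        # Blocks until send() or close() has queued something.
        item = self._pending.get()
        if item is _CLOSE:
            raise StopIteration
        return item

    def send(self, request):
        self._pending.put(request)

    def close(self):
        self._pending.put(_CLOSE)


def fake_event_stream(request_iterator):
    """Stand-in for stub.MyEventStream: answers each request with one reply."""
    for request in request_iterator:
        yield f"ack:{request}"


request_iter = ResponseDrivenRequests("start")
received = []
for response in fake_event_stream(request_iter):
    received.append(response)
    if response == "ack:start":
        request_iter.send("follow-up")  # reply to this specific response
    else:
        request_iter.close()            # nothing more to send; end the stream
```

With a real stub you would pass request_iter to stub.MyEventStream instead of fake_event_stream. The loop body decides, per response, whether to queue another request or close the stream.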

Nathaniel Manista At Google answered Sep 25 '22 21:09


Instead of writing a custom iterator, you can also use a blocking queue to implement send-and-receive-like behaviour for the client stub:

import queue
...

send_queue = queue.SimpleQueue()  # or Queue if using Python before 3.7
my_event_stream = stub.MyEventStream(iter(send_queue.get, None))

# send
send_queue.put(StreamingMessage())

# receive
response = next(my_event_stream)  # type: StreamingMessage

This makes use of the sentinel form of iter, which converts a regular function into an iterator that stops when it reaches a sentinel value (in this case None).
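To see the sentinel form of iter on its own, here is a small sketch that needs no gRPC at all. The strings stand in for StreamingMessage objects, and putting None on the queue is what ends the iteration.

```python
import queue

send_queue = queue.SimpleQueue()

# iter(callable, sentinel) calls send_queue.get() repeatedly and stops
# as soon as a call returns the sentinel (None here).
outgoing = iter(send_queue.get, None)

send_queue.put("first")
send_queue.put("second")
send_queue.put(None)  # sentinel: ends the request stream

sent = list(outgoing)
```

In the gRPC case the same None would exhaust the request iterator handed to stub.MyEventStream, which should be how the client signals that it has finished sending.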

dcoles answered Sep 22 '22 21:09