Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

GRPC streaming select (python)

Tags:

python

grpc

Let's say I want to create a chat-like application. A client can send text to the server and vice versa. The order of text exchanges can be arbitrary.

The server depends on another stream which controls the server response stream. The GRPC stream is exposed as a python generator. How can the server now wait for client input and input on the other stream at the same time? Normally one would use something like select(), but here we have generators.

I have some example code which implements the wanted behavior but requires an additional thread on the client and server side. How can I achieve the same result without a thread?

Proto:

syntax = 'proto3';

service Scenario {
    rpc Chat(stream DPong) returns (stream DPong) {}
}

message DPong {
    string name = 1;
}

Server:

import random
import string
import threading

import grpc

import scenario_pb2_grpc
import scenario_pb2
import time
from concurrent import futures

class Scenario(scenario_pb2_grpc.ScenarioServicer):

    def Chat(self, request_iterator, context):
        def stream():
            while 1:
                time.sleep(1)
                yield random.choice(string.ascii_letters)

        output_stream = stream()

        def read_incoming():
            while 1:
                received = next(request_iterator)
                print('received: {}'.format(received))

        thread = threading.Thread(target=read_incoming)
        thread.daemon = True
        thread.start()

        while 1:
            yield scenario_pb2.DPong(name=next(output_stream))


if __name__ == '__main__':
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    scenario_pb2.add_ScenarioServicer_to_server(
        Scenario(), server)

    server.add_insecure_port('[::]:50052')
    server.start()
    print('listening ...')
    while 1:
        time.sleep(1)

Client

import threading

import grpc
import time

import scenario_pb2_grpc, scenario_pb2


def run():
    channel = grpc.insecure_channel('localhost:50052')
    stub = scenario_pb2_grpc.ScenarioStub(channel)
    print('client connected')

    def stream():
        while 1:
            yield scenario_pb2.DPong(name=input('$ '))

    input_stream = stub.Chat(stream())

    def read_incoming():
        while 1:
            print('received: {}'.format(next(input_stream).name))

    thread = threading.Thread(target=read_incoming)
    thread.daemon = True
    thread.start()

    while 1:
        time.sleep(1)

if __name__ == '__main__':
    print('client starting ...')
    run()
like image 898
user2221323 Avatar asked May 19 '17 18:05

user2221323


1 Answers

It is not currently possible to do this without spending the threads that you're spending. We're thinking about implementing enhancements that would allow implementations to avoid taking another thread, but those would be months away at earliest.

like image 81
Nathaniel Manista At Google Avatar answered Oct 17 '22 13:10

Nathaniel Manista At Google