
How to broadcast in gRPC from server to client?

I'm creating a small chat application in gRPC and I've run into an issue: when a user connects to the gRPC server as a client, I'd like to broadcast that event to all other connected clients.

I'm thinking of using some sort of observer, but I'm confused as to how the server knows who is connected and how I would broadcast the event to all clients and not just one or two.

I know using streams is part of the answer, but because each client creates its own stream with the server, I'm unsure how it can subscribe to other server-client streams.

Somnium asked Mar 30 '18 20:03


1 Answer

Another option would be to use a long-polling approach: each client makes a unary call that the server holds open until there is something new to report. Try something like the code below (in Python, since that is what I'm most familiar with, but Go should be very similar). This has not been tested and is only meant to give you an idea of how to do long-polling in gRPC:

.PROTO defs
-------------------------------------------------
syntax = "proto3";

service Updater {
    rpc GetUpdates(GetUpdatesRequest) returns (GetUpdatesResponse);
}

message GetUpdatesRequest {
    int64 last_received_update = 1;
}

message GetUpdatesResponse {
    repeated Update updates = 1;
    int64 update_index = 2;
}

message Update {
    // your update structure
}


SERVER
-----------------------------------------------------------
import threading

# UpdaterServicer and GetUpdatesResponse come from the modules that protoc
# generates from the .proto definitions above.


class UpdaterServer(UpdaterServicer):
    def __init__(self):
        # Guards self.updates and wakes up blocked GetUpdates calls
        self.condition = threading.Condition()
        self.updates = []

    def post_update(self, update):
        """
        Used whenever the clients should be updated about something. It will
        trigger their long-poll calls to return
        """
        with self.condition:
            # TODO: You should probably remove old updates after some time
            self.updates.append(update)
            self.condition.notify_all()

    def GetUpdates(self, req, context):
        with self.condition:
            # Block until there is at least one update this client has not seen
            while not self.updates[req.last_received_update + 1:]:
                self.condition.wait()
            new_updates = self.updates[req.last_received_update + 1:]
            response = GetUpdatesResponse()
            for update in new_updates:
                response.updates.add().CopyFrom(update)
            # Index of the last update included in this response
            response.update_index = req.last_received_update + len(new_updates)
            return response
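
The TODO in post_update (dropping old updates) can be handled by remembering the global index of the oldest update that is still kept. The following is only a rough sketch of that idea with no gRPC in it: the class name UpdateLog, the max_kept parameter and the wait_for_updates method are made up for illustration, and the bounded wait is an extra assumption so that a handler thread is not blocked forever.

```python
import threading


class UpdateLog:
    """In-memory update log with bounded history (hypothetical helper)."""

    def __init__(self, max_kept=1000):
        self.condition = threading.Condition()
        self.updates = []        # retained updates, oldest first
        self.first_index = 0     # global index of self.updates[0]
        self.max_kept = max_kept

    def _last_index(self):
        # Global index of the newest update, or first_index - 1 if empty
        return self.first_index + len(self.updates) - 1

    def post(self, update):
        with self.condition:
            self.updates.append(update)
            # Drop the oldest entries once the history grows past max_kept
            overflow = len(self.updates) - self.max_kept
            if overflow > 0:
                del self.updates[:overflow]
                self.first_index += overflow
            self.condition.notify_all()

    def wait_for_updates(self, last_received, timeout=None):
        """Return (new_updates, index_of_last_update_known_to_caller).

        Blocks until something newer than last_received exists or the
        timeout expires, in which case new_updates is empty.
        """
        with self.condition:
            self.condition.wait_for(
                lambda: self._last_index() > last_received, timeout=timeout)
            # Translate the global index into a position in the trimmed list;
            # a client that fell too far behind starts at the oldest kept update
            start = max(last_received + 1, self.first_index) - self.first_index
            new_updates = self.updates[start:]
            return new_updates, max(last_received, self._last_index())
```

A GetUpdates handler could call wait_for_updates(req.last_received_update, timeout=...) and copy the result into the response. Note that a client that was away for longer than the kept history silently misses the trimmed updates, so pick max_kept with that in mind.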


SEPARATE THREAD IN THE CLIENT
----------------------------------------------
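import grpc

# UpdaterStub and GetUpdatesRequest come from the protoc-generated modules;
# channel is an already opened grpc channel.
stub = UpdaterStub(channel)
request = GetUpdatesRequest()
request.last_received_update = -1
while True:
    try:
        response = stub.GetUpdates(request, timeout=60*10)
        handle_updates(response.updates)
        request.last_received_update = response.update_index
    except grpc.RpcError as e:
        # A blocking call that hits its deadline raises RpcError with
        # DEADLINE_EXCEEDED: just poll again. Anything else is a real error.
        if e.code() != grpc.StatusCode.DEADLINE_EXCEEDED:
            raise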
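
To see why this amounts to a broadcast, here is a small gRPC-free simulation of the same pattern: several "clients" run the polling loop in threads against one shared list and condition variable, and each of them ends up with every update. This is a sketch only; Broadcaster, run_client and demo are hypothetical names, and the method get_updates stands in for the GetUpdates RPC.

```python
import threading


class Broadcaster:
    """Stand-in for the server's update list and condition variable."""

    def __init__(self):
        self.condition = threading.Condition()
        self.updates = []

    def post_update(self, update):
        with self.condition:
            self.updates.append(update)
            self.condition.notify_all()   # wake every waiting client

    def get_updates(self, last_received, timeout):
        # Mirrors GetUpdates: wait until something newer than last_received
        # exists, or give up after the timeout and return an empty list.
        with self.condition:
            self.condition.wait_for(
                lambda: len(self.updates) > last_received + 1,
                timeout=timeout)
            new_updates = self.updates[last_received + 1:]
            return new_updates, last_received + len(new_updates)


def run_client(broadcaster, expected_count, received):
    # Same loop as the client code above, minus the network
    last_received = -1
    while len(received) < expected_count:
        new_updates, last_received = broadcaster.get_updates(
            last_received, timeout=1.0)
        received.extend(new_updates)


def demo(num_clients=3):
    broadcaster = Broadcaster()
    messages = ["alice joined", "bob joined"]
    inboxes = [[] for _ in range(num_clients)]
    threads = [
        threading.Thread(target=run_client,
                         args=(broadcaster, len(messages), inbox))
        for inbox in inboxes
    ]
    for t in threads:
        t.start()
    for message in messages:
        broadcaster.post_update(message)
    for t in threads:
        t.join()
    return inboxes
```

Because every client tracks its own last_received index, the server never needs a list of who is connected: anyone who is currently polling gets woken by notify_all, and anyone who polls later catches up from the stored updates.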
Michał answered Sep 28 '22 06:09