Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Asynchronous model in grpc c++

My team is designing a scalable solution with micro-services architecture and planning to use gRPC as the transport communication between layers. And we've decided to use async grpc model. The design that example(greeter_async_server.cc) provides doesn't seem viable if I scale the number of RPC methods, because then I'll have to create a new class for every RPC method, and create their objects in HandleRpcs() like this. Pastebin (Short example code).

   void HandleRpcs() {
            new CallDataForRPC1(&service_, cq_.get());
            new CallDataForRPC2(&service_, cq_.get());
            new CallDataForRPC3(&service, cq_.get());
            // so on...
    }

It'll be hard-coded, all the flexibility will be lost.

I've around 300-400RPC methods to implement and having 300-400 classes will be cumbersome and inefficient when I'll have to handle more than 100K RPC requests/sec and this solution is a very bad design. I can't bear the overhead of creation of objects this way on every single request. Can somebody kindly provide me a workaround for this. Can async grpc c++ not be simple like its sync companion?

Edit: In favour of making the situation more clear, and for those who might be struggling to grasp the flow of this async example, I'm writing what I've understood so far, please make me correct if wrong somewhere.

In async grpc, every time we have to bind a unique-tag with the completion-queue so that when we poll, the server can give it back to us when the particular RPC will be hit by the client, and we infer from the returned unique-tag about the type of the call.

service_->RequestRPC2(&ctx_, &request_, &responder_, cq_, cq_,this); Here we're using the address of the current object as the unique-tag. This is like registering for our RPC call on the completion queue. Then we poll down in HandleRPCs() to see if the client hits the RPC, if so then cq_->Next(&tag, &OK) will fill the tag. The polling code snippet:

while (true) {
          GPR_ASSERT(cq_->Next(&tag, &ok));
          GPR_ASSERT(ok);
          static_cast<CallData*>(tag)->Proceed();
        }

Since, the unique-tag that we registered into the queue was the address of the CallData object so we're able to call Proceed(). This was fine for one RPC with its logic inside Proceed(). But with more RPCs each time we'll have all of them inside the CallData, then on polling, we'll be calling the only one Proceed() which will contain logic to (say) RPC1(postgres calls), RPC2(mongodb calls), .. so on. This is like writing all my program inside one function. So, to avoid this, I used a GenericCallData class with the virtual void Proceed() and made derived classes out of it, one class per RPC with their own logic inside their own Proceed(). This is a working solution but I want to avoid writing many classes.

Another solution I tried was keeping all RPC-function-logics out of the proceed() and into their own functions and maintaining a global std::map<long, std::function</*some params*/>> . So whenever I register an RPC with unique-tag onto the queue, I store its corresponding logic function (which I'll surely hard code into the statement and bind all the parameters required), then the unique-tag as key. On polling, when I get the &tag I do a lookup in the map for this key and call the corresponding saved function. Now, there's one more hurdle, I'll have to do this inside the function logic:

// pseudo code
void function(reply, responder, context, service)
{
    // register this RPC with another unique tag so to serve new incoming request of the same type on the completion queue
     service_->RequestRPC1(/*params*/, new_unique_id);
    // now again save this new_unique_id and current function into the map, so when tag will be returned we can do lookup
     map.emplace(new_unique_id, function);

    // now you're free to do your logic
    // do your logic
}

You see this, code has spread into another module now, and it's per RPC based. Hope it clears the situation. I thought if somebody could have implemented this type of server in a more easy way.

like image 572
WhiteSword Avatar asked Mar 16 '18 10:03

WhiteSword


People also ask

Can gRPC be asynchronous?

Additionally, a gRPC RPC can be synchronous or asynchronous. Synchronous: a client call waits for the server to respond. Asynchronous: client makes non-blocking calls to the server, and the server returns the response asynchronously.

Is gRPC server multi threaded?

Multiple gRPC clients can be created from a channel, including different types of clients. A channel and clients created from the channel can safely be used by multiple threads. Clients created from the channel can make multiple simultaneous calls.

Is gRPC completion queue thread safe?

Completion queues are thread-safe. CallData is an application-level construct. gRPC itself won't be accessing the internals of the struct. It just uses it as an opaque pointer.

How does gRPC communicate?

In gRPC, the client triggers communication, which consists of request headers, binary messages (this is known as a length-prefixed message), and end-of-stream flag to notify the server that the client finished sending the content.


2 Answers

This post is pretty old by now but I have not seen any answer or example regarding this so I will show how I solved it to any other readers. I have around 30 RPC calls and was looking for a way of reducing the footprint when adding and removing RPC calls. It took me some iterations to figure out a good way to solve it.

So my interface for getting RPC requests from my (g)RPC library is a callback interface that the recepiant need to implement. The interface looks like this:

class IRpcRequestHandler
{
public:
    virtual ~IRpcRequestHandler() = default;
    virtual void onZigbeeOpenNetworkRequest(const smarthome::ZigbeeOpenNetworkRequest& req,
                                            smarthome::Response& res) = 0;
    virtual void onZigbeeTouchlinkDeviceRequest(const smarthome::ZigbeeTouchlinkDeviceRequest& req,
                                                smarthome::Response& res) = 0;
    ...
};

And some code for setting up/register each RPC method after the gRPC server is started:

void ready() 
{
    SETUP_SMARTHOME_CALL("ZigbeeOpenNetwork", // Alias that is used for debug messages
                         smarthome::Command::AsyncService::RequestZigbeeOpenNetwork,  // Generated gRPC service method for async.
                         smarthome::ZigbeeOpenNetworkRequest, // Generated gRPC service request message
                         smarthome::Response, // Generated gRPC service response message
                         IRpcRequestHandler::onZigbeeOpenNetworkRequest); // The callback method to call when request has arrived.

    SETUP_SMARTHOME_CALL("ZigbeeTouchlinkDevice",
                         smarthome::Command::AsyncService::RequestZigbeeTouchlinkDevice,
                         smarthome::ZigbeeTouchlinkDeviceRequest,
                         smarthome::Response,
                         IRpcRequestHandler::onZigbeeTouchlinkDeviceRequest);
    ...
}

This is all that you need to care about when adding and removing RPC methods.

The SETUP_SMARTHOME_CALL is a home-cooked macro which looks like this:

#define SETUP_SMARTHOME_CALL(ALIAS, SERVICE, REQ, RES, CALLBACK_FUNC) \
  new ServerCallData<REQ, RES>(                                       \
      ALIAS,                                                          \
      std::bind(&SERVICE,                                             \
                &mCommandService,                                     \
                std::placeholders::_1,                                \
                std::placeholders::_2,                                \
                std::placeholders::_3,                                \
                std::placeholders::_4,                                \
                std::placeholders::_5,                                \
                std::placeholders::_6),                               \
      mCompletionQueue.get(),                                         \
      std::bind(&CALLBACK_FUNC, requestHandler, std::placeholders::_1, std::placeholders::_2))

I think the ServerCallData class looks like the one from gRPCs examples with a few modifications. ServerCallData is derived from a non-templete class with an abstract function void proceed(bool ok) for the CompletionQueue::Next() handling. When ServerCallData is created, it will call the SERVICE method to register itself on the CompletionQueue and on every first proceed(ok) call, it will clone itself which will register another instance. I can post some sample code for that as well if someone is interested.

EDIT: Added some more sample code below.

GrpcServer

class GrpcServer
{
 public:
  explicit GrpcServer(std::vector<grpc::Service*> services);
  virtual ~GrpcServer();

  void run(const std::string& sslKey,
           const std::string& sslCert,
           const std::string& password,
           const std::string& listenAddr,
           uint32_t port,
           uint32_t threads = 1);

 private:
  virtual void ready();  // Called after gRPC server is created and before polling CQ.
  void handleRpcs();  // Function that polls from CQ, can be run by multiple threads. Casts object to CallData and calls CallData::proceed().

  std::unique_ptr<ServerCompletionQueue> mCompletionQueue;
  std::unique_ptr<Server> mServer;
  std::vector<grpc::Service*> mServices;
  std::list<std::shared_ptr<std::thread>> mThreads;
  ...
}

And the main part of the CallData object:

template <typename TREQUEST, typename TREPLY>
class ServerCallData : public ServerCallMethod
{
 public:
  explicit ServerCallData(const std::string& methodName,
                          std::function<void(ServerContext*,
                                             TREQUEST*,
                                             ::grpc::ServerAsyncResponseWriter<TREPLY>*,
                                             ::grpc::CompletionQueue*,
                                             ::grpc::ServerCompletionQueue*,
                                             void*)> serviceFunc,
                          grpc::ServerCompletionQueue* completionQueue,
                          std::function<void(const TREQUEST&, TREPLY&)> callback,
                          bool first = false)
      : ServerCallMethod(methodName),
        mResponder(&mContext),
        serviceFunc(serviceFunc),
        completionQueue(completionQueue),
        callback(callback)
  {
    requestNewCall();
  }

  void proceed(bool ok) override
  {
    if (!ok)
    {
      delete this;
      return;
    }

    if (callStatus() == ServerCallMethod::PROCESS)
    {    
      callStatus() = ServerCallMethod::FINISH;
      new ServerCallData<TREQUEST, TREPLY>(callMethodName(), serviceFunc, completionQueue, callback);

      try
      {
        callback(mRequest, mReply);
      }
      catch (const std::exception& e)
      {
        mResponder.Finish(mReply, Status::CANCELLED, this);
        return;
      }

      mResponder.Finish(mReply, Status::OK, this);
    }
    else
    {    
      delete this;
    }
  }

 private:
  void requestNewCall()
  {
    serviceFunc(
        &mContext, &mRequest, &mResponder, completionQueue, completionQueue, this);
  }

  ServerContext mContext;
  TREQUEST mRequest;
  TREPLY mReply;
  ServerAsyncResponseWriter<TREPLY> mResponder;
  std::function<void(ServerContext*,
                     TREQUEST*,
                     ::grpc::ServerAsyncResponseWriter<TREPLY>*,
                     ::grpc::CompletionQueue*,
                     ::grpc::ServerCompletionQueue*,
                     void*)>
      serviceFunc;
  std::function<void(const TREQUEST&, TREPLY&)> callback;
  grpc::ServerCompletionQueue* completionQueue;
};
like image 198
aleer Avatar answered Oct 04 '22 17:10

aleer


Although the thread is old I wanted to share a solution I am currently implementing. It mainly consists templated classes inheriting CallData to be scalable. This way, each new rpc will only require specializing the templates of the required CallData methods.

Calldata header:

class CallData {
    protected:
        enum Status { CREATE, PROCESS, FINISH };
        Status status;

        virtual void treat_create() = 0;
        virtual void treat_process() = 0;

    public:
        void Proceed();
};

CallData Proceed implementation:

void CallData::Proceed() {
    switch (status) {
        case CREATE:
            status = PROCESS;
            treat_create();
            break;
        case PROCESS:
            status = FINISH;
            treat_process();
            break;
        case FINISH:
            delete this;
    }
}

Inheriting from CallData header (simplified):

template <typename Request, typename Reply>
class CallDataTemplated : CallData {
    static_assert(std::is_base_of<google::protobuf::Message, Request>::value, 
        "Request and reply must be protobuf messages");
    static_assert(std::is_base_of<google::protobuf::Message, Reply>::value,
        "Request and reply must be protobuf messages");

    private:
        Service,Cq,Context,ResponseWriter,...
        Request request;
        Reply reply;

    protected:
        void treat_create() override;
        void treat_process() override;

    public:
        ...
};

Then, for specific rpc's in theory you should be able to do things like:

template<>
void CallDataTemplated<HelloRequest, HelloReply>::treat_process() {
     ...
}

It's a lot of templated methods but preferable to creating a class per rpc from my point of view.

like image 36
Stefan van Heijningen Avatar answered Oct 03 '22 17:10

Stefan van Heijningen