
grpc c++ async completion queue events

I am trying to understand the gRPC C++ async model flow. This article (link) already explains many of my doubts. Here is the code for grpc_async_server. To understand when the CompletionQueue receives requests, I added a few print statements as follows:

First inside the HandleRpcs() function.

void HandleRpcs() {
    // Spawn a new CallData instance to serve new clients.
    new CallData(&service_, cq_.get());
    void* tag;  // uniquely identifies a request.
    bool ok;
    int i = 0;
    while (true) {
      std::cout << "i = " << i << std::endl; ///////////////////////////////
      // Block waiting to read the next event from the completion queue. The
      // event is uniquely identified by its tag, which in this case is the
      // memory address of a CallData instance.
      // The return value of Next should always be checked. This return value
      // tells us whether there is any kind of event or cq_ is shutting down.
      GPR_ASSERT(cq_->Next(&tag, &ok));
      GPR_ASSERT(ok);
      static_cast<CallData*>(tag)->Proceed();
      i++;
    }
  }

and inside the Proceed() function:

void Proceed() {
  if (status_ == CREATE) {
    // Make this instance progress to the PROCESS state.
    status_ = PROCESS;

    // As part of the initial CREATE state, we *request* that the system
    // start processing SayHello requests. In this request, "this" acts as
    // the tag uniquely identifying the request (so that different CallData
    // instances can serve different requests concurrently), in this case
    // the memory address of this CallData instance.
    std::cout<<"RequestSayHello called"<<std::endl; ////////////////////////////
    service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_,
                              this);
  } else if (status_ == PROCESS) {
    // Spawn a new CallData instance to serve new clients while we process
    // the one for this CallData. The instance will deallocate itself as
    // part of its FINISH state.
    new CallData(service_, cq_);

    // The actual processing.
    std::string prefix("Hello ");
    reply_.set_message(prefix + request_.name());

    // And we are done! Let the gRPC runtime know we've finished, using the
    // memory address of this instance as the uniquely identifying tag for
    // the event.
    status_ = FINISH;
    responder_.Finish(reply_, Status::OK, this);
  } else {
    std::cout<<"deallocated"<<std::endl; ////////////////////////////
    GPR_ASSERT(status_ == FINISH);
    // Once in the FINISH state, deallocate ourselves (CallData).
    delete this;
  }
}

Once I run the server and one client (link), the server prints the following:

RequestSayHello called
i = 0
RequestSayHello called
i = 1
deallocated
i = 2

The second RequestSayHello called makes sense because a new CallData instance is created. My question is: how does Proceed() get executed a second time, and why does deallocated get printed?

asked Oct 22 '19 by Debashish

People also ask

Is gRPC completion queue thread safe?

Completion queues are thread-safe. CallData is an application-level construct. gRPC itself won't be accessing the internals of the struct. It just uses it as an opaque pointer.
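
As a minimal sketch of what that thread-safety allows, assuming the ServerImpl, CallData, service_ and cq_ members from the example server above (the HandleRpcsMultiThreaded function itself is hypothetical, not part of the example):

#include <thread>   // in addition to the headers the example already uses
#include <vector>

void ServerImpl::HandleRpcsMultiThreaded(int num_threads) {
  // Pre-register one outstanding call, exactly as HandleRpcs() does.
  new CallData(&service_, cq_.get());
  std::vector<std::thread> workers;
  for (int t = 0; t < num_threads; ++t) {
    workers.emplace_back([this] {
      void* tag;
      bool ok;
      // Next() may be called concurrently from many threads on the same
      // queue; each completed event is handed to exactly one of them.
      while (cq_->Next(&tag, &ok)) {
        if (ok) static_cast<CallData*>(tag)->Proceed();
      }
    });
  }
  for (auto& w : workers) w.join();
}

In this particular example each CallData has at most one operation in flight at a time, so its Proceed() calls never overlap even with several worker threads draining the queue.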

Can gRPC be asynchronous?

Additionally, a gRPC RPC can be synchronous or asynchronous. Synchronous: the client call waits for the server to respond. Asynchronous: the client makes non-blocking calls to the server, and the server returns the response asynchronously.
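
As a concrete client-side illustration, here is a minimal sketch of an asynchronous unary call against the same Greeter service used in the question. The helper function AsyncSayHello, its tag value, and its error handling are assumptions for the sketch; the generated AsyncSayHello stub method and grpc::CompletionQueue are the real APIs.

#include <grpcpp/grpcpp.h>
#include <memory>
#include <string>
#include "helloworld.grpc.pb.h"

std::string AsyncSayHello(helloworld::Greeter::Stub* stub, const std::string& name) {
  grpc::ClientContext ctx;
  grpc::CompletionQueue cq;
  helloworld::HelloRequest request;
  request.set_name(name);
  helloworld::HelloReply reply;
  grpc::Status status;

  // Start the RPC; the call is initiated immediately but does not block.
  std::unique_ptr<grpc::ClientAsyncResponseReader<helloworld::HelloReply>> rpc(
      stub->AsyncSayHello(&ctx, request, &cq));

  // Ask for the reply and final status, tagging the operation with 1.
  rpc->Finish(&reply, &status, reinterpret_cast<void*>(1));

  // The client could do other work here; we block on the queue only to
  // keep the sketch short.
  void* got_tag = nullptr;
  bool ok = false;
  cq.Next(&got_tag, &ok);
  if (ok && got_tag == reinterpret_cast<void*>(1) && status.ok()) {
    return reply.message();
  }
  return "RPC failed";
}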

What is a completion queue?

A Completion Queue is an object which contains the completed work requests that were posted to the Work Queues (WQ). Every completion indicates that a specific work request (WR) was completed, whether successfully or unsuccessfully.
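
In gRPC's C++ API the same successful/unsuccessful distinction shows up as the ok flag returned by Next(). The sample in the question asserts on it; below is a sketch of the same HandleRpcs() loop that handles a failed completion instead. What the right recovery is depends on which operation the tag belongs to, so treat this as an assumption-laden variant rather than the canonical pattern.

void HandleRpcs() {
  new CallData(&service_, cq_.get());
  void* tag;
  bool ok;
  // Next() itself returns false only once the queue has been shut down
  // and fully drained.
  while (cq_->Next(&tag, &ok)) {
    if (ok) {
      static_cast<CallData*>(tag)->Proceed();
    } else {
      // The operation tied to this tag did not complete successfully
      // (for example, the server is shutting down); release the CallData
      // instead of advancing its state machine.
      delete static_cast<CallData*>(tag);
    }
  }
}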

Is gRPC multithreaded?

gRPC Python does support multithreading on both the client and the server. On the server side, you create the server with a thread pool, so it is multithreaded by default. On the client side, you can create a channel, pass it to multiple Python threads, and then create a stub for each thread.
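
A rough C++ analogue of that client-side advice, sketched against the same helloworld Greeter service (the RunClients helper is hypothetical): a grpc::Channel can be shared across threads, with each thread creating its own stub.

#include <grpcpp/grpcpp.h>
#include <string>
#include <thread>
#include <vector>
#include "helloworld.grpc.pb.h"

void RunClients(const std::string& target, int num_threads) {
  // One channel shared by every thread.
  std::shared_ptr<grpc::Channel> channel =
      grpc::CreateChannel(target, grpc::InsecureChannelCredentials());
  std::vector<std::thread> threads;
  for (int t = 0; t < num_threads; ++t) {
    threads.emplace_back([channel, t] {
      // One stub per thread, all backed by the shared channel.
      auto stub = helloworld::Greeter::NewStub(channel);
      helloworld::HelloRequest request;
      request.set_name("thread-" + std::to_string(t));
      helloworld::HelloReply reply;
      grpc::ClientContext ctx;
      grpc::Status status = stub->SayHello(&ctx, request, &reply);
      // status.ok() and reply.message() would be checked/used here.
    });
  }
  for (auto& th : threads) th.join();
}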


1 Answer

The completion queue (cq_) structure handles several types of events, including both request and response events. The first call to Proceed() made from the HandleRpcs() loop enters the PROCESS stage of the state machine for the CallData object.

During this stage:
1. A new CallData object is created; its constructor calls RequestSayHello(), which, as you mentioned, will place a request event on cq_ when the next client call arrives (hence the second RequestSayHello called print).
2. responder_.Finish() is called with the reply object; once the reply has been sent, this places a response event on cq_, tagged with the same CallData instance.

When that response event is pulled from cq_, Proceed() is called again on the first CallData object, which is now in the FINISH state, so cleanup is performed and deallocated is printed.
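
One way to make that sequence visible (a small, hypothetical tweak to the example's prints, not part of the original code) is to label every Proceed() call with the CallData address and its current state; the logic is otherwise unchanged:

void Proceed() {
  // status_ is the example's CallStatus enum; this prints its numeric
  // value (0 = CREATE, 1 = PROCESS, 2 = FINISH).
  std::cout << "Proceed() on " << this << " in state " << status_ << std::endl;
  if (status_ == CREATE) {
    status_ = PROCESS;
    // Registers interest in the next SayHello call; produces a request
    // event on cq_ (tagged with this) when a client call arrives.
    service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_, this);
  } else if (status_ == PROCESS) {
    new CallData(service_, cq_);  // pre-register the next client's call
    reply_.set_message("Hello " + request_.name());
    status_ = FINISH;
    // Produces a response event on cq_ (same tag) once the reply is sent.
    responder_.Finish(reply_, Status::OK, this);
  } else {
    GPR_ASSERT(status_ == FINISH);
    delete this;  // reached via the event that Finish() enqueued
  }
}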

answered Oct 16 '22 by Matthew Rasa