I need to implement a distributed database, and I based my implementation on the gRPC example. The following are my code snippets.
sundial_sync_server.h
#ifndef SUNDIAL_GRPC_SYNC_SERVER_H
#define SUNDIAL_GRPC_SYNC_SERVER_H
#endif //SUNDIAL_GRPC_SYNC_SERVER_H
#include "sundial_grpc.grpc.pb.h"
#include "sundial_grpc.pb.h"
#include <iostream>
#include <memory>
#include <string>
#include <grpcpp/grpcpp.h>
using grpc::Channel;
using grpc::ServerContext;
using grpc::Status;
using sundial_rpc::SundialRequest;
using sundial_rpc::SundialResponse;
using sundial_rpc::Sundial_GRPC_SYNC;
#ifndef ABC
#define ABC
/// Synchronous gRPC service for Sundial: implements the generated
/// Sundial_GRPC_SYNC::Service interface.
class SundialServiceImp final : public Sundial_GRPC_SYNC::Service
{
public:
    /// Server-side handler for the contactRemote RPC.
    /// @param context   per-call server context supplied by gRPC
    /// @param request   request message received from the remote node
    /// @param response  response message to be filled in for the caller
    /// @return gRPC status of the call
    Status contactRemote(::grpc::ServerContext* context,
                         const ::sundial_rpc::SundialRequest* request,
                         ::sundial_rpc::SundialResponse* response) override;

    /// Builds and starts a gRPC server with this service registered, then
    /// blocks waiting on it.
    void run();
};
#endif
sundial_sync_server.cpp
#include "sundial_grpc.grpc.pb.h"
#include "sundial_grpc.pb.h"
#include <iostream>
#include <memory>
#include <string>
#include <grpcpp/grpcpp.h>
#include "txn.h"
#include "global.h"
#include "manager.h"
#include "stats.h"
#include "helper.h"
#include "grpc_sync_server.h"
#include "txn_table.h"
#include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h>
#include <grpcpp/ext/proto_server_reflection_plugin.h>
using grpc::ServerContext;
using grpc::Status;
using sundial_rpc::SundialRequest;
using sundial_rpc::SundialResponse;
using sundial_rpc::Sundial_GRPC_SYNC;
using grpc::Server;
using grpc::ServerBuilder;
/// Server-side handler for the contactRemote RPC.
///
/// SYS_REQ requests are forwarded to glob_manager and answered with SYS_RESP.
/// Every other request is dispatched to the TxnManager registered in txn_table
/// under the request's txn_id; one is created and registered when none exists.
/// The TxnManager is removed from txn_table and deleted once the response type
/// is RESP_ABORT, PREPARED_OK_RO, PREPARED_ABORT or ACK.
///
/// @param context   per-call server context (not used by this handler)
/// @param request   request message received from the remote node
/// @param response  response message filled in by this handler
/// @return always Status::OK
Status SundialServiceImp::contactRemote(::grpc::ServerContext* context,
                                        const ::sundial_rpc::SundialRequest* request,
                                        ::sundial_rpc::SundialResponse* response)
{
    // Sync requests never touch the transaction table.
    const bool is_sync_request = (request->request_type() == SundialRequest::SYS_REQ);
    if (is_sync_request) {
        glob_manager->receive_sync_request();
        response->set_response_type( SundialResponse::SYS_RESP );
        return Status::OK;
    }

    const uint64_t id = request->txn_id();
    TxnManager * manager = txn_table->get_txn(id);
    if (!manager) {
        // No TxnManager is registered for this transaction yet: the code
        // expects the first request to be a read, then creates and registers one.
        assert( request->request_type() == SundialRequest::READ_REQ );
        manager = new TxnManager();
        manager->set_txn_id( id );
        txn_table->add_txn( manager );
    }

    // The transaction itself services the RPC and fills in the response.
    manager->process_remote_request(request, response);

    // These response types mean the sub-transaction is no longer needed here.
    const auto outcome = response->response_type();
    const bool sub_txn_finished = outcome == SundialResponse::RESP_ABORT
                               || outcome == SundialResponse::PREPARED_OK_RO
                               || outcome == SundialResponse::PREPARED_ABORT
                               || outcome == SundialResponse::ACK;
    if (sub_txn_finished) {
        txn_table->remove_txn( manager );
        delete manager;
    }
    return Status::OK;
}
/// Starts the synchronous gRPC server for this node and blocks waiting on it.
///
/// The listening address is the g_node_id-th line of ifconfig_string that does
/// not start with '#', with sync_port appended. The default health-check
/// service and the proto reflection plugin are enabled before the server is
/// built.
///
/// If the server cannot be built and started (BuildAndStart() returns a null
/// pointer, e.g. when the address cannot be bound), an error is written to
/// std::cerr and the function returns instead of dereferencing the null server.
void SundialServiceImp::run(){
    std::istringstream in(ifconfig_string);
    std::string line;
    uint32_t num_nodes = 0;
    std::string port;
    while (std::getline(in, line)) {
        // Lines starting with '#' are comments and do not count as nodes.
        if (line[0] == '#')
            continue;
        if (num_nodes == g_node_id) {
            port = line;
            break;
        }
        num_nodes++;
    }
    port.append(sync_port);

    grpc::EnableDefaultHealthCheckService(true);
    grpc::reflection::InitProtoReflectionServerBuilderPlugin();

    ServerBuilder builder;
    builder.AddListeningPort(port, grpc::InsecureServerCredentials());
    builder.RegisterService(this);

    std::unique_ptr<Server> server(builder.BuildAndStart());
    if (!server) {
        // Without this check a failed start-up would crash in server->Wait().
        std::cerr << "Failed to start server on " << port << std::endl;
        return;
    }
    std::cout << "Server listening on " << port << std::endl;
    server->Wait();
}
sundial_sync_client.h
#endif //SUNDIAL_GRPC_CLIENT_H
#include "sundial_grpc.grpc.pb.h"
#include "sundial_grpc.pb.h"
#include <iostream>
#include <memory>
#include <string>
#include <grpcpp/grpcpp.h>
using grpc::Channel;
using grpc::ClientContext;
using grpc::Status;
using sundial_rpc::SundialRequest;
using sundial_rpc::SundialResponse;
using sundial_rpc::Sundial_GRPC_SYNC;
//toDo: now assume we only have 2 nodes
#ifndef SSC
#define SSC
/// Client-side wrapper around the generated Sundial_GRPC_SYNC stub.
/// Holds a single stub, so every call goes through that one stub.
class Sundial_Sync_Client
{
public:
    /// @param channel  pointer to the first element of an array of channels;
    ///                 the constructor definition indexes it by node id
    Sundial_Sync_Client(std::shared_ptr<Channel>* channel);

    /// Issues the contactRemote RPC.
    /// @param node_id   id of the destination node
    /// @param request   request message to send
    /// @param response  message that receives the server's reply
    /// @return gRPC status of the call
    Status contactRemote(uint64_t node_id,
                         SundialRequest& request,
                         SundialResponse* response);

private:
    std::unique_ptr<Sundial_GRPC_SYNC::Stub> stub_;  ///< stub used for all calls
};
#endif
sundial_sync_client.cpp
#include "sundial_grpc.grpc.pb.h"
#include "sundial_grpc.pb.h"
#include "grpc_sync_client.h"
#include <iostream>
#include <memory>
#include <string>
#include <grpcpp/grpcpp.h>
#include "txn.h"
#include "global.h"
#include "helper.h"
#include "manager.h"
#include "stats.h"
using grpc::Channel;
using grpc::ClientContext;
using grpc::Status;
using sundial_rpc::SundialRequest;
using sundial_rpc::SundialResponse;
using sundial_rpc::Sundial_GRPC_SYNC;
//toDo: add more nodes to it
/// Creates the stub from the supplied channels.
///
/// Walks node ids 0 .. g_num_nodes-1, skipping this node's own id, and assigns
/// stub_ from channel[id]. Since there is only one stub_, each assignment
/// replaces the previous one and the stub for the last remote id is the one
/// that remains.
///
/// @param channel  pointer to the first element of an array of channels,
///                 indexed by node id
Sundial_Sync_Client::Sundial_Sync_Client(std::shared_ptr<Channel>* channel)
{
    for (int node = 0; node < g_num_nodes; ++node) {
        if (node != g_node_id) {
            //stub_[node]=Sundial_GRPC_SYNC::NewStub(channel[node]);
            stub_ = Sundial_GRPC_SYNC::NewStub(channel[node]);
        }
    }
}
/// Sends `request` through the stub and stores the reply in `response`.
///
/// On success the per-thread response statistics (message count and size,
/// keyed by response type) are updated. On failure the error code and message
/// are written to std::cout. The status is returned in both cases.
///
/// @param node_id   id of the destination node (currently not used to pick a stub)
/// @param request   request message to send
/// @param response  message that receives the server's reply
/// @return gRPC status of the call
Status
Sundial_Sync_Client::contactRemote(uint64_t node_id, SundialRequest& request, SundialResponse* response)
{
    //toDo: choose the right stub with node id
    ClientContext context;
    printf("Client sends request\n");
    //Status status = stub_[node_id]->contactRemote(&context, request, &response);
    Status status = stub_->contactRemote(&context, request, response);

    if (!status.ok()) {
        std::cout << status.error_code() << ": " << status.error_message()
                  << std::endl;
        return status;
    }

    // Successful call: account for the response in this thread's statistics.
    const auto kind = response->response_type();
    glob_stats->_stats[GET_THD_ID]->_resp_msg_count[ kind ] ++;
    glob_stats->_stats[GET_THD_ID]->_resp_msg_size[ kind ] += response->SpaceUsedLong();
    return status;
}
Everything runs smoothly during setup: I obtained the IP addresses, and the servers are listening on the correct addresses. However, when the client sends a request, the status it returns always has error code 12 and no error message.
Error code 12 is "UNIMPLEMENTED". Check your proto and make sure your server implements the correct RPC method (correct upper/lower case in the method name, and so on).
If you love us, you can donate to us via PayPal or buy us a coffee so we can maintain and grow. Thank you!
Donate Us With