
gRPC: What are the best practices for long-running streaming?

We've implemented a Java gRPC service that runs in the cloud, with a unidirectional (client-to-server) streaming RPC that looks like this:

rpc PushUpdates(stream Update) returns (Ack);

A C++ client (a mobile device) calls this RPC as soon as it boots, then sends an update roughly every 30 seconds for as long as the device is up and running.

ChannelArguments chan_args;
// this will be secure channel eventually 
auto channel_p = CreateCustomChannel(remote_addr, InsecureChannelCredentials(), chan_args);
auto stub_p    = DialTcc::NewStub(channel_p);
// ...

Ack ack;
auto strm_ctxt_p = make_unique<ClientContext>();
auto strm_p      = stub_p->PushUpdates(strm_ctxt_p.get(), &ack);
// ...

while (true) {
    // wait until we are ready to send a new update
    Update updt;
    // populate updt;
    if(!strm_p->Write(updt)) {
        // stream is not kosher, create a new one and restart
        break;
    }
}

Different kinds of network interruptions can happen while the stream is open:

  • the gRPC service running in the cloud may go down (for maintenance) or may simply become unreachable.
  • the device's own IP address keeps changing, as it is a mobile device.

We've seen that on such events, neither the channel nor the Write() API detects the network disconnection reliably. At times the client keeps calling Write() (which doesn't return false) but the server doesn't receive any data (Wireshark shows no activity on the client device's outgoing port).

What are the best practices to recover in such cases, so that the server starts receiving updates again within X seconds of such an event? It is understandable that there would be a loss of X seconds' worth of data whenever such an event happens, but we want to recover reliably within X seconds.

gRPC version: 1.30.2, Client: C++14/Linux, Server: Java/Linux

asked Oct 28 '25 13:10 by Curious



1 Answer

Here's how we've worked around this. I'd like to know whether it can be improved, or whether anyone from the gRPC team can point me to a better solution.

The protobuf for our service looks like this. It has an RPC for pinging the service, which is used frequently to test connectivity.

// Message used in IsAlive RPC
message Empty {}

// Acknowledgement sent by the service for updates received
message UpdateAck {}

// Messages streamed to the service by the client
message Update {
...
...
}

service GrpcService {
  // for checking if we're able to connect
  rpc Ping(Empty) returns (Empty); 

  // streaming RPC for pushing updates by client
  rpc PushUpdate(stream Update) returns (UpdateAck);
}

Here is how the C++ client works. It does the following:

  • Connect():

    • Create the stub for calling the RPCs, if the stub is nullptr.
    • Call Ping() in regular intervals until it is successful.
    • On success call PushUpdate(...) RPC to create a new stream.
    • On failure reset the stream to nullptr.
  • Stream(): Do the following in a while(true) loop:

    • Get the update to be pushed.
    • Call Write(...) on the stream with the update to be pushed.
    • If Write(...) fails for any reason, leave the loop so that control goes back to Connect().
    • Once every 15 minutes (RESTART_INTERVAL_M in the code, or some other regular interval), reset everything (stub, channel, stream) to nullptr to start afresh. This is required because at times Write(...) does not fail even if there is no connection between the client and the service. The Write(...) calls succeed, but Wireshark shows no activity on the client's outgoing port!
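
The Stream() step above can be sketched without gRPC at all, which makes the loop easy to unit test. This is only a minimal sketch: `FakeLink`, `StreamResult`, and `RunStream` are hypothetical names standing in for the real stream, and time is passed in as plain seconds instead of being read from a clock.

```cpp
#include <cassert>
#include <functional>

// Hypothetical outcome of one Stream() pass; not part of gRPC.
enum class StreamResult { WriteFailed, RestartDue, Stopped };

// Hypothetical stand-in for the stream: Write() starts failing after
// `writes_before_failure` successful calls.
struct FakeLink {
  explicit FakeLink(int n) : writes_before_failure(n) {}
  bool Write() { return writes++ < writes_before_failure; }
  int writes_before_failure;
  int writes = 0;  // total Write() calls, including the failed one
};

// Mirrors the Stream() step: write one update per period until a write
// fails, the restart deadline has passed, or a stop is requested.
// Simulated time starts at 0 and advances by `period_s` per update.
StreamResult RunStream(FakeLink& link, int period_s, int restart_after_s,
                       const std::function<bool()>& stop) {
  assert(period_s > 0);
  for (int now_s = 0; !stop(); now_s += period_s) {
    if (now_s > restart_after_s) return StreamResult::RestartDue;
    if (!link.Write()) return StreamResult::WriteFailed;
  }
  return StreamResult::Stopped;
}
```

On `WriteFailed` or `RestartDue` the caller would go back to Connect(). Note that this sketch, like the real loop, only notices a dead connection if Write() actually fails, which is why the periodic restart is there.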

Here is the code:

constexpr int GRPC_TIMEOUT_S = 10;
constexpr int RESTART_INTERVAL_M = 15;
constexpr int GRPC_KEEPALIVE_TIME_MS = 10000;
string root_ca, tls_key, tls_cert; // for SSL
string remote_addr = "remote.com:5445"; // host:port; gRPC targets take no "https://" prefix (TLS comes from the credentials)
...
...
void ResetStreaming() {
  if (stub_p) {
    if (strm_p) {  // graceful restart/stop, this pair of API are called together, in this order
      if (!strm_p->WritesDone()) {
        // Log a message
      }
      strm_p->Finish(); // Log if return value of this is NOT grpc::OK
    }
    strm_p      = nullptr;
    strm_ctxt_p = nullptr;
    stub_p      = nullptr;
    channel_p   = nullptr;
  }
}

void CreateStub() {
  if (!stub_p) {
    ChannelArguments chan_args;
    chan_args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, GRPC_KEEPALIVE_TIME_MS);
    channel_p = CreateCustomChannel(
        remote_addr,
        SslCredentials(SslCredentialsOptions{root_ca, tls_key, tls_cert}),
        chan_args);
    stub_p = GrpcService::NewStub(channel_p);
  }
}

void Stream() {
  const auto restart_time = steady_clock::now() + minutes(RESTART_INTERVAL_M);
  while (!stop) {
    // restart every RESTART_INTERVAL_M (15m) even if ALL IS WELL!!
    if (steady_clock::now() > restart_time) {
      break;
    }
    Update updt = GetUpdate(); // get the update to be sent
    if (!stop) {
      if (channel_p->GetState(true) == GRPC_CHANNEL_SHUTDOWN ||
                 !strm_p->Write(updt)) {
        // could not write!!
        return;  // we will Connect() again
      }
    }
  }
  // stopped due to stop = true or interval to create new stream has expired
  ResetStreaming();  // channel, stub, stream are recreated once in every 15m
}

bool PingRemote() {
  ClientContext ctxt;
  ctxt.set_deadline(system_clock::now() + seconds(GRPC_TIMEOUT_S));
  Empty req, resp;
  CreateStub();
  if (stub_p->Ping(&ctxt, req, &resp).ok()) {
    static UpdateAck ack;
    strm_ctxt_p = make_unique<ClientContext>();  // need new context
    strm_p      = stub_p->PushUpdate(strm_ctxt_p.get(), &ack);
    return true;
  }
  if (strm_p) {
    strm_p      = nullptr;
    strm_ctxt_p = nullptr;
  }
  return false;
}

void Connect() {
  while (!stop) {
    if (PingRemote() || stop) {
      break;
    }
    sleep_for(seconds(5)); // wait before retrying
  }
}

// set to true from another thread when we want to stop
atomic<bool> stop{false};  // brace-init: "= false" needs a copy constructor before C++17

void StreamUntilStopped() {
  if (stop) {
    return;
  }
  strm_thread_p = make_unique<thread>([&] {
    while (!stop) {
      Connect();
      Stream();
    }
  });
}

// called by the thread that sets stop = true
void Finish() {
  strm_thread_p->join();
}
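
Connect() in the listing above retries at a fixed 5-second interval. On a battery-powered device, one option is to back off while the service stays unreachable. The following is a minimal sketch of a capped, doubling delay; `RetryDelay` is a hypothetical helper, and the 60-second cap used in the usage note is an assumption, not something taken from the original client.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>

// Hypothetical helper: delay to wait before retry number `attempt`
// (0-based). Starts at `base`, doubles per attempt, never exceeds `cap`.
std::chrono::seconds RetryDelay(int attempt, std::chrono::seconds base,
                                std::chrono::seconds cap) {
  assert(attempt >= 0 && base.count() > 0 && cap >= base);
  std::chrono::seconds delay = base;
  for (int i = 0; i < attempt && delay < cap; ++i) {
    delay = std::min(delay * 2, cap);  // double, but clamp to the cap
  }
  return delay;
}
```

In Connect() this would replace the fixed wait with something like `sleep_for(RetryDelay(attempt++, seconds(5), seconds(60)))`, with `attempt` reset to 0 after a successful Ping(). The trade-off is that a longer delay also lengthens recovery after a long outage, so the cap should stay below the X seconds you are willing to lose.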

With the client above, we are seeing that streaming recovers within 15 minutes (RESTART_INTERVAL_M) of a disruption, whatever its cause. This code runs in a fast path, so I am curious to know if this can be made any better.
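
One direction that might shrink the 15-minute window is to tune the keepalive settings more fully. CreateStub() sets only the keepalive interval; gRPC also has a keepalive timeout, and when a keepalive ping is not acknowledged within that timeout the transport is closed. Assuming pings are actually being sent, the worst-case detection time is then roughly interval plus timeout. The sketch below computes that bound from a set of channel arguments. The string keys are what the `GRPC_ARG_*` macros expand to. `KeepaliveArgs`, `ExampleKeepaliveArgs`, and `WorstCaseDetectionMs` are hypothetical names, and every value except the 10000 ms interval is an assumption.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical container: channel-argument key -> integer value.
using KeepaliveArgs = std::map<std::string, int>;

// Assumed example values; only the 10000 ms interval comes from the
// client code in this answer.
KeepaliveArgs ExampleKeepaliveArgs() {
  return {
      {"grpc.keepalive_time_ms", 10000},           // GRPC_ARG_KEEPALIVE_TIME_MS
      {"grpc.keepalive_timeout_ms", 5000},         // GRPC_ARG_KEEPALIVE_TIMEOUT_MS
      {"grpc.keepalive_permit_without_calls", 1},  // GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS
      {"grpc.http2.max_pings_without_data", 0},    // GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA
  };
}

// Rough upper bound on how long a dead connection can go unnoticed:
// one full keepalive interval plus the wait for the ping ack.
int WorstCaseDetectionMs(const KeepaliveArgs& args) {
  const auto time_it = args.find("grpc.keepalive_time_ms");
  const auto timeout_it = args.find("grpc.keepalive_timeout_ms");
  assert(time_it != args.end() && timeout_it != args.end());
  return time_it->second + timeout_it->second;
}
```

In the real client, each entry would be applied with `chan_args.SetInt(key, value)` before the channel is created. The server also has to accept pings this frequent; as far as I know grpc-java exposes a `permitKeepAliveTime` setting on its server builder for this, so that is worth checking before lowering the interval.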

answered Oct 30 '25 03:10 by Curious
