
Lazy pub/sub in zeromq, only get last message

I'm trying to implement a lazy subscriber in ZeroMQ, starting from the wuclient/wuserver example. The client is much slower than the server, so it should only receive the last message sent by the server.

So far, the only way I've found to do that is to connect and disconnect the client around each receive, but every connection has an unwanted cost of around 3 ms:

server.cxx

#include <zmq.hpp>
#include <cstdio>
#include <iostream>
#include <unistd.h>

int main () {
    //  Prepare our context and publisher
    zmq::context_t context (1);
    zmq::socket_t publisher (context, ZMQ_PUB);
    publisher.bind("tcp://*:5556");
    int counter = 0;
    while (1) {
        counter++;

        //  Send the counter as a NUL-terminated string to all subscribers
        zmq::message_t message(20);
        snprintf ((char *) message.data(), 20, "%d", counter);
        publisher.send(message);
        std::cout << counter << std::endl;
        usleep(100000);  // 100 ms between messages
    }
    return 0;
}

client.cxx

#include <zmq.hpp>
#include <iostream>
#include <sstream>
#include <unistd.h>

int main (int argc, char *argv[])
{
  zmq::context_t context (1);
  zmq::socket_t subscriber (context, ZMQ_SUB);
  while(1){

    zmq::message_t update;
    int counter;

    subscriber.connect("tcp://localhost:5556"); // This call takes a few milliseconds
    subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0);
    subscriber.recv(&update);
    subscriber.disconnect("tcp://localhost:5556");

    // The payload is a NUL-terminated decimal string
    std::istringstream iss(static_cast<char*>(update.data()));
    iss >> counter;

    std::cout << counter << std::endl;
    usleep(1000000);  // 1 s: the client is 10x slower than the server
  }
  return 0;
}

Server output: 1 2 3 4 5 6 7 8 9 ...

Client output: 4 14 24 ...

I've tried to use the high water mark to do this without connecting and disconnecting, but it doesn't work. Even with code like the following, messages only start to be dropped once the buffer holds at least several hundred of them:

int high_water_mark = 1;
socket.setsockopt(ZMQ_RCVHWM, &high_water_mark, sizeof(high_water_mark) );
socket.setsockopt(ZMQ_SNDHWM, &high_water_mark, sizeof(high_water_mark) );

There is also a closely related post on zeromq-dev, but the solution it provides (using another thread to select the last message) is not acceptable: I can't transfer tons of messages over the network which will never be used afterwards.

Mathieu Westphal asked Mar 18 '23 04:03


1 Answer

The solution is to set ZMQ_CONFLATE on the subscriber socket, so that it keeps only the most recent message in its queue. This only works with non-multipart messages:

client.cxx

#include <zmq.hpp>
#include <iostream>
#include <sstream>
#include <unistd.h>

int main (int argc, char *argv[])
{
  zmq::context_t context (1);

  zmq::socket_t subscriber (context, ZMQ_SUB);

  // Set ZMQ_CONFLATE before connect(): it only applies to later connections
  int conflate = 1;
  subscriber.setsockopt(ZMQ_CONFLATE, &conflate, sizeof(conflate) );
  subscriber.connect("tcp://localhost:5556");
  subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0);

  while(1){

    zmq::message_t update;
    int counter;

    subscriber.recv(&update);

    std::istringstream iss(static_cast<char*>(update.data()));
    iss >> counter;

    std::cout     << counter <<  std::endl;
    usleep(1000000);

  }
  return 0;
}
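
To make the "keep only the last message" behaviour concrete, here is a minimal sketch of a one-slot mailbox that mimics what a conflating queue does from the reader's point of view. It is only a model and does not use ZeroMQ: LatestSlot and publish_then_read are hypothetical names invented for this illustration, and this is not how libzmq implements ZMQ_CONFLATE internally.

```cpp
#include <cassert>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical helper, NOT part of ZeroMQ: a one-slot mailbox that models
// the "newest message wins" behaviour of a conflating receive queue.
class LatestSlot {
public:
    // Store a message, overwriting any older message that was never read.
    void put(std::string msg) {
        std::lock_guard<std::mutex> lock(mutex_);
        slot_ = std::move(msg);
        has_value_ = true;
    }

    // Move the stored message into `out` and empty the slot.
    // Returns false (leaving `out` untouched) if nothing has arrived
    // since the previous successful take().
    bool take(std::string& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (!has_value_) return false;
        out = std::move(slot_);
        slot_.clear();
        has_value_ = false;
        return true;
    }

private:
    std::mutex mutex_;
    std::string slot_;
    bool has_value_ = false;
};

// Simulates a fast publisher sending "1".."n" before the slow reader
// wakes up once; `out` receives whatever the reader gets to see.
inline bool publish_then_read(LatestSlot& slot, int n, std::string& out) {
    assert(n >= 1);
    for (int i = 1; i <= n; ++i) {
        slot.put(std::to_string(i));
    }
    return slot.take(out);
}
```

Calling publish_then_read(slot, 10, out) leaves "10" in out: the nine older messages are overwritten without ever being read, which is the behaviour the slow client wants from its socket.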
Mathieu Westphal answered Mar 20 '23 19:03