Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Howto make zeromq PUB/SUB drop old messages instead of new (for realtime feeds)?

Tags:

zeromq

Say I have a PUB server that zmq_send()'s realtime messages to SUB client. If client is busy and can not zmq_recv() messages quick enough, then messages will be buffered in client (and/or server).

If the buffer grows too large (high water mark) then NEW messages will be dropped. For realtime messages this is the opposite of what one wants. OLD messages should be dropped to make place for NEW ones.

Is there some way to do this?

Ideally I would like the SUB client's receive queue to be either empty or contain the most recent message only. When a new message is received it would replace the old one. ( I guess the problem here would be that the client would block on zmq_recv() when the queue is empty, wasting time doing so. )

So how are realtime feeds usually implemented in ZeroMQ?

like image 405
petke Avatar asked Dec 29 '15 01:12

petke


People also ask

Does ZeroMQ guarantee delivery?

ZeroMQ guarantees to deliver all the parts (one or more) for a message, or none of them. This allows you to send or receive a list of frames as a single on-the-wire message. A message (single or multipart) must fit in memory.

What is ZeroMQ used for?

ZeroMQ is a library used to implement messaging and communication systems between applications and processes - fast and asynchronously.


1 Answers

I'll answer my own question here. The setting ZMQ_CONFLATE "Keep only last message" seemed promising but it doesn't work with subscription filters. It only ever keeps one message in the queue. If you have more than one filter, both old and new messages of the other filters type gets thrown away.

Likewise the recommendation of the zeromq guide to simply to kill slow subscribers, but that doesn't seem like realistic solution. Having subscribers with different read speeds, subscribed to the same fast publisher, should be a normal use case. Some of these subscribers might live on slow computers others on fast ones, etc. ZeroMQ should be able to handle that somehow.

http://zguide.zeromq.org/page:all#Slow-Subscriber-Detection-Suicidal-Snail-Pattern

I ended up doing manual dropping of old queued up messages on the client side. It seems to work fine. I get subscribed messages to the client that are less than 3ms old (through tcp localhost) that way. This works even in cases where I have five thousand, 10 second old messages, in the queue in front of those few real-time message at the back. This is good enough for me.

I cant help but think this is something that should be provided by the library. It could probably do a better job of it.

Anyways here is the client side, old message dropping, code:

bool Empty(zmq::socket_t& socket) {
    bool ret = true;
    zmq::pollitem_t poll_item = { socket, 0, ZMQ_POLLIN, 0 };
    zmq::poll(&poll_item, 1, 0); //0 = no wait
    if (poll_item.revents & ZMQ_POLLIN) {
        ret = false;
    }
    return ret;
}

std::vector<std::string> GetRealtimeSubscribedMessageVec(zmq::socket_t& socket_sub, int timeout_ms)
{
    std::vector<std::string> ret;

    struct MessageTmp {
        int id_ = 0;
        std::string data_;
        boost::posix_time::ptime timestamp_;
    };

    std::map<int, MessageTmp> msg_map;

    int read_msg_count = 0;
    int time_in_loop = 0;
    auto start_of_loop = boost::posix_time::microsec_clock::universal_time();
    do {
        read_msg_count++;

        //msg format sent by publisher is: filter, timestamp, data
        MessageTmp msg;
        msg.id_ = boost::lexical_cast<int>(s_recv(socket_sub));
        msg.timestamp_ = boost::posix_time::time_from_string(s_recv(socket_sub));
        msg.data_ = s_recv(socket_sub);

        msg_map[msg.id_] = msg;

        auto now = boost::posix_time::microsec_clock::universal_time();
        time_in_loop = (now - start_of_loop).total_milliseconds();
        if (time_in_loop > timeout_ms) {
            std::cerr << "Timeout reached. Publisher is probably sending messages quicker than we can drop them." << std::endl;
            break;
        }
    } while ((Empty(socket_sub) == false)); 

    if (read_msg_count > 1) {
        std::cout << "num of old queued up messages dropped: " << (read_msg_count - 1) << std::endl;
    }

    for (const auto &pair: msg_map) {
        const auto& msg_tmp = pair.second;

        auto now = boost::posix_time::microsec_clock::universal_time();
        auto message_age_ms = (now - msg_tmp.timestamp_).total_milliseconds();

        if (message_age_ms > timeout_ms) {
            std::cerr << "[SUB] Newest message too old. f:" << msg_tmp.id_ << ", age: " << message_age_ms << "ms, s:" << msg_tmp.data_.size() << std::endl;
        }
        else {
            std::cout << "[SUB] f:" << msg_tmp.id_ << ", age: " << message_age_ms << "ms, s:" << msg_tmp.data_.size() << std::endl;
            ret.push_back(msg_tmp.data_);
        }
    }

    return ret;
}
like image 195
petke Avatar answered Oct 12 '22 03:10

petke