
ZeroMQ IPC across several instances of a program

I am having some problems with inter-process communication in ZeroMQ between several instances of a program.

  • I am running on Linux
  • I am using zeromq/cppzmq, the header-only C++ binding for libzmq

If I run two instances of this application (say, in two terminals), providing one with an argument to be the listener and the other with an argument to be the sender, the listener never receives a message. I have tried both TCP and IPC to no avail.

#include <zmq.hpp>
#include <cerrno>    // errno
#include <cstdio>    // printf
#include <cstdlib>   // atoi
#include <cstring>   // strerror
#include <string>
#include <vector>    // std::vector for the pollitem_t list
#include <iostream>

int ListenMessage();
int SendMessage(std::string str);

zmq::context_t global_zmq_context(1);

int main(int argc, char* argv[] ) {
    std::string str = "Hello World";
    if (atoi(argv[1]) == 0) ListenMessage();
    else SendMessage(str);

    zmq_ctx_destroy(& global_zmq_context);
    return 0;
}


int SendMessage(std::string str) {
    assert(global_zmq_context);
    std::cout << "Sending \n";
    zmq::socket_t publisher(global_zmq_context, ZMQ_PUB);
    assert(publisher);

    int linger = 0;
    int rc = zmq_setsockopt(publisher, ZMQ_LINGER, &linger, sizeof(linger));
    assert(rc==0);

    rc = zmq_connect(publisher, "tcp://127.0.0.1:4506");
    if (rc == -1) {
        printf ("E: connect failed: %s\n", strerror (errno));
        return -1;
    }

    zmq::message_t message(static_cast<const void*> (str.data()), str.size());
    rc = publisher.send(message);
    if (rc == -1) {
        printf ("E: send failed: %s\n", strerror (errno));
        return -1;
    }
    return 0;
}

int ListenMessage() {
    assert(global_zmq_context);
    std::cout << "Listening \n";
    zmq::socket_t subscriber(global_zmq_context, ZMQ_SUB);
    assert(subscriber);

    int rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);
    assert(rc==0);

    int linger = 0;
    rc = zmq_setsockopt(subscriber, ZMQ_LINGER, &linger, sizeof(linger));
    assert(rc==0);

    rc = zmq_bind(subscriber, "tcp://127.0.0.1:4506");
    if (rc == -1) {
        printf ("E: bind failed: %s\n", strerror (errno));
        return -1;
    }

    std::vector<zmq::pollitem_t> p = {{subscriber, 0, ZMQ_POLLIN, 0}};
    while (true) {
        zmq::message_t rx_msg;
        // when timeout (the third argument here) is -1,
        // then block until ready to receive
        std::cout << "Still Listening before poll \n";
        zmq::poll(p.data(), 1, -1);
        std::cout << "Found an item \n"; // not reaching
        if (p[0].revents & ZMQ_POLLIN) {
            // received something on the first (only) socket
            subscriber.recv(&rx_msg);
            std::string rx_str;
            rx_str.assign(static_cast<char *>(rx_msg.data()), rx_msg.size());
            std::cout << "Received: " << rx_str << std::endl;
        }
    }
    return 0;
}

This code works if I run a single instance of the program with two threads:

    std::thread t_sub(ListenMessage);
    sleep(1); // Slow joiner in ZMQ PUB/SUB pattern
    std::thread t_pub(SendMessage, str);
    t_pub.join();
    t_sub.join();

But I am wondering why the code above won't work when running two instances of the program?

Thanks for your help!



1 Answer

In case one has never worked with ZeroMQ, one may enjoy first looking at "ZeroMQ Principles in less than Five Seconds" before diving into further details.


Q : "wondering why when running two instances of the program the code above won't work?"

This code will never fly as is, and that has nothing to do with thread-based versus process-based [CONCURRENT] processing.

It is caused by a wrong design of the Inter-Process Communication.

For this, ZeroMQ can use any one of the supported transport-classes :
{ ipc:// | tipc:// | tcp:// | norm:// | pgm:// | epgm:// | vmci:// } plus an even smarter one for in-process comms, the inproc:// transport-class, ready for inter-thread communication, where a stack-less transport may enjoy the lowest-ever latency, being just a memory-mapped policy.

The selection of the L3/L2-based networking stack ( tcp:// ) for an Inter-Process Communication is possible, yet it is about the most "expensive" option.
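
For a localhost-only setup like this one, the ipc:// transport-class avoids the TCP/IP stack altogether. A minimal sketch, assuming an illustrative socket file /tmp/pubsub_demo.ipc ( any writable path would do ); only the endpoint string changes, the API calls stay the same :

    #include <zmq.hpp>

    int main() {
        zmq::context_t ctx(1);

        // Illustrative ipc:// endpoint -- a filesystem path, no TCP/IP stack involved
        const char *endpoint = "ipc:///tmp/pubsub_demo.ipc";

        zmq::socket_t sub(ctx, ZMQ_SUB);
        sub.setsockopt(ZMQ_SUBSCRIBE, "", 0);
        sub.bind(endpoint);       // one AccessPoint .bind()-s the endpoint ...

        zmq::socket_t pub(ctx, ZMQ_PUB);
        pub.connect(endpoint);    // ... the other .connect()-s to the very same string

        return 0;
    }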


The Core Mistake :

Given that choice, even a single process ( not speaking about a pair of processes ) will collide on an attempt to .bind() its AccessPoints onto the very same TCP/IP address:port#.


The Other Mistake :

Even in the case of a solo programme launched, both of the spawned threads attempt to .bind() their private AccessPoints, yet neither makes an attempt to .connect() to a matching "opposite" AccessPoint.

At least one has to successfully .bind(), and
at least one has to successfully .connect(), so as to get a "channel", here of the PUB/SUB Archetype.
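
In other words, the two sides must take complementary roles. A minimal sketch of that pairing, assuming ( just for illustration ) that the PUB-side owns the .bind() and the SUB-side .connect()-s to it; the opposite assignment works equally well, as long as the roles differ :

    #include <zmq.hpp>

    // Run inside process A : this side owns the address, so it .bind()-s
    void run_publisher_side() {
        zmq::context_t ctx(1);
        zmq::socket_t  pub(ctx, ZMQ_PUB);
        pub.bind("tcp://127.0.0.1:4506");     // exactly one owner of the address
        // ... publish here ...
    }

    // Run inside process B ( another instance ) : this side .connect()-s to the owner
    void run_subscriber_side() {
        zmq::context_t ctx(1);
        zmq::socket_t  sub(ctx, ZMQ_SUB);
        sub.setsockopt(ZMQ_SUBSCRIBE, "", 0);
        sub.connect("tcp://127.0.0.1:4506");  // any number of subscribers may join
        // ... receive here ...
    }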


ToDo ( a consolidated sketch follows the list ) :

  • decide about a proper, right-enough Transport-Class ( best avoid the overkill of operating the full L3/L2-stack for a localhost-only / in-process IPC )
  • refactor the address:port# management ( so that 2+ processes do not fail on .bind()-s to the same, hard-wired address:port# )
  • always detect and appropriately handle the returned {PASS|FAIL}-s from API calls
  • always set LINGER to zero explicitly ( you never know )
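
A consolidated sketch of those points, kept close to the question's code; the ipc:// path, the default endpoint and the role assignment ( sender binds, listener connects ) are illustrative choices, not the only valid ones :

    #include <zmq.hpp>
    #include <cstdio>
    #include <cstdlib>
    #include <string>
    #include <iostream>

    // Role assignment here is illustrative : the sender ( PUB ) owns the .bind(),
    // every listener ( SUB ) .connect()-s; the opposite split would also work,
    // as long as the two roles stay complementary.
    static const char *DEFAULT_ENDPOINT = "ipc:///tmp/pubsub_demo.ipc";  // placeholder path

    static int listen_messages(zmq::context_t &ctx, const char *endpoint) {
        zmq::socket_t sub(ctx, ZMQ_SUB);
        int linger = 0;
        sub.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));  // never block on shutdown
        sub.setsockopt(ZMQ_SUBSCRIBE, "", 0);                 // subscribe to everything
        try {
            sub.connect(endpoint);                            // SUB .connect()-s here
        } catch (const zmq::error_t &e) {
            std::fprintf(stderr, "E: connect failed: %s\n", e.what());
            return -1;
        }
        while (true) {
            zmq::message_t msg;
            sub.recv(&msg);                                   // blocks until a message arrives
            std::cout << "Received: "
                      << std::string(static_cast<char *>(msg.data()), msg.size())
                      << std::endl;
        }
    }

    static int send_message(zmq::context_t &ctx, const char *endpoint, const std::string &payload) {
        zmq::socket_t pub(ctx, ZMQ_PUB);
        int linger = 0;
        pub.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
        try {
            pub.bind(endpoint);                               // PUB .bind()-s here
        } catch (const zmq::error_t &e) {
            std::fprintf(stderr, "E: bind failed: %s\n", e.what());
            return -1;
        }
        // PUB/SUB has no delivery guarantee : a late-joining subscriber may miss
        // the first messages ( the "slow joiner" ), so a real design would re-send
        // periodically or add a readiness handshake before publishing.
        zmq::message_t msg(payload.data(), payload.size());
        pub.send(msg);
        return 0;
    }

    int main(int argc, char *argv[]) {
        if (argc < 2) {
            std::fprintf(stderr, "usage: %s <0 = listen | 1 = send> [endpoint]\n", argv[0]);
            return 1;
        }
        const char *endpoint = (argc > 2) ? argv[2] : DEFAULT_ENDPOINT;  // no hard-wired address
        zmq::context_t ctx(1);
        return (std::atoi(argv[1]) == 0) ? listen_messages(ctx, endpoint)
                                         : send_message(ctx, endpoint, "Hello World");
    }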