I am having some problems with inter-process communication in ZMQ between several instances of a program.
If I run two instances of this application (say, in two terminals), giving one an argument that makes it a listener and the other an argument that makes it a sender, the listener never receives a message. I have tried TCP and IPC to no avail.
#include <zmq.hpp>
#include <cassert>
#include <cerrno>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <string>
#include <vector>

int ListenMessage();
int SendMessage(std::string str);

zmq::context_t global_zmq_context(1);

int main(int argc, char* argv[]) {
    std::string str = "Hello World";
    if (argc < 2) {
        std::cerr << "usage: " << argv[0] << " <0 = listen | other = send>\n";
        return 1;
    }
    if (std::atoi(argv[1]) == 0) ListenMessage();
    else SendMessage(str);
    // No zmq_ctx_destroy() here: the zmq::context_t destructor terminates the context.
    return 0;
}

int SendMessage(std::string str) {
    assert(global_zmq_context);
    std::cout << "Sending \n";
    zmq::socket_t publisher(global_zmq_context, ZMQ_PUB);
    assert(publisher);
    int linger = 0;
    int rc = zmq_setsockopt(publisher, ZMQ_LINGER, &linger, sizeof(linger));
    assert(rc == 0);
    rc = zmq_connect(publisher, "tcp://127.0.0.1:4506");
    if (rc == -1) {
        std::printf("E: connect failed: %s\n", std::strerror(errno));
        return -1;
    }
    zmq::message_t message(static_cast<const void*>(str.data()), str.size());
    // socket_t::send() returns a bool here, not the C API's -1
    if (!publisher.send(message)) {
        std::printf("E: send failed: %s\n", std::strerror(errno));
        return -1;
    }
    return 0;
}

int ListenMessage() {
    assert(global_zmq_context);
    std::cout << "Listening \n";
    zmq::socket_t subscriber(global_zmq_context, ZMQ_SUB);
    assert(subscriber);
    int rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);
    assert(rc == 0);
    int linger = 0;
    rc = zmq_setsockopt(subscriber, ZMQ_LINGER, &linger, sizeof(linger));
    assert(rc == 0);
    rc = zmq_bind(subscriber, "tcp://127.0.0.1:4506");
    if (rc == -1) {
        std::printf("E: bind failed: %s\n", std::strerror(errno));
        return -1;
    }
    std::vector<zmq::pollitem_t> p = {{subscriber, 0, ZMQ_POLLIN, 0}};
    while (true) {
        zmq::message_t rx_msg;
        // when timeout (the third argument here) is -1,
        // then block until ready to receive
        std::cout << "Still Listening before poll \n";
        zmq::poll(p.data(), 1, -1);
        std::cout << "Found an item \n"; // not reaching
        if (p[0].revents & ZMQ_POLLIN) {
            // received something on the first (only) socket
            subscriber.recv(&rx_msg);
            std::string rx_str;
            rx_str.assign(static_cast<char*>(rx_msg.data()), rx_msg.size());
            std::cout << "Received: " << rx_str << std::endl;
        }
    }
    return 0;
}
This code works if I run a single instance of the program with two threads:
std::thread t_sub(ListenMessage);
sleep(1); // Slow joiner in ZMQ PUB/SUB pattern
std::thread t_pub(SendMessage, str);
t_pub.join();
t_sub.join();
But I am wondering why when running two instances of the program the code above won't work?
Thanks for your help!
If you have never worked with ZeroMQ, you may enjoy first looking at "ZeroMQ Principles in less than Five Seconds" before diving into further details.
Q : wondering why when running two instances of the program the code above won't work?
This code will never fly, and it has nothing to do with thread-based or process-based [CONCURRENT] processing.
It was caused by a wrong design of the Inter Process Communication.
ZeroMQ can provide for this any one of the supported transport-classes: { ipc:// | tipc:// | tcp:// | norm:// | pgm:// | epgm:// | vmci:// }, plus an even smarter one for in-process comms, the inproc:// transport-class, ready for inter-thread comms, where stack-less communication may enjoy the lowest latency of all, being just a memory-mapped policy.
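To make the transport-class distinction concrete, here is a small sketch in plain C++ (no ZeroMQ calls involved). The helper names `transport_of` and `reach_of` and the three-way `Reach` grouping are my own illustration, not part of any ZeroMQ API, and the grouping is a deliberately coarse simplification of what each transport can reach.

```cpp
#include <cassert>
#include <string>

// Coarse, illustrative grouping of how far a transport-class can reach.
enum class Reach { SameProcess, SameHost, BeyondHost, Unknown };

// Extract the transport-class (the text before "://") from an endpoint string.
// Returns an empty string if the endpoint has no "://" separator.
std::string transport_of(const std::string& endpoint) {
    const std::string::size_type pos = endpoint.find("://");
    return pos == std::string::npos ? std::string() : endpoint.substr(0, pos);
}

// Map an endpoint to the (simplified) scope its transport-class serves.
Reach reach_of(const std::string& endpoint) {
    const std::string t = transport_of(endpoint);
    if (t == "inproc") return Reach::SameProcess;  // threads sharing one Context
    if (t == "ipc") return Reach::SameHost;        // processes on the same host
    if (t == "tcp" || t == "tipc" || t == "norm" ||
        t == "pgm" || t == "epgm" || t == "vmci") {
        return Reach::BeyondHost;                  // goes through a networking stack
    }
    return Reach::Unknown;
}
```

For the question's endpoint, `reach_of("tcp://127.0.0.1:4506")` yields `Reach::BeyondHost`, i.e. the full networking stack is involved even though both peers sit on the same machine.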
Selecting an L3/L2-based networking stack for Inter-Process Communication is possible, yet it is about the most "expensive" option.
Given that choice, processes ( even a single pair of them ) will collide when each attempts to .bind() its AccessPoint onto the very same TCP/IP address:port#.
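The collision itself comes from the operating system rather than from ZeroMQ. A minimal sketch with plain POSIX sockets (assuming a POSIX system with a loopback interface; the helper names are hypothetical) shows a second bind() onto an already-bound address:port# failing:

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

#include <cassert>
#include <cerrno>
#include <cstring>

// Bind a fresh TCP socket to 127.0.0.1:port (port 0 lets the OS pick one).
// Returns the socket descriptor, or -1 with errno set on failure.
int bind_loopback(unsigned short port) {
    const int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd == -1) return -1;
    sockaddr_in addr;
    std::memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    if (bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) == -1) {
        const int saved = errno;  // close() may overwrite errno
        close(fd);
        errno = saved;
        return -1;
    }
    return fd;
}

// Port number the OS actually assigned to a bound socket (0 on failure).
unsigned short bound_port(int fd) {
    sockaddr_in addr;
    socklen_t len = sizeof(addr);
    if (getsockname(fd, reinterpret_cast<sockaddr*>(&addr), &len) == -1) return 0;
    return ntohs(addr.sin_port);
}

// Returns the errno produced by a second bind() onto an address:port that is
// already bound; 0 if it unexpectedly succeeded, -1 if the setup itself failed.
int second_bind_errno() {
    const int first = bind_loopback(0);
    if (first == -1) return -1;
    const int second = bind_loopback(bound_port(first));
    const int err = (second == -1) ? errno : 0;
    if (second != -1) close(second);
    close(first);
    return err;
}
```

On such a system `second_bind_errno()` should report `EADDRINUSE`, the same class of failure a second .bind() to one hard-wired endpoint would surface.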
Even for a solo programme, if both of the spawned threads attempt to .bind() their private AccessPoints, yet none attempts to .connect() to a matching "opposite" AccessPoint, no channel gets established.
At least one has to successfully .bind(), and at least one has to successfully .connect(), so as to get a "channel", here of the PUB/SUB Archetype.
Further points to take care of:
- Address:port# management ( for 2+ processes not to fail on .bind()-(s) to the same ( hard-wired ) address:port# )
- check the {PASS|FAIL}-s returned from API calls
- set LINGER to zero explicitly ( you never know )
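One possible way to avoid a hard-wired address:port# is to resolve the endpoint at start-up. The sketch below is plain C++; the helper name `resolve_endpoint` and the environment-variable name `ZMQ_DEMO_ENDPOINT` used in the usage note are hypothetical choices, not anything ZeroMQ prescribes.

```cpp
#include <cassert>
#include <cstdlib>
#include <string>

// Pick the endpoint from, in order of priority: an explicit command-line
// argument, an environment variable, and finally a built-in default.
std::string resolve_endpoint(const char* cli_arg, const char* env_name,
                             const std::string& fallback) {
    if (cli_arg != nullptr && *cli_arg != '\0') return cli_arg;
    const char* env = std::getenv(env_name);
    if (env != nullptr && *env != '\0') return env;
    return fallback;
}
```

A process could then call `resolve_endpoint(argc > 2 ? argv[2] : nullptr, "ZMQ_DEMO_ENDPOINT", "tcp://127.0.0.1:4506")` and pass the result to .bind() or .connect(), so that two deployments never have to fight over one compiled-in port.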