I can only find old C++ source examples, so I wrote my own based on them. Here's my publisher in Python:
import zmq
from time import sleep

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5563")
while True:
    msg = "hello"
    socket.send_string(msg)
    print("sent " + msg)
    sleep(5)
And here's the subscriber in C++:
void *ctx = zmq_ctx_new();
void *subscriber = zmq_socket(ctx, ZMQ_SUB);
// zmq_connect(subscriber, "tcp://*:5563");
zmq_connect(subscriber, "tcp://localhost:5563");
// zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", sizeof(""));
while (true) {
    zmq_msg_t msg;
    int rc;
    rc = zmq_msg_init(&msg);
    assert(rc == 0);
    std::cout << "waiting for message..." << std::endl;
    rc = zmq_msg_recv(&msg, subscriber, 0);
    assert(rc == 1);
    std::cout << "received: " << (char *) zmq_msg_data(&msg) << std::endl;
    zmq_msg_close(&msg);
}
Initially, I tried zmq_setsockopt( subscriber, ZMQ_SUBSCRIBE, "", sizeof("") ); but I guess I should receive everything if I don't set this, right? So I commented it out.
When I run the code, I see "waiting for message..." forever.
I tried to listen to TCP traffic using tcpdump. It turns out that while the publisher is running, I see a lot of garbage on port 5563, and when I turn the publisher off, it stops. When I tried a PUSH/PULL scheme, I could see the plaintext message in tcpdump. (I tried pushing with nodejs and pulling with C++ and it worked.)
What could I be doing wrong?
I tried different combinations of .bind(), .connect(), localhost, and 127.0.0.1, but none of them worked either.
UPDATE: I've just read that I must subscribe to something, so I called zmq_setsockopt( subscriber, ZMQ_SUBSCRIBE, NULL, 0 ); to subscribe to everything, but I still receive nothing.
PyZMQ is in version 17.0.0.b3 and has ZeroMQ 4.2.3
C++ has ZeroMQ 4.2.2
UPDATE 2:
Updated both to 4.2.3; it still doesn't work.
"I guess I should receive everything if I don't set this, right?"
No, this is not a correct assumption. A SUB socket starts with no subscriptions and delivers nothing until at least one topic filter is set with ZMQ_SUBSCRIBE. You may like a collection of my other ZeroMQ posts about { plain-string | unicode | serialisation } issues and about the { performance | traffic } impacts of the actual filtering policy (SUB-side topic-filter processing on early ZeroMQ versions, PUB-side processing on more recent ones) that one may encounter when designing heterogeneous distributed systems with ZeroMQ.
(Any other Scalable Formal Communication Archetype Pattern, like the PUSH/PULL you observed, has no subscription policy, so it works independently of any matching against a topic-filter list.)
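To make the subscription rule concrete, here is a minimal pure-Python sketch of the prefix matching that PUB/SUB applies to each message. It is only an illustrative model under the assumption that a message is delivered when it starts with at least one subscribed prefix; it is not libzmq's implementation, and the helper name sub_accepts is hypothetical.

```python
from typing import Iterable


def sub_accepts(message: bytes, subscriptions: Iterable[bytes]) -> bool:
    """Model of SUB topic filtering (hypothetical helper, not a pyzmq API).

    A message is delivered only if it starts with at least one subscribed
    prefix. With no subscriptions at all, nothing is delivered.
    """
    return any(message.startswith(prefix) for prefix in subscriptions)


if __name__ == "__main__":
    print(sub_accepts(b"hello", []))          # no subscription set
    print(sub_accepts(b"hello", [b""]))       # empty prefix matches everything
    print(sub_accepts(b"hello", [b"he"]))     # matching prefix
    print(sub_accepts(b"hello", [b"world"]))  # non-matching prefix
```

In this model the question's commented-out zmq_setsockopt() call corresponds to the first case: an empty subscription list, so every message is dropped.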
First, test whether the sender .send()-s anything at all:
Let's mock up a quick Python receiver to see whether the sender indeed sends anything down the lane:
import zmq

aContext = zmq.Context()                      # .new Context
aSUB     = aContext.socket( zmq.SUB )         # .new Socket
aSUB.connect(    "tcp://127.0.0.1:5563" )     # .connect
aSUB.setsockopt( zmq.LINGER,    0 )           # .set ALWAYS!
aSUB.setsockopt( zmq.SUBSCRIBE, b"" )         # .set T-filter ( bytes in Python 3; b"" matches everything )
MASK = "INF: .recv()-ed this:[{0:}]\n:   waited {1: > 7d} [us]"
aClk = zmq.Stopwatch()
while True:
    try:
        aClk.start()                          # .start timing before the blocking .recv()
        print( MASK.format( aSUB.recv(), aClk.stop() ) )
    except ( KeyboardInterrupt, SystemExit ):
        break                                 # .leave the loop on Ctrl-C
aSUB.close()                                  # .close ALWAYS!
aContext.term()                               # .term ALWAYS!
This should report whatever the PUB sender is actually .send()-ing over the wire, along with the message inter-arrival times (in [us]; glad ZeroMQ includes this tool for debugging and performance / latency tweaking).
If you see the live INF: messages actually ticking on screen, keep it running; it now makes sense to proceed to the next step, the C++ receiver:
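#include <zmq.h>
#include <cassert>
#include <iostream>
#include <string>

int main() {
    void *aContext = zmq_ctx_new();
    void *aSUB     = zmq_socket( aContext, ZMQ_SUB );
    std::cout << "INF: .. zmq_ctx_new() + zmq_socket() done" << std::endl;
    int rc = zmq_connect( aSUB, "tcp://127.0.0.1:5563" );
    assert( rc == 0 && "EXC: in zmq_connect() call" );
    std::cout << "INF: .. zmq_connect() done" << std::endl;
    rc = zmq_setsockopt( aSUB, ZMQ_SUBSCRIBE, "", 0 );          /* empty filter matches every message */
    assert( rc == 0 && "EXC: in zmq_setsockopt( ZMQ_SUBSCRIBE ) call" );
    std::cout << "INF: .. zmq_setsockopt( ZMQ_SUBSCRIBE, ... ) done" << std::endl;
    int linger = 0;                                             /* option value is passed as pointer + size */
    rc = zmq_setsockopt( aSUB, ZMQ_LINGER, &linger, sizeof( linger ) );
    assert( rc == 0 && "EXC: in zmq_setsockopt( ZMQ_LINGER ) call" );
    std::cout << "INF: .. zmq_setsockopt( ZMQ_LINGER, ... ) done" << std::endl;
    while ( true ) {
        zmq_msg_t msg;                                          /* Create an empty ØMQ message */
        rc = zmq_msg_init( &msg );          assert( rc == 0  && "EXC: in zmq_msg_init() call" );
        std::cout << "INF: .. zmq_msg_init() done" << std::endl;
        rc = zmq_msg_recv( &msg, aSUB, 0 ); assert( rc != -1 && "EXC: in zmq_msg_recv() call" );
        /* The payload is not NUL-terminated, so build the string from pointer + size */
        std::string payload( static_cast<char *>( zmq_msg_data( &msg ) ), zmq_msg_size( &msg ) );
        std::cout << "INF: .. zmq_msg_recv() done: received [" << payload << "]" << std::endl;
        zmq_msg_close( &msg );                                  /* Release message */
        std::cout << "INF: .. zmq_msg_close()'d" << std::endl;
    }
    /* Not reached while the loop above runs forever; kept to show the proper teardown order */
    zmq_close( aSUB );        std::cout << "INF: .. aSUB was zmq_close()'d" << std::endl;
    zmq_ctx_term( aContext ); std::cout << "INF: .. aCTX was zmq_ctx_term()'d" << std::endl;
    return 0;
}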
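If juggling two processes gets in the way, the effect of the subscription can also be checked inside a single Python process. The sketch below assumes pyzmq is installed; the helper name try_receive, the random loopback port and the two-second timeout are illustrative choices, not part of the original setup. It keeps publishing "hello" and reports whether a SUB socket ever receives it, once with an empty-prefix subscription and once without any.

```python
import time

import zmq


def try_receive(subscribe: bool, timeout_s: float = 2.0):
    """Return the first message a SUB socket gets within timeout_s, else None.

    Hypothetical helper for illustration: PUB and SUB live in one process
    and talk over a random TCP port on the loopback interface.
    """
    ctx = zmq.Context()
    pub = ctx.socket(zmq.PUB)
    sub = ctx.socket(zmq.SUB)
    pub.setsockopt(zmq.LINGER, 0)
    sub.setsockopt(zmq.LINGER, 0)
    port = pub.bind_to_random_port("tcp://127.0.0.1")
    if subscribe:
        sub.setsockopt(zmq.SUBSCRIBE, b"")   # empty prefix matches everything
    sub.connect("tcp://127.0.0.1:{}".format(port))

    received = None
    deadline = time.monotonic() + timeout_s
    try:
        # Keep publishing: messages sent before the subscription has reached
        # the publisher are dropped, so a single send could be missed.
        while received is None and time.monotonic() < deadline:
            pub.send_string("hello")
            if sub.poll(50):                 # wait up to 50 ms for input
                received = sub.recv_string()
    finally:
        sub.close()
        pub.close()
        ctx.term()
    return received


if __name__ == "__main__":
    print("with SUBSCRIBE:   ", try_receive(True))
    print("without SUBSCRIBE:", try_receive(False))
```

If the subscribed run receives the message and the unsubscribed run does not, the transport is fine and the missing piece on the C++ side is the ZMQ_SUBSCRIBE option.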
What is the return value of zmq_setsockopt()?
Then you should use "" instead of NULL; they are different.
zmq_setsockopt( subscriber, ZMQ_SUBSCRIBE, "", 0 );
As the API defines:
Return value
The zmq_setsockopt() function shall return zero if successful. Otherwise it shall return -1 and set errno to one of the values defined below.
...