
C++ and Python ZeroMQ 4.x PUB/SUB example does not work

I can only find old C++ source examples. Anyways, I did mine, based on them. Here's my publisher in python:

import zmq
from time import sleep
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5563")

while True:
    msg = "hello"
    socket.send_string(msg)
    print("sent "+ msg)
    sleep(5)

And here's the subscriber in C++:

void * ctx = zmq_ctx_new();
void * subscriber = zmq_socket(ctx, ZMQ_SUB);
// zmq_connect(subscriber, "tcp://*:5563");
   zmq_connect(subscriber, "tcp://localhost:5563");
// zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", sizeof(""));

while (true) {
    zmq_msg_t msg;
    int rc;

    rc = zmq_msg_init( & msg);
    assert(rc == 0);
    std::cout << "waiting for message..." << std::endl;
    rc = zmq_msg_recv( & msg, subscriber, 0);
    assert(rc == 1);

    std::cout << "received: " << (char * ) zmq_msg_data( & msg) << std::endl;
    zmq_msg_close( & msg);
}

Initially, I tried zmq_setsockopt( subscriber, ZMQ_SUBSCRIBE, "", sizeof("") ); but I guess I should receive everything if I don't set this, right? So I commented it.

When I run the code, I see "waiting for message..." forever.

I tried to listen to the TCP traffic using tcpdump. While the publisher is running, I see a lot of garbage on port 5563, and it stops when I turn the publisher off. When I tried a PUSH/PULL scheme, I could see the plaintext message in tcpdump. (I tried pushing with nodejs and pulling with C++ and it worked.)

What could I be doing wrong?

I tried different combinations of .bind(), .connect(), localhost, 127.0.0.1, but none of them worked either.

UPDATE: I've just read that I must subscribe to something, so I did zmq_setsockopt( subscriber, ZMQ_SUBSCRIBE, NULL, 0 ); to subscribe to everything, but I still receive nothing.

PyZMQ is in version 17.0.0.b3 and has ZeroMQ 4.2.3

C++ has ZeroMQ 4.2.2

UPDATE 2:

Updated both to 4.2.3; it still doesn't work.

Asked by PPP, Dec 25 '17 20:12


2 Answers

"I guess I should receive everything if I don't set this, right?"

No, this is not a correct assumption. A newly created SUB socket filters out all incoming messages until at least one ZMQ_SUBSCRIBE option has been set. My other ZeroMQ posts cover the related { plain-string | unicode | serialisation } issues and the { performance | traffic } impacts of the actual filtering policy ( SUB-side topic-filter processing in early ZeroMQ versions, PUB-side processing in more recent ones ) that one may encounter when designing heterogeneous distributed systems with ZeroMQ.

( Any other Scalable Formal Communication Archetype Pattern, like the PUSH/PULL you observed, does not use the subscription policy at all, so it works independently of any subscription matching against a topic-filter list. )
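To make the subscription rule concrete, here is a minimal pure-Python model of prefix-based topic filtering. It is only an illustration of the documented behaviour (a message is delivered if it starts with any subscribed prefix), not libzmq's actual implementation; the function name `matches` and the three example filter lists are hypothetical.

```python
def matches(subscriptions, message):
    """Return True if `message` starts with any subscribed prefix.

    Simplified model of SUB-socket topic filtering: an empty
    subscription list matches nothing, and the empty prefix b""
    matches every message.
    """
    return any(message.startswith(prefix) for prefix in subscriptions)


no_filter = []           # a fresh SUB socket: nothing subscribed yet
all_filter = [b""]       # after subscribing to the empty prefix
topic_filter = [b"hel"]  # after subscribing to one specific prefix

msg = b"hello"
print(matches(no_filter, msg))          # False
print(matches(all_filter, msg))         # True
print(matches(topic_filter, msg))       # True
print(matches(topic_filter, b"world"))  # False
```

The first case is the situation in the question: with the ZMQ_SUBSCRIBE line commented out, the filter list is empty and every "hello" is dropped.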


Step 0: First test whether the sending part .send()-s anything at all:

Let's mock up a quick Python receiver to see whether the sender indeed sends anything down the lane:

import zmq

aContext = zmq.Context()                                          # .new Context
aSUB     = aContext.socket( zmq.SUB )                             # .new Socket
aSUB.connect( "tcp://127.0.0.1:5563" )                            # .connect
aSUB.setsockopt( zmq.LINGER, 0 )                                  # .set ALWAYS!
aSUB.setsockopt( zmq.SUBSCRIBE, b"" )                             # .set T-filter ( bytes; empty prefix == everything )

MASK = "INF: .recv()-ed this:[{0:}]\n:     waited {1: > 7d} [us]"
aClk = zmq.Stopwatch()

while True:
      try:
           aClk.start(); print( MASK.format( aSUB.recv(),         # .recv() blocks until a message arrives
                                             aClk.stop()          # .stop() returns the elapsed [us]
                                             ) )
      except ( KeyboardInterrupt, SystemExit ):
           break
aSUB.close()                                                     # .close ALWAYS!
aContext.term()                                                  # .term  ALWAYS!

This ought to report whatever the PUB sender is actually .send()-ing over the wire, along with the actual message inter-arrival times ( in [us]; glad that ZeroMQ has included this tool for debugging and performance / latency tweaking ).

If you see the live INF: messages actually ticking on screen, the sender works. Keep this receiver running; it now makes sense to proceed to the next step.
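The Step 0 receiver times each blocking .recv() with zmq.Stopwatch. If that helper is not available in your pyzmq build (an assumption worth checking for your version), the same microsecond measurement can be sketched with the standard library alone. The names `timed_call` and `fake_recv` are hypothetical; `fake_recv` merely stands in for a blocking `aSUB.recv` so that the sketch runs without a publisher.

```python
import time


def timed_call(fn):
    """Call fn() and return (result, elapsed time in whole microseconds)."""
    start = time.perf_counter_ns()
    result = fn()
    elapsed_us = (time.perf_counter_ns() - start) // 1000
    return result, elapsed_us


def fake_recv():
    """Hypothetical stand-in for a blocking aSUB.recv() call."""
    time.sleep(0.01)  # pretend we waited about 10 ms for a message
    return b"hello"


payload, waited_us = timed_call(fake_recv)
print("INF: .recv()-ed this:[{0}]\n:     waited {1:>7d} [us]".format(payload, waited_us))
```

In the real receiver one would pass `aSUB.recv` instead of `fake_recv`; the measured value is then the time spent waiting for the next message.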


Step 1: Test the receiving-part code next:

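#include <zmq.h>
#include <cassert>
#include <iostream>
#include <string>

int main() {
    void *aContext = zmq_ctx_new();                 std::cout << "INF: .. zmq_ctx_new() done" << std::endl;
    void *aSUB     = zmq_socket( aContext, ZMQ_SUB ); std::cout << "INF: .. zmq_socket() done" << std::endl;
    int rc, noLinger = 0;
    rc = zmq_connect( aSUB, "tcp://127.0.0.1:5563" );                      assert( rc == 0 && "EXC: in zmq_connect() call" );
                                                    std::cout << "INF: .. zmq_connect() done" << std::endl;
    rc = zmq_setsockopt( aSUB, ZMQ_SUBSCRIBE, "", 0 );                     assert( rc == 0 && "EXC: in zmq_setsockopt( ZMQ_SUBSCRIBE ) call" );
                                                    std::cout << "INF: .. zmq_setsockopt( ZMQ_SUBSCRIBE, ... ) done" << std::endl;
    rc = zmq_setsockopt( aSUB, ZMQ_LINGER, &noLinger, sizeof( noLinger ) ); assert( rc == 0 && "EXC: in zmq_setsockopt( ZMQ_LINGER ) call" );
                                                    std::cout << "INF: .. zmq_setsockopt( ZMQ_LINGER, ...    ) done" << std::endl;
    while (true) {
        zmq_msg_t msg;                              /* Create an empty ØMQ message */
        rc = zmq_msg_init( &msg );                  assert( rc ==  0 && "EXC: in zmq_msg_init() call" );
                                                    std::cout << "INF: .. zmq_msg_init() done" << std::endl;
        rc = zmq_msg_recv( &msg, aSUB, 0 );         assert( rc != -1 && "EXC: in zmq_msg_recv() call" );
        /* The payload is not NUL-terminated, so build the string from data + size */
        std::string text( static_cast<char *>( zmq_msg_data( &msg ) ), zmq_msg_size( &msg ) );
                                                    std::cout << "INF: .. zmq_msg_recv() done: received [" << text << "]" << std::endl;
        zmq_msg_close( &msg );                      /* Release message */
                                                    std::cout << "INF: .. zmq_msg_close()'d" << std::endl;
    }
    /* Not reached while the loop above runs forever; kept to show the proper teardown order */
    zmq_close(    aSUB );                           std::cout << "INF: .. aSUB was zmq_close()'d" << std::endl;
    zmq_ctx_term( aContext );                       std::cout << "INF: .. aCTX was zmq_ctx_term()'d" << std::endl;
    return 0;
}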

Answered by user3666197, Oct 07 '22 05:10


What is the return value of zmq_setsockopt()?

Also, you should use "" instead of NULL; they are different.

zmq_setsockopt( subscriber, ZMQ_SUBSCRIBE, "", 0 );

As the API documentation states:

Return value

The zmq_setsockopt() function shall return zero if successful. Otherwise it shall return -1 and set errno to one of the values defined below.
...

Answered by Sphinx, Oct 07 '22 07:10