Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

zero mq pub/sub with multipart not working

Here's my script.


#!/usr/bin/env python

import traceback
import sys
import zmq
from time import sleep

print "Creating the zmq.Context"
context = zmq.Context()

print "Binding the publisher to the local socket at port 5557"
sender = context.socket(zmq.PUB)
sender.bind("tcp://*:5557")

print "Binding the subscriber to the local socket at port 5557"
receiver = context.socket(zmq.SUB)
receiver.connect("tcp://*:5557")

print "Setting the subscriber option to get only those originating from \"B\""
receiver.setsockopt(zmq.SUBSCRIBE, "B")

print "Waiting a second for the socket to be created."
sleep(1)

print "Sending messages"
for i in range(1,10):
    msg = "msg %d" % (i)
    env = None
    if i % 2 == 0:
        env = ["B", msg]
    else:
        env = ["A", msg]
    print "Sending Message:  ", env
    sender.send_multipart(env)

print "Closing the sender."
sender.close()

failed_attempts = 0
while failed_attempts < 3:
    try:
        print str(receiver.recv_multipart(zmq.NOBLOCK))
    except:
        print traceback.format_exception(*sys.exc_info())
        failed_attempts += 1 

print "Closing the receiver."
receiver.close()

print "Terminating the context."
context.term()

"""
Output:

Creating the zmq.Context
Binding the publisher to the local socket at port 5557
Binding the subscriber to the local socket at port 5557
Setting the subscriber option to get only those originating from "B"
Waiting a second for the socket to be created.
Sending messages
Sending Message:   ['A', 'msg 1']
Sending Message:   ['B', 'msg 2']
Sending Message:   ['A', 'msg 3']
Sending Message:   ['B', 'msg 4']
Sending Message:   ['A', 'msg 5']
Sending Message:   ['B', 'msg 6']
Sending Message:   ['A', 'msg 7']
Sending Message:   ['B', 'msg 8']
Sending Message:   ['A', 'msg 9']
Closing the sender.
['B', 'msg 2']
['B', 'msg 4']
['B', 'msg 6']
['B', 'msg 8']
['Traceback (most recent call last):\n', '  File "./test.py", line 43, in \n    print str(receiver.recv_multipart(zmq.NOBLOCK))\n', '  File "socket.pyx", line 611, in zmq.core.socket.Socket.recv_multipart (zmq/core/socket.c:5181)\n', '  File "socket.pyx", line 514, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4811)\n', '  File "socket.pyx", line 548, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4673)\n', '  File "socket.pyx", line 99, in zmq.core.socket._recv_copy (zmq/core/socket.c:1344)\n', 'ZMQError: Resource temporarily unavailable\n']
['Traceback (most recent call last):\n', '  File "./test.py", line 43, in \n    print str(receiver.recv_multipart(zmq.NOBLOCK))\n', '  File "socket.pyx", line 611, in zmq.core.socket.Socket.recv_multipart (zmq/core/socket.c:5181)\n', '  File "socket.pyx", line 514, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4811)\n', '  File "socket.pyx", line 548, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4673)\n', '  File "socket.pyx", line 99, in zmq.core.socket._recv_copy (zmq/core/socket.c:1344)\n', 'ZMQError: Resource temporarily unavailable\n']
['Traceback (most recent call last):\n', '  File "./test.py", line 43, in \n    print str(receiver.recv_multipart(zmq.NOBLOCK))\n', '  File "socket.pyx", line 611, in zmq.core.socket.Socket.recv_multipart (zmq/core/socket.c:5181)\n', '  File "socket.pyx", line 514, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4811)\n', '  File "socket.pyx", line 548, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4673)\n', '  File "socket.pyx", line 99, in zmq.core.socket._recv_copy (zmq/core/socket.c:1344)\n', 'ZMQError: Resource temporarily unavailable\n']
Closing the receiver.
Terminating the context.
"""

And, the question is... why doesn't this code work?

[EDIT] After getting a super quick response on the zeromq mailing list, I've updated the code above.

like image 803
bitcycle Avatar asked Jun 05 '11 19:06

bitcycle


2 Answers

Credit: Chuck Remes

You may need a "sleep" between the socket creation steps (bind, connect, setsockopt) and the actual transmission of the messages. The bind & connect operations are asynchronous, so they may not complete by the time you get to the logic that sends all of the messages. In that case, any messages sent through the PUB socket will be dropped since a zmq_bind() operation does not create a queue until another socket has successfully connected to it.

As a side note, you don't need to create 2 contexts in this example. Both sockets can be created within the same context. It doesn't hurt, but it also isn't necessary.

Credit: Pieter

There is a "problem solver" at the end of Ch1 that explains this.

Some socket types (ROUTER and PUB) will silently drop messages for which they have no recipients. Connecting is, as Chuck said, asynchronous and takes approx 100msec. If you start two threads, bind one side, connect the other, and then start immediately to send data over such a socket type, you'll lose the first 100msec of data (approximately).

Doing a sleep is a brutal "prove that it works" option. Realistically you'd synchronize in some way, or (more typically) expect message loss as part of normal startup (i.e. see the published data as a pure broadcast with no definite start or end).

See weather update example, syncpub and syncsub for details.

like image 121
bitcycle Avatar answered Sep 19 '22 12:09

bitcycle


Necro-posting, but for those interested in a solution other than sleeping, there are monitors.

You can set a monitor callback and get called on ZMQ_EVENT_CONNECTED events.

See details and example at http://api.zeromq.org/3-3:zmq-ctx-set-monitor.

like image 24
Bill Barnhill Avatar answered Sep 18 '22 12:09

Bill Barnhill