Here is my code with the extraneous stuff stripped out:
coordinator.py
context = zmq.Context()
socket = context.socket(zmq.ROUTER)
port = socket.bind_to_random_port(ZMQ_ADDRESS)
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
while True:
event = poller.poll(1)
if not event:
continue
process_id, val = socket.recv_json()
worker.py
context = zmq.Context()
socket = context.socket(zmq.DEALER)
socket.connect('%s:%s' % (ZMQ_ADDRESS, kwargs['zmq_port']))
socket.send_json(
(os.getpid(), True)
)
what happens when I run it:
process_id, val = socket.recv_json()
File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/zmq/sugar/socket.py", line 380, in recv_json
return jsonapi.loads(msg)
File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/zmq/utils/jsonapi.py", line 71, in loads
return jsonmod.loads(s, **kwargs)
File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/simplejson/__init__.py", line 451, in loads
return _default_decoder.decode(s)
File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/simplejson/decoder.py", line 406, in decode
obj, end = self.raw_decode(s)
File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/simplejson/decoder.py", line 426, in raw_decode
raise JSONDecodeError("No JSON object could be decoded", s, idx)
JSONDecodeError: No JSON object could be decoded: line 1 column 0 (char 0)
and if I dig in with ipdb:
> /Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/zmq/sugar/socket.py(380)recv_json()
379 msg = self.recv(flags)
--> 380 return jsonapi.loads(msg)
381
ipdb> p msg
'\x00\x9f\xd9\x06\xa2'
hmm, that doesn't look like JSON... is this a bug in pyzmq? am I using it wrong?
Hmm, ok, found the answer.
There is an annoying asymmetry in the ØMQ interface, so you have to be aware of the type of socket you are using.
In this case my use of ROUTER/DEALER architecture means that the JSON message sent from the DEALER socket, when I do send_json
, gets wrapped in multipart message envelope. The first part is a client id (I guess this is the '\x00\x9f\xd9\x06\xa2'
that I got above) and the second part is the JSON string we are interested in.
So in the last line of my coordinator.py I need to do this instead:
id_, msg = socket.recv_multipart()
process_id, val = json.loads(msg)
IMHO this is bad design on the part of ØMQ/pyzmq, the library should abstract this away and have just send
and recv
methods, that just work.
I got the clue from this question How can I use send_json with pyzmq PUB SUB so it looks like PUB/SUB architecture has the same issue, and no doubt others too.
This is described in the docs but it's not very clear
http://zguide.zeromq.org/page:all#The-Asynchronous-Client-Server-Pattern
Update
In fact, I found in my case I could simplify the code further, by making use of the 'client id' part of the message envelope directly. So the worker just does:
context = zmq.Context()
socket = context.socket(zmq.DEALER)
socket.identity = str(os.getpid()) # or I could omit this and use ØMQ client id
socket.connect('%s:%s' % (ZMQ_ADDRESS, kwargs['zmq_port']))
socket.send_json(True)
It's also worth noting that when you want to send a message the other direction, from the ROUTER, you have to send it as multipart, specifying which client it is destined for, eg:
coordinator.py
context = zmq.Context()
socket = context.socket(zmq.ROUTER)
port = socket.bind_to_random_port(ZMQ_ADDRESS)
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
pids = set()
while True:
event = poller.poll(1)
if not event:
continue
process_id, val = socket.recv_json()
pids.add(process_id)
# need some code in here to decide when to stop listening
# and break the loop
for pid in pids:
socket.send_multipart([pid, 'a string message'])
# ^ do your own json encoding if required
I guess there is probably some ØMQ way of doing a broadcast message rather than sending to each client in a loop as I do above. I wish the docs just had a clear description of each available socket type and how to use them.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With