pyzmq recv_json can't decode message sent by send_json

Tags: zeromq, pyzmq

Here is my code with the extraneous stuff stripped out:

coordinator.py

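import zmq

# ZMQ_ADDRESS is defined elsewhere in the project
context = zmq.Context()
socket = context.socket(zmq.ROUTER)
port = socket.bind_to_random_port(ZMQ_ADDRESS)  # port is passed on to the workers

poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)

while True:
    event = poller.poll(1)  # wait up to 1 ms for an incoming message
    if not event:
        continue
    process_id, val = socket.recv_json()  # <- this is the line that fails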

worker.py

import os
import zmq

context = zmq.Context()
socket = context.socket(zmq.DEALER)
# kwargs['zmq_port'] is the port the coordinator got from bind_to_random_port
socket.connect('%s:%s' % (ZMQ_ADDRESS, kwargs['zmq_port']))

socket.send_json(
    (os.getpid(), True)
)

What happens when I run it:

    process_id, val = socket.recv_json()
  File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/zmq/sugar/socket.py", line 380, in recv_json
    return jsonapi.loads(msg)
  File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/zmq/utils/jsonapi.py", line 71, in loads
    return jsonmod.loads(s, **kwargs)
  File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/simplejson/__init__.py", line 451, in loads
    return _default_decoder.decode(s)
  File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/simplejson/decoder.py", line 406, in decode
    obj, end = self.raw_decode(s)
  File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/simplejson/decoder.py", line 426, in raw_decode
    raise JSONDecodeError("No JSON object could be decoded", s, idx)
JSONDecodeError: No JSON object could be decoded: line 1 column 0 (char 0)

And if I dig in with ipdb:

> /Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/zmq/sugar/socket.py(380)recv_json()
    379             msg = self.recv(flags)
--> 380             return jsonapi.loads(msg)
    381

ipdb> p msg
'\x00\x9f\xd9\x06\xa2'

Hmm, that doesn't look like JSON. Is this a bug in pyzmq, or am I using it wrong?

asked Dec 12 '15 at 16:12 by Anentropic




1 Answer

Hmm, OK, I found the answer.

There is an annoying asymmetry in the ØMQ interface, so you have to be aware of the type of socket you are using.

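In this case, because I'm using the ROUTER/DEALER pattern, the JSON message I send from the DEALER socket with send_json arrives at the ROUTER as a multipart message: the ROUTER prepends a frame identifying the sender. The first part is the client identity (this is the '\x00\x9f\xd9\x06\xa2' I got above, an identity ØMQ generated automatically because the DEALER didn't set one), and the second part is the JSON string we are interested in. recv_json only reads a single frame, so it ends up trying to decode the identity as JSON.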
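To see that envelope directly, here is a minimal sketch (assuming Python 3 and a recent pyzmq; the `inproc://envelope-demo` endpoint and the `show_router_frames` name are made up for illustration) that connects a DEALER to a ROUTER inside one process and returns the raw frames the ROUTER receives:

```python
import json

import zmq


def show_router_frames():
    """Send one JSON message DEALER -> ROUTER and return the raw frames."""
    ctx = zmq.Context()
    try:
        router = ctx.socket(zmq.ROUTER)
        router.bind("inproc://envelope-demo")  # bind before connect
        dealer = ctx.socket(zmq.DEALER)
        dealer.connect("inproc://envelope-demo")

        dealer.send_json([1234, True])  # a single frame on the DEALER side...
        return router.recv_multipart()  # ...arrives as [identity, payload]
    finally:
        ctx.destroy(linger=0)  # close both sockets and terminate the context


frames = show_router_frames()
identity, payload = frames
print(repr(identity))       # identity bytes generated by ZeroMQ
print(json.loads(payload))  # [1234, True]
```

The DEALER sent one frame, but the ROUTER hands back two, which is why `recv_json` on the ROUTER ends up decoding the identity instead of the payload.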

So in the last line of my coordinator.py I need to do this instead:

import json

# frame 0 is the sender's identity, frame 1 is the JSON payload
id_, msg = socket.recv_multipart()
process_id, val = json.loads(msg)

IMHO this is bad design on the part of ØMQ/pyzmq: the library should abstract this away and provide plain send and recv methods that just work.
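A thin wrapper gets most of the way there. This is only a sketch: `recv_router_json` and `send_router_json` are hypothetical helpers, not part of pyzmq, and they assume the peers are DEALER sockets that send exactly one JSON frame (REQ peers would add an empty delimiter frame). Because they only call `recv_multipart`/`send_multipart`, they can be exercised with a stand-in object (the hypothetical `FakeRouter` below) as well as a real ROUTER socket:

```python
import json


def recv_router_json(sock, flags=0):
    """Receive one [identity, json] message from a ROUTER socket.

    Returns (identity, decoded_object). Assumes the peer is a DEALER that
    sent a single JSON frame, so there is no empty delimiter frame.
    """
    identity, payload = sock.recv_multipart(flags)
    return identity, json.loads(payload.decode("utf-8"))


def send_router_json(sock, identity, obj, flags=0):
    """Send obj as JSON to the peer whose routing identity is `identity`."""
    sock.send_multipart([identity, json.dumps(obj).encode("utf-8")], flags)


class FakeRouter:
    """Stand-in that mimics the two pyzmq methods used above."""

    def __init__(self, incoming):
        self.incoming = list(incoming)
        self.sent = []

    def recv_multipart(self, flags=0):
        return self.incoming.pop(0)

    def send_multipart(self, parts, flags=0):
        self.sent.append(parts)


fake = FakeRouter([[b"\x00\x9f\xd9\x06\xa2", b"[1234, true]"]])
ident, (pid, val) = recv_router_json(fake)
send_router_json(fake, ident, {"ack": pid})
```

With a real ROUTER socket, the coordinator's receive line would become `id_, (process_id, val) = recv_router_json(socket)`.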

I got the clue from the question "How can I use send_json with pyzmq PUB SUB", so it looks like the PUB/SUB pattern has a similar issue, and no doubt other socket types do too.

This is described in the docs, but not very clearly:
http://zguide.zeromq.org/page:all#The-Asynchronous-Client-Server-Pattern

Update

In fact, I found that in my case I could simplify the code further by using the 'client id' part of the message envelope directly. The worker sets its identity to its PID and sends only the value:

import os
import zmq

context = zmq.Context()
socket = context.socket(zmq.DEALER)
# Use the PID as the routing identity (bytes, set before connect);
# or omit this and let ØMQ generate a client id
socket.identity = str(os.getpid()).encode('ascii')
socket.connect('%s:%s' % (ZMQ_ADDRESS, kwargs['zmq_port']))

socket.send_json(True)
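A minimal sketch of this identity-based variant, assuming Python 3 (where pyzmq wants the identity as bytes) and a made-up `inproc://identity-demo` endpoint; the `identity_roundtrip` helper is hypothetical. Whatever the DEALER sets as its identity arrives verbatim as the first frame at the ROUTER, so the payload no longer has to carry the PID:

```python
import json
import os

import zmq


def identity_roundtrip(worker_id):
    """DEALER with an explicit identity -> ROUTER; return (id, value)."""
    ctx = zmq.Context()
    try:
        router = ctx.socket(zmq.ROUTER)
        router.bind("inproc://identity-demo")

        dealer = ctx.socket(zmq.DEALER)
        # The identity must be bytes and must be set before connect()
        dealer.identity = str(worker_id).encode("ascii")
        dealer.connect("inproc://identity-demo")
        dealer.send_json(True)  # only the value; the PID travels as identity

        identity, payload = router.recv_multipart()
        return int(identity), json.loads(payload)
    finally:
        ctx.destroy(linger=0)


pid, val = identity_roundtrip(os.getpid())
```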

It's also worth noting that when you want to send a message in the other direction, from the ROUTER, you have to send it as a multipart message whose first frame names the client it is destined for, e.g.:

coordinator.py

import json
import zmq

context = zmq.Context()
socket = context.socket(zmq.ROUTER)
port = socket.bind_to_random_port(ZMQ_ADDRESS)

poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)

pids = set()
while True:
    event = poller.poll(1)
    if not event:
        continue
    # the worker's identity (its PID, as bytes) arrives as the first frame
    pid, msg = socket.recv_multipart()
    val = json.loads(msg)
    pids.add(pid)

    # need some code in here to decide when to stop listening
    # and break the loop

for pid in pids:
    # the first frame selects the destination; the ROUTER strips it
    socket.send_multipart([pid, b'a string message'])
    # ^ do your own json encoding if required
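The reply path can be sketched the same way (again assuming Python 3, a made-up `inproc://reply-demo` endpoint, and a hypothetical `reply_roundtrip` helper). The ROUTER consumes the identity frame to choose the destination, so the DEALER receives only the payload. By default a ROUTER silently drops messages addressed to an identity it has no connection for, so the sketch waits for the worker's first message before replying:

```python
import zmq


def reply_roundtrip():
    """Worker says hello, ROUTER replies to it by identity."""
    ctx = zmq.Context()
    try:
        router = ctx.socket(zmq.ROUTER)
        router.bind("inproc://reply-demo")

        dealer = ctx.socket(zmq.DEALER)
        dealer.identity = b"worker-1"
        dealer.connect("inproc://reply-demo")
        dealer.send_json(True)  # lets the ROUTER see this peer

        identity, _payload = router.recv_multipart()
        router.send_multipart([identity, b"a string message"])

        return identity, dealer.recv()  # identity frame has been stripped
    finally:
        ctx.destroy(linger=0)


identity, reply = reply_roundtrip()
```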

There is probably some ØMQ way of broadcasting a message rather than sending it to each client in a loop as I do above. I wish the docs just had a clear description of each available socket type and how to use it.
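For what it's worth, the usual ØMQ pattern for broadcasting is a separate PUB socket that every worker subscribes to with a SUB socket. The sketch below shows that alternative to the loop above; it is not the answer's own code, and it assumes Python 3 plus illustrative names (`broadcast_demo`, `inproc://broadcast-demo`, the `{"cmd": "stop"}` message). One caveat it has to work around: subscriptions propagate asynchronously, so anything published before a subscriber has joined is silently dropped.

```python
import zmq


def broadcast_demo(n_subscribers=3):
    """Publish one JSON message and collect it at every subscriber."""
    ctx = zmq.Context()
    try:
        pub = ctx.socket(zmq.PUB)
        pub.bind("inproc://broadcast-demo")

        subs = []
        for _ in range(n_subscribers):
            sub = ctx.socket(zmq.SUB)
            sub.setsockopt(zmq.SUBSCRIBE, b"")  # empty prefix: receive all
            sub.connect("inproc://broadcast-demo")
            subs.append(sub)

        received = []
        for sub in subs:
            # "Slow joiner": keep publishing until this subscriber has
            # a message waiting (poll returns 0 on timeout)
            while not sub.poll(10):
                pub.send_json({"cmd": "stop"})
            received.append(sub.recv_json())
        return received
    finally:
        ctx.destroy(linger=0)


messages = broadcast_demo()
```

In a long-running coordinator you would normally synchronise once at startup (or simply tolerate the first lost messages) rather than re-publishing like this.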

answered Sep 17 '22 at 07:09 by Anentropic