Python 3.6 ZeroMQ (PyZMQ) asyncio pub sub Hello World

I've just started with ZeroMQ and I'm trying to get a Hello World example working with PyZMQ and asyncio in Python 3.6. I want to decouple a module's functionality from the pub/sub code, hence the following class setup:

Edit 1: Minimized example

Edit 2: Included the solution in the code; see the answer below for details.

import asyncio
import zmq.asyncio
from zmq.asyncio import Context

# manages message flow between publishers and subscribers
class HelloWorldMessage:
    def __init__(self, url='127.0.0.1', port='5555'):
        self.url = "tcp://{}:{}".format(url, port)
        self.ctx = Context.instance()

        # activate publishers / subscribers
        asyncio.get_event_loop().run_until_complete(asyncio.wait([
            self.pub_hello_world(),
            self.sub_hello_world(),
        ]))

    # generates message "Hello World" and publish to topic 'world'
    async def pub_hello_world(self):
        pub = self.ctx.socket(zmq.PUB)
        pub.connect(self.url)

        # message contents
        msg = "Hello World"
        print(msg)

        # keep sending messages
        while True:
            # moved above the send: sleep first, so the subscription has time
            # to be set up before the first message is published
            await asyncio.sleep(1)

            # publish message to topic 'world'
            # async always needs `send_multipart()`
            # was bytes(msg), which raises TypeError for a str in Python 3
            await pub.send_multipart([b'world', msg.encode('ascii')])

    # processes message "Hello World" from topic 'world'
    async def sub_hello_world(self):
        sub = self.ctx.socket(zmq.SUB)
        sub.bind(self.url)
        sub.setsockopt(zmq.SUBSCRIBE, b'world')

        # keep listening to all published message on topic 'world'
        while True:
            msg = await sub.recv_multipart()
            # before the fix, execution waited here forever
            print('received: ', msg)

if __name__ == '__main__':
    HelloWorldMessage()

Problem

With the above code, "Hello World" is printed only once and then the program waits forever. If I press Ctrl+C, I get the following traceback:

python helloworld_pubsub.py

Hello World
^CTraceback (most recent call last):
  File "helloworld_pubsub_stackoverflow.py", line 64, in <module>
    HelloWorldMessage()
  File "helloworld_pubsub_stackoverflow.py", line 27, in __init__
    self.sub_hello_world(),
  File "/*path*/zeromq/lib/python3.6/asyncio/base_events.py", line 454, in run_until_complete
    self.run_forever()
  File "/*path*/zeromq/lib/python3.6/asyncio/base_events.py", line 421, in run_forever
    self._run_once()
  File "/*path*/zeromq/lib/python3.6/asyncio/base_events.py", line 1395, in _run_once
    event_list = self._selector.select(timeout)
  File "/*path*/zeromq/lib/python3.6/selectors.py", line 445, in select
    fd_event_list = self._epoll.poll(timeout, max_ev)
KeyboardInterrupt

Versions: libzmq: 4.2.3, pyzmq: 17.0.0, Ubuntu 16.04

Any insights are appreciated.

Asked by NumesSanguis on Nov 17 '22 at 17:11


1 Answer

There were two errors in my code:

  1. As mentioned by @user3666197, the PUB/SUB communication archetype needs some time to initialize (see their answer). I had to move await asyncio.sleep(1) above the publishing call (await pub.send_multipart([b'world', msg.encode('ascii')])), so that the first message is not sent before the subscription is in place.
  2. I encoded the message incorrectly: bytes(msg) had to become msg.encode('ascii'). In Python 3, calling bytes() on a str without an encoding raises a TypeError.
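The encoding fix in point 2 can be checked without any sockets. The snippet below is only a sketch using the standard library; make_frames and bytes_without_encoding_fails are hypothetical helper names, not part of PyZMQ or of the original code.

```python
def make_frames(topic, text, encoding="ascii"):
    """Build a [topic, payload] multipart message made only of bytes frames."""
    # str.encode() is the correct way to turn text into bytes in Python 3
    return [topic.encode(encoding), text.encode(encoding)]


def bytes_without_encoding_fails(text):
    """Return True if bytes(text) raises TypeError, as it does for any str."""
    try:
        bytes(text)  # no encoding given: Python 3 refuses to guess one
    except TypeError:
        return True
    return False


if __name__ == "__main__":
    print(make_frames("world", "Hello World"))
    print(bytes_without_encoding_fails("Hello World"))
```

A list like the one returned by make_frames is the kind of argument that send_multipart() receives in the question's code.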

This answer addresses my question most directly, but please also read @user3666197's answer for design choices to consider when implementing PyZMQ.

Advice

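It seems that PyZMQ running under asyncio.get_event_loop() doesn't print an error traceback. (asyncio.wait() stores a coroutine's exception on its task instead of raising it, so the failure stays silent while the other coroutine keeps waiting.) Therefore, wrap your code in a try/except block, e.g.: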

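import traceback
import logging

# `sub` is an already configured zmq.asyncio SUB socket
async def sub_hello_world(sub):
    try:
        while True:
            msg_received = await sub.recv_multipart()
            # do other stuff

    except Exception:
        print("Error with sub world")
        # log the full traceback, which would otherwise stay hidden
        logging.error(traceback.format_exc())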
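
As a complement to the try/except approach, the silent failure can be reproduced with plain asyncio, without ZeroMQ. This is a minimal sketch under stated assumptions: failing_publisher and quiet_subscriber are hypothetical stand-ins for the two coroutines in the question, and the subscriber is made to finish so that the example terminates.

```python
import asyncio


async def failing_publisher():
    # Stand-in (hypothetical) for a publisher coroutine that hits a bug.
    await asyncio.sleep(0)
    raise TypeError("string argument without an encoding")


async def quiet_subscriber():
    # Stand-in (hypothetical) for a subscriber; finishes so the demo ends.
    await asyncio.sleep(0.05)
    return "subscriber finished"


async def run_both():
    tasks = [
        asyncio.ensure_future(failing_publisher()),
        asyncio.ensure_future(quiet_subscriber()),
    ]
    # asyncio.wait() does not raise; failures stay stored on the tasks.
    await asyncio.wait(tasks)
    # Inspect each finished task explicitly to surface hidden exceptions.
    return [t.exception() for t in tasks if t.exception() is not None]


def collect_errors():
    # A fresh loop keeps the sketch usable on Python 3.6 and later.
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(run_both())
    finally:
        loop.close()


if __name__ == "__main__":
    for exc in collect_errors():
        print("task failed:", repr(exc))
```

In the question's code the subscriber never finishes, so asyncio.wait() never returns and the publisher's exception is never inspected, which would explain why no traceback appeared.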

Answered by NumesSanguis on Dec 30 '22 at 03:12