I've just started with ZeroMQ and I'm trying to get a Hello World to work with PyZMQ and asyncio in Python 3.6. I'm trying to decouple a module's functionality from the pub/sub code, hence the following class setup:
Edit 1: Minimized example
Edit 2: Included the solution; see the answer below for details.
import asyncio
import zmq.asyncio
from zmq.asyncio import Context

# manages message flow between publishers and subscribers
class HelloWorldMessage:
    def __init__(self, url='127.0.0.1', port='5555'):
        self.url = "tcp://{}:{}".format(url, port)
        self.ctx = Context.instance()

        # activate publishers / subscribers
        asyncio.get_event_loop().run_until_complete(asyncio.wait([
            self.pub_hello_world(),
            self.sub_hello_world(),
        ]))

    # generates message "Hello World" and publish to topic 'world'
    async def pub_hello_world(self):
        pub = self.ctx.socket(zmq.PUB)
        pub.connect(self.url)

        # message contents
        msg = "Hello World"
        print(msg)

        # keep sending messages
        while True:
            # --MOVED-- slow down message publication
            await asyncio.sleep(1)

            # publish message to topic 'world'
            # async always needs `send_multipart()`
            await pub.send_multipart([b'world', msg.encode('ascii')])  # WRONG: bytes(msg)

    # processes message "Hello World" from topic 'world'
    async def sub_hello_world(self):
        sub = self.ctx.socket(zmq.SUB)
        sub.bind(self.url)
        sub.setsockopt(zmq.SUBSCRIBE, b'world')

        # keep listening to all published message on topic 'world'
        while True:
            msg = await sub.recv_multipart()
            # ERROR: WAITS FOREVER
            print('received: ', msg)

if __name__ == '__main__':
    HelloWorldMessage()
With the above code, only one Hello World is printed and then the program waits forever. If I press ctrl+c, I get the following error:
python helloworld_pubsub.py
Hello World
^CTraceback (most recent call last):
  File "helloworld_pubsub_stackoverflow.py", line 64, in <module>
    HelloWorldMessage()
  File "helloworld_pubsub_stackoverflow.py", line 27, in __init__
    self.sub_hello_world(),
  File "/*path*/zeromq/lib/python3.6/asyncio/base_events.py", line 454, in run_until_complete
    self.run_forever()
  File "/*path*/zeromq/lib/python3.6/asyncio/base_events.py", line 421, in run_forever
    self._run_once()
  File "/*path*/zeromq/lib/python3.6/asyncio/base_events.py", line 1395, in _run_once
    event_list = self._selector.select(timeout)
  File "/*path*/zeromq/lib/python3.6/selectors.py", line 445, in select
    fd_event_list = self._epoll.poll(timeout, max_ev)
KeyboardInterrupt
Versions: libzmq 4.2.3, pyzmq 17.0.0, Ubuntu 16.04
Any insights are appreciated.
There were 2 errors with my code:
1. The PUB/SUB communication archetype needs some time for initialization (see @user3666197's answer). I had to move await asyncio.sleep(1) above the publishing call (await pub.send_multipart([b'world', msg.encode('ascii')])).
2. bytes(msg) --> msg.encode('ascii'). In Python 3, bytes() raises a TypeError when it is given a str without an encoding.
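The bytes(msg) versus msg.encode('ascii') fix can be checked without any sockets. The snippet below is only a minimal sketch: the names `conversion_error`, `payload` and `frames` are made up for illustration, and the frame list simply mirrors what the question passes to send_multipart().

```python
msg = "Hello World"

# bytes() on a str needs an explicit encoding in Python 3,
# so this call raises TypeError instead of returning bytes.
conversion_error = None
try:
    payload = bytes(msg)
except TypeError as exc:
    conversion_error = exc
    # str.encode() is the working alternative used in the fixed code.
    payload = msg.encode('ascii')

# The two frames the publisher would send: topic first, then the body.
frames = [b'world', payload]

print("bytes(msg) failed with:", repr(conversion_error))
print("frames to send:", frames)
```

bytes(msg, 'ascii') would also work, since the TypeError only concerns the missing encoding argument.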
This answer is most closely related to my question, but please see @user3666197's answer for certain design choices when implementing PyZMQ.
It seems that PyZMQ code running inside asyncio.get_event_loop() doesn't give an error traceback, so wrap your code in a try/except block, e.g.:
import traceback
import logging

# this goes inside the coroutine, e.g. sub_hello_world()
try:
    while True:
        msg_received = await sub.recv_multipart()
        # do other stuff
except Exception as e:
    print("Error with sub world")
    logging.error(traceback.format_exc())
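One likely reason no traceback appeared is that asyncio.wait() stores a task's exception on the task object rather than raising it. With the default return_when=ALL_COMPLETED it then keeps waiting for the subscriber, which never finishes. The sketch below reproduces that situation without ZeroMQ and surfaces the error through return_when=asyncio.FIRST_EXCEPTION. The coroutines `failing_publisher`, `waiting_subscriber` and `run_and_report` are hypothetical stand-ins, not part of the original code.

```python
import asyncio

async def failing_publisher():
    # Hypothetical stand-in for pub_hello_world(): fails the same way
    # bytes(msg) does on a str.
    await asyncio.sleep(0.01)
    return bytes("Hello World")

async def waiting_subscriber():
    # Hypothetical stand-in for sub_hello_world(): waits a long time
    # for a message that never arrives.
    await asyncio.sleep(3600)

async def run_and_report():
    tasks = [asyncio.ensure_future(failing_publisher()),
             asyncio.ensure_future(waiting_subscriber())]
    # Return as soon as one task raises instead of waiting for all of them.
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_EXCEPTION)
    # Stop the tasks that are still running and let the cancellation settle.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    # Collect the exceptions that asyncio.wait() kept on the finished tasks.
    return [t.exception() for t in done if t.exception() is not None]

loop = asyncio.new_event_loop()
try:
    errors = loop.run_until_complete(run_and_report())
finally:
    loop.close()

for err in errors:
    print("task failed:", repr(err))
```

Calling task.result() on a finished task instead of task.exception() would re-raise the stored error with its traceback, which may be more convenient while debugging.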