I'm pulling my hair out over this one. I'm trying to get the simplest of examples working with ZeroMQ and gevent. I changed this script to use PUB/SUB sockets, and when I run it the 'server' socket loops forever. If I uncomment the gevent.sleep(0.1) line, it works as expected and yields to the other green thread, which in this case is the client.
The problem is, why should I have to manually add a sleep call? I thought that when I import the zmq.green version of zmq, the send and receive calls are non-blocking and do the task switching underneath.
In other words, why should I have to add the gevent.sleep() call to get this example working? In Jeff Lindsay's original example, he uses REQ/REP sockets and doesn't need to add sleep calls, but when I changed this to PUB/SUB I need it there for the server to yield to the client for processing.
#Notes: Code taken from slide: http://www.google.com/url?sa=t&rct=j&q=zeromq%20gevent&source=web&cd=27&ved=0CFsQFjAGOBQ&url=https%3A%2F%2Fraw.github.com%2Fstrangeloop%2F2011-slides%2Fmaster%2FLindsay-DistributedGeventZmq.pdf&ei=JoDNUO6OIePQiwK8noHQBg&usg=AFQjCNFa5g9ZliRVoN_yVH7aizU_fDMtfw&bvm=bv.1355325884,d.cGE
#Jeff Lindsay talk on gevent and zeromq
import gevent
from gevent import spawn
import zmq.green as zmq

context = zmq.Context()

def serve():
    print 'server online'
    socket = context.socket(zmq.PUB)
    socket.bind("ipc:///tmp/jeff")
    while True:
        print 'send'
        socket.send("World")
        #gevent.sleep(0.1)

def client():
    print 'client online'
    socket = context.socket(zmq.SUB)
    socket.connect("ipc:///tmp/jeff")
    socket.setsockopt(zmq.SUBSCRIBE, '')
    while True:
        print 'recv'
        message = socket.recv()

cl = spawn(client)
server = spawn(serve)
print 'joinall'
gevent.joinall([cl, server])
print 'end'
> I thought when I import the zmq.green version of zmq that the send and receive calls are non blocking and underneath do the task switching.
zmq.green only yields if these calls would block. It does not yield if the socket is ready, because then there is nothing to wait for. In your case the sender is always ready, so it never has a reason to yield.
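The starvation described here can be reproduced without zmq or gevent at all. The following is a minimal sketch that models greenlets as plain Python generators driven by a round-robin loop; `run`, `server`, `client` and `demo` are hypothetical names made up for this illustration, and a bare `yield` stands in for a point where gevent would switch tasks. It is only a model of the scheduling behaviour, not how gevent is implemented.

```python
from collections import deque


def run(tasks):
    """Round-robin scheduler: resume each task until its next yield."""
    ready = deque(tasks)
    while ready:
        task = ready.popleft()
        try:
            next(task)          # run the task up to its next yield
        except StopIteration:
            continue            # task finished, drop it
        ready.append(task)      # task yielded, queue it for another turn


def server(log, rounds, explicit_yield):
    for i in range(rounds):
        log.append("send %d" % i)   # always-ready send: nothing to wait for
        if explicit_yield:
            yield                   # stand-in for gevent.sleep(0)


def client(log, rounds):
    for i in range(rounds):
        log.append("recv %d" % i)
        yield                       # stand-in for a recv that would block


def demo(explicit_yield, rounds=3):
    log = []
    run([client(log, rounds), server(log, rounds, explicit_yield)])
    return log


if __name__ == "__main__":
    print(demo(explicit_yield=False))
    print(demo(explicit_yield=True))
```

Without the explicit yield, the server performs all of its sends in one uninterrupted turn before the client runs again; with an infinite loop, as in the question, the client would never run again. With the explicit yield, sends and receives alternate.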
Some pointers:
- To force a yield, gevent.sleep(0) is enough; the sleep doesn't need a nonzero duration.
- socket.send only blocks when the socket is not ready to send (not (socket.events & zmq.POLLOUT)), which can never actually be true of a PUB socket (you will see it at HWM for PUSH, DEALER, etc.).

What zmq.green amounts to is turning send/recv into:
try:
    socket.send(msg, zmq.NOBLOCK)  # or recv
except zmq.ZMQError as e:
    if e.errno == zmq.EAGAIN:
        yield  # and wait for socket to be ready, then try again
So if send/recv with NOBLOCK always succeed, the socket never yields.
To put it another way: if a socket has nothing to wait for, it won't wait.
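The try/EAGAIN/retry pattern above can be sketched with the standard library alone. In this illustration, which is an assumption-laden model rather than the zmq.green source, a `queue.Queue` stands in for the socket buffer: `put_nowait` raising `queue.Full` plays the role of a NOBLOCK send failing with EAGAIN, an unbounded queue plays the always-ready PUB socket, and a bounded queue plays a socket such as PUSH that has hit its HWM. The names `green_send`, `green_recv`, `sender`, `receiver` and `simulate` are hypothetical.

```python
import queue
from collections import deque


def green_send(buf, msg, stats):
    """Try a non-blocking send; yield only if it would block, then retry."""
    while True:
        try:
            buf.put_nowait(msg)        # stand-in for socket.send(msg, zmq.NOBLOCK)
            return
        except queue.Full:             # stand-in for ZMQError with errno EAGAIN
            stats["send_yields"] += 1
            yield                      # give other tasks a turn, then retry


def green_recv(buf, stats):
    """Try a non-blocking recv; yield only if nothing is available."""
    while True:
        try:
            return buf.get_nowait()
        except queue.Empty:
            stats["recv_yields"] += 1
            yield


def sender(buf, count, stats):
    for i in range(count):
        yield from green_send(buf, i, stats)


def receiver(buf, count, stats, received):
    for _ in range(count):
        msg = yield from green_recv(buf, stats)
        received.append(msg)


def simulate(maxsize, count=5):
    """Run receiver and sender round-robin; maxsize=0 means an unbounded buffer."""
    buf = queue.Queue(maxsize)
    stats = {"send_yields": 0, "recv_yields": 0}
    received = []
    ready = deque([receiver(buf, count, stats, received),
                   sender(buf, count, stats)])
    while ready:
        task = ready.popleft()
        try:
            next(task)                 # run the task up to its next yield
        except StopIteration:
            continue                   # task finished
        ready.append(task)
    return received, stats
```

Calling `simulate(0)` models the PUB case: every send succeeds immediately, so the sender's yield count stays at zero. Calling `simulate(2)` models a socket with a small HWM: the sender hits the full buffer, yields, and the receiver gets a turn to drain it.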