I just got started with ZMQ. I am designing an app whose workflow is:
I assume that the PUSH/PULL architecture is suited for this. Please correct me on this.
But how do I handle these scenarios?
So how do I setup something like a timeout in the PUSH/PULL model?
EDIT: Thanks user938949's suggestions, I got a working answer and I am sharing it for posterity.
ZeroMQ patterns are implemented by pairs of sockets with matching types. The built-in core ZeroMQ patterns are: Request-reply, which connects a set of clients to a set of services. This is a remote procedure call and task distribution pattern.
Async-zmq is high-level bindings for zmq in asynchronous manner which is compatible to every async runtime. No need for configuring or tuning features.
In this program, we will create a command server that tells when the worker should exit. Workers subscribes to a topic published by a publisher and prints it. It exits when it receives “Exit” message from the command server.
The poller notifies when there's data (messages) available on the sockets; it's your job to read it. When reading, do it without blocking: socket. recv( ZMQ. DONTWAIT) .
If you are using zeromq >= 3.0, then you can set the RCVTIMEO socket option:
client_receiver.RCVTIMEO = 1000 # in milliseconds
But in general, you can use pollers:
poller = zmq.Poller() poller.register(client_receiver, zmq.POLLIN) # POLLIN for recv, POLLOUT for send
And poller.poll()
takes a timeout:
evts = poller.poll(1000) # wait *up to* one second for a message to arrive.
evts
will be an empty list if there is nothing to receive.
You can poll with zmq.POLLOUT
, to check if a send will succeed.
Or, to handle the case of a peer that might have failed, a:
worker.send(msg, zmq.NOBLOCK)
might suffice, which will always return immediately - raising a ZMQError(zmq.EAGAIN) if the send could not complete.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With