
zeromq: how to prevent infinite wait?

Tags: python, zeromq

I just got started with ZMQ. I am designing an app whose workflow is:

  1. One of many clients (each with a random PULL address) PUSHes a request to a server at port 5555.
  2. The server waits forever for client PUSHes. When one arrives, a worker process is spawned for that particular request. Yes, worker processes can exist concurrently.
  3. When that process completes its task, it PUSHes the result to the client.

I assume that the PUSH/PULL architecture is suited for this. Please correct me on this.


But how do I handle these scenarios?

  1. client_receiver.recv() will wait forever if the server fails to respond.
  2. The client may send a request and then fail immediately afterwards, so a worker process will remain stuck at server_sender.send() forever.

So how do I set up something like a timeout in the PUSH/PULL model?


EDIT: Thanks to user938949's suggestions, I got a working answer and I am sharing it for posterity.

Jesvin Jose asked Sep 24 '11 12:09



1 Answer

If you are using zeromq >= 3.0, then you can set the RCVTIMEO socket option:

client_receiver.RCVTIMEO = 1000 # in milliseconds 
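As a minimal sketch of how that option might be used: once RCVTIMEO is set, a blocking recv() that times out raises zmq.Again in pyzmq, which the caller can catch. The helper name recv_or_none and its timeout_ms parameter are illustrative assumptions, not part of pyzmq.

```python
import zmq


def recv_or_none(sock, timeout_ms=1000):
    """Hypothetical helper: receive one message, or return None on timeout."""
    # RCVTIMEO is given in milliseconds and needs libzmq >= 3.0.
    sock.setsockopt(zmq.RCVTIMEO, timeout_ms)
    try:
        return sock.recv()
    except zmq.Again:
        # No message arrived within timeout_ms.
        return None
```

A client could call recv_or_none(client_receiver) and treat a None result as "server did not respond", then retry or give up.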

But in general, you can use pollers:

poller = zmq.Poller()
poller.register(client_receiver, zmq.POLLIN)  # POLLIN for recv, POLLOUT for send

And poller.poll() takes a timeout:

evts = poller.poll(1000) # wait *up to* one second for a message to arrive. 

evts will be an empty list if there is nothing to receive.
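Putting the poller pieces together, a receive with a timeout could look roughly like the sketch below. The function name recv_with_timeout is a made-up helper for illustration; only zmq.Poller, register(), poll() and recv() come from pyzmq.

```python
import zmq


def recv_with_timeout(sock, timeout_ms=1000):
    """Hypothetical helper: return the next message, or None if none arrives in time."""
    poller = zmq.Poller()
    poller.register(sock, zmq.POLLIN)
    # poll() returns a list of (socket, event_mask) pairs; it is empty on timeout.
    events = dict(poller.poll(timeout_ms))
    if events.get(sock, 0) & zmq.POLLIN:
        return sock.recv()
    return None
```

Unlike RCVTIMEO, this approach does not change any socket option, and the same poller can watch several sockets at once if more are registered.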

You can also poll with zmq.POLLOUT to check whether a send will succeed.

Or, to handle the case of a peer that might have failed, a non-blocking send might suffice:

worker.send(msg, zmq.NOBLOCK) 

This always returns immediately, raising a ZMQError(zmq.EAGAIN) if the send could not complete.
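A small sketch of the non-blocking send, assuming pyzmq: the EAGAIN case surfaces as zmq.Again (a subclass of ZMQError). The helper name try_send is hypothetical, and zmq.DONTWAIT is used here as the newer name for the same flag as zmq.NOBLOCK.

```python
import zmq


def try_send(sock, msg):
    """Hypothetical helper: True if the message was queued, False if it would block."""
    try:
        sock.send(msg, zmq.DONTWAIT)
        return True
    except zmq.Again:
        # Nothing could accept the message right now (e.g. no peer, or queue full).
        return False
```

Note that a True result only means the message was queued locally. A socket that has connected to a peer may keep queuing messages for it up to its high-water mark, so this check alone may not detect a client that has died.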

minrk answered Oct 11 '22 11:10