I started using ZeroMQ this week, and when using the Request-Reply pattern I am not sure how to have a worker safely "hang up" and close its socket without possibly dropping a message, leaving the client who sent that message without a response forever. Imagine a worker written in Python that looks something like this:
    import zmq

    c = zmq.Context()
    s = c.socket(zmq.REP)
    s.connect('tcp://127.0.0.1:9999')
    for i in range(8):       # serve exactly eight requests, then hang up
        s.recv()
        s.send(b'reply')     # bytes, so this also works on Python 3
    s.close()
I have been doing experiments and have found that a client at 127.0.0.1:9999 of socket type zmq.REQ that makes a fair-queued request just might have the misfortune of having the fair-queuing algorithm choose the above worker right after the worker has done its last send() but before it runs the following close() method. In that case, it seems that the request is received and buffered by the ØMQ stack in the worker process, and that the request is then lost when close() throws out everything associated with the socket.
How can a worker detach "safely"? Is there any way to signal "I don't want messages anymore", then (a) loop over any final messages that have arrived during transmission of the signal, (b) generate their replies, and then (c) execute close() with the guarantee that no messages are being thrown away?
Edit: I suppose the raw state that I would want to enter is a "half-closed" state, where no further requests could be received — and the sender would know that — but where the return path is still open so that I can check my incoming buffer for one last arrived message and respond to it if there is one sitting in the buffer.
Edit: In response to a good question, corrected the description to make the number of waiting messages plural, as there could be many connections waiting on replies.
You seem to be trying to avoid a “simple” race condition such as in
    ... = zmq_recv(fd);
    do_something();
    zmq_send(fd, answer);
    /* Let's hope a new request does not arrive just now, please close it quickly! */
    zmq_close(fd);
but I think the problem is that fair queuing (round-robin) makes things even more difficult: you might already have several requests queued on your worker. The sender will not wait for your worker to be free before sending a new request if it is that worker's turn to receive one, so by the time you call zmq_send other requests might already be waiting.
In fact, it looks like you might have selected the wrong data direction. Instead of having a requests pool send requests to your workers (even when you would prefer not to receive new ones), you might want to have your workers fetch a new request from a requests queue, take care of it, then send the answer.
Of course, it means using XREP/XREQ, but I think it is worth it.
Edit: I wrote some code implementing the other direction to explain what I mean.
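That code is not reproduced here, so what follows is an independent minimal sketch of the reversed direction, assuming pyzmq is installed. Each worker uses a REQ socket to ask a central ROUTER socket (the current name for XREP) for its next job, so a job only leaves the central queue when a worker has asked for it. The message kinds READY, MORE and BYE, the empty-frame stop marker, the inproc URL, and the function names are all made up for this illustration; they are not part of ZeroMQ.

```python
import threading
import zmq

URL = "inproc://workers"   # hypothetical endpoint; tcp:// works the same way
STOP = b""                 # hypothetical "no more work" marker (jobs must be non-empty)


def worker(ctx, url, max_jobs):
    """Ask for jobs one at a time; hang up after max_jobs (must be >= 1)."""
    sock = ctx.socket(zmq.REQ)
    sock.connect(url)
    sock.send_multipart([b"READY", b""])          # first request: "give me work"
    handled = 0
    while True:
        job = sock.recv()
        if job == STOP:                           # queue is empty, nothing to do
            break
        handled += 1
        result = job.upper()                      # stand-in for real processing
        if handled == max_jobs:
            # Last result, and explicitly no request for more work.
            sock.send_multipart([b"BYE", result])
            break
        sock.send_multipart([b"MORE", result])    # result plus "give me another"
    sock.close()


def run(jobs, limits):
    """Hand jobs to one worker per entry in limits; return (results, unassigned)."""
    ctx = zmq.Context()
    backend = ctx.socket(zmq.ROUTER)
    backend.bind(URL)                             # bind before the workers connect
    threads = [threading.Thread(target=worker, args=(ctx, URL, n)) for n in limits]
    for t in threads:
        t.start()

    pending, results, active = list(jobs), [], len(limits)
    while active:
        # A REQ peer shows up at a ROUTER as [identity, empty delimiter, frames...].
        ident, _empty, kind, payload = backend.recv_multipart()
        if kind != b"READY":
            results.append(payload)
        if kind == b"BYE":
            active -= 1                           # worker left; send it nothing
        elif pending:
            backend.send_multipart([ident, b"", pending.pop(0)])
        else:
            backend.send_multipart([ident, b"", STOP])
            active -= 1

    for t in threads:
        t.join()
    backend.close()
    ctx.term()
    return results, pending
```

With this layout a departing worker never has an unanswered request sitting in its socket buffer, because it is only sent a job in reply to its own READY or MORE. For example, run([b"a", b"b", b"c"], [2]) should hand two jobs to the single worker and leave the third in the returned list of unassigned jobs, ready for some other worker rather than lost.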