Can someone point me to an example of a REQ/REP non-blocking ZeroMQ (0MQ) with Python bindings? Perhaps my understanding of ZMQ is faulty but I couldn't find an example online.
I have a server in Node.JS that sends work from multiple clients to the server. The idea is that the server can spin up a bunch of jobs that operate in parallel instead of processing data for one client followed by the next.
For this you can use either zmq.Poller (the zguide repo has many examples, e.g. rrbroker.py) or the gevent-zeromq implementation (code sample).
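To make the zmq.Poller suggestion concrete, here is a minimal sketch rather than a definitive implementation. It runs a REQ and a REP socket in a single thread over an inproc endpoint and uses the poller with a timeout, so neither side ever blocks indefinitely on recv. The function name poll_round_trip, the endpoint inproc://poller-demo, and the upper-casing "work" are assumptions made up for this illustration.

```python
import zmq


def poll_round_trip(payload, timeout_ms=1000):
    """Send one request and return the reply, or None if polling times out.

    Hypothetical helper for illustration only.
    """
    context = zmq.Context()
    rep = context.socket(zmq.REP)
    req = context.socket(zmq.REQ)
    rep.bind("inproc://poller-demo")     # assumed endpoint name
    req.connect("inproc://poller-demo")

    # Register both sockets; poll() reports which ones are readable.
    poller = zmq.Poller()
    poller.register(rep, zmq.POLLIN)
    poller.register(req, zmq.POLLIN)

    req.send_string(payload)
    reply = None
    try:
        while reply is None:
            events = dict(poller.poll(timeout_ms))
            if not events:
                break  # nothing arrived within the timeout
            if rep in events:
                # Server side: read the request, answer with an upper-cased copy.
                rep.send_string(rep.recv_string().upper())
            if req in events:
                # Client side: the reply is ready, so recv returns immediately.
                reply = req.recv_string()
    finally:
        req.close(linger=0)
        rep.close(linger=0)
        context.term()
    return reply


if __name__ == "__main__":
    print(poll_round_trip("hello"))
```

In a real server you would keep the poll loop running and do other work whenever poll() returns no events; the timeout is what keeps the loop from blocking.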
The example provided in the accepted answer gives the gist of it, but you can get away with something a bit simpler by using zmq.device for the broker while otherwise sticking to the "Extended Request-Reply" pattern from the guide. A hello-world example for the server could look something like the following:
import time
import threading
import zmq

context = zmq.Context()


def worker():
    # Each worker owns a REP socket connected to the in-process backend.
    socket = context.socket(zmq.REP)
    socket.connect('inproc://workers')
    while True:
        msg = socket.recv_string()
        print(f'Received request: [{msg}]')
        time.sleep(1)  # simulate one second of work
        socket.send_string(msg)


# Frontend: clients connect here over TCP.
url_client = 'tcp://*:5556'
clients = context.socket(zmq.ROUTER)
clients.bind(url_client)

# Backend: workers connect here inside the process.
workers = context.socket(zmq.DEALER)
workers.bind('inproc://workers')

for _ in range(4):
    thread = threading.Thread(target=worker)
    thread.start()

# Blocks forever, shuttling messages between clients and workers.
zmq.device(zmq.QUEUE, clients, workers)
Here we're letting four workers handle incoming requests in parallel. You're using Node on the client side, but to keep the example complete, you can use the Python client below to see that this works. It creates 10 requests, which are then handled in roughly 3 batches (4, 4, and 2), since each of the four workers spends one second on a request:
import zmq
import threading

context = zmq.Context()


def make_request(a):
    # One REQ socket per thread; sockets should not be shared across threads.
    socket = context.socket(zmq.REQ)
    socket.connect('tcp://localhost:5556')
    print(f'Sending request {a} ...')
    socket.send_string(str(a))
    message = socket.recv_string()
    print(f'Received reply from request {a} [{message}]')


for a in range(10):
    thread = threading.Thread(target=make_request, args=(a,))
    thread.start()
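One caveat about the broker above: the pyzmq documentation marks zmq.device as deprecated in favour of zmq.proxy. Below is a sketch of the same ROUTER/DEALER setup using zmq.proxy, squeezed into a single process so it can be run and shut down cleanly. The function name run_demo, the inproc://clients frontend (standing in for tcp://*:5556), and the upper-casing "work" are assumptions for this illustration and not part of the original example.

```python
import threading
import zmq


def run_demo(n_requests=6, n_workers=3):
    """Hypothetical end-to-end demo: broker, workers and one client in-process."""
    context = zmq.Context()
    frontend_url = "inproc://clients"  # assumed stand-in for tcp://*:5556
    backend_url = "inproc://workers"

    # Bind both broker sockets before any thread connects to them.
    clients = context.socket(zmq.ROUTER)
    clients.bind(frontend_url)
    workers = context.socket(zmq.DEALER)
    workers.bind(backend_url)

    # Lets the main thread wait until every worker has connected.
    started = threading.Barrier(n_workers + 1)

    def broker():
        try:
            zmq.proxy(clients, workers)  # blocks until the context terminates
        except zmq.ContextTerminated:
            pass
        finally:
            clients.close(linger=0)
            workers.close(linger=0)

    def worker():
        socket = context.socket(zmq.REP)
        socket.connect(backend_url)
        started.wait()
        try:
            while True:
                socket.send_string(socket.recv_string().upper())
        except zmq.ContextTerminated:
            pass  # raised by recv once context.term() is called
        finally:
            socket.close(linger=0)

    threads = [threading.Thread(target=broker)]
    threads += [threading.Thread(target=worker) for _ in range(n_workers)]
    for t in threads:
        t.start()
    started.wait()

    # A single REQ client sends its requests one after another.
    client = context.socket(zmq.REQ)
    client.connect(frontend_url)
    replies = []
    for i in range(n_requests):
        client.send_string(f"job-{i}")
        replies.append(client.recv_string())
    client.close(linger=0)

    # Terminating the context unblocks the proxy and the workers.
    context.term()
    for t in threads:
        t.join()
    return replies


if __name__ == "__main__":
    print(run_demo())
```

For the long-running server in this answer, the only change needed would be swapping the last line for zmq.proxy(clients, workers); the shutdown handling is only there so the sketch exits on its own.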