
pyzmq non-blocking socket

Can someone point me to an example of a non-blocking REQ/REP setup in ZeroMQ (0MQ) using the Python bindings? Perhaps my understanding of ZMQ is faulty, but I couldn't find an example online.

I have a Node.js application that sends work from multiple clients to the server. The idea is that the server can spin up a bunch of jobs that run in parallel, instead of processing data for one client and only then moving on to the next.

ejang asked Sep 18 '12 01:09


2 Answers

For this you can use either zmq.Poller (the zguide repo has many examples, e.g. rrbroker.py) or the gevent-zeromq implementation (code sample).
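To make the zmq.Poller option concrete, here is a minimal sketch of a REQ/REP round trip in which neither side blocks indefinitely in recv. It is not taken from rrbroker.py; the function name poll_echo_roundtrip, the inproc://poller-demo endpoint, and the timeouts are assumptions made for illustration, and both sockets live in one thread only to keep the example self-contained.

```python
import zmq


def poll_echo_roundtrip(text, timeout_ms=1000):
    """Send `text` over REQ/REP, using a Poller so no recv() blocks forever.

    Hypothetical helper for illustration; returns (idle_events, reply).
    """
    context = zmq.Context()
    server = context.socket(zmq.REP)
    server.bind('inproc://poller-demo')      # assumed endpoint name
    client = context.socket(zmq.REQ)
    client.connect('inproc://poller-demo')

    # Register both sockets for "readable" events.
    poller = zmq.Poller()
    poller.register(server, zmq.POLLIN)
    poller.register(client, zmq.POLLIN)

    try:
        # Nothing has been sent yet, so poll() times out and reports no events.
        idle = dict(poller.poll(50))

        client.send_string(text)

        # Wait (at most timeout_ms) for the request to reach the server.
        events = dict(poller.poll(timeout_ms))
        if server not in events:
            return idle, None
        request = server.recv_string()       # safe: the poller said it is ready
        server.send_string(request.upper())

        # Wait (at most timeout_ms) for the reply to reach the client.
        events = dict(poller.poll(timeout_ms))
        reply = client.recv_string() if client in events else None
        return idle, reply
    finally:
        client.close(linger=0)
        server.close(linger=0)
        context.term()


if __name__ == '__main__':
    print(poll_echo_roundtrip('hello'))
```

In a real server the poll call would sit inside a loop, and a timeout with no events is the point at which the process is free to do other work before polling again.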

Alexey Kachayev answered Sep 16 '22 16:09


The example in the accepted answer gives the gist of it, but you can get away with something a bit simpler by using zmq.device for the broker while otherwise sticking to the "Extended Request-Reply" pattern from the guide. A hello-world-style example for the server could look something like the following:

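import time
import threading
import zmq

context = zmq.Context()

def worker():
    # Each worker gets its own REP socket, connected to the broker's back end.
    socket = context.socket(zmq.REP)
    socket.connect('inproc://workers')
    while True:
        msg = socket.recv_string()
        print(f'Received request: [{msg}]')
        time.sleep(1)  # simulate one second of work
        socket.send_string(msg)  # echo the request back as the reply

# Front end: clients connect here over TCP.
url_client = 'tcp://*:5556'
clients = context.socket(zmq.ROUTER)
clients.bind(url_client)
# Back end: worker threads connect here in-process.
workers = context.socket(zmq.DEALER)
workers.bind('inproc://workers')

for _ in range(4):
    thread = threading.Thread(target=worker)
    thread.start()

# Blocks while passing messages between the clients and the workers.
zmq.device(zmq.QUEUE, clients, workers)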

This lets four workers handle incoming requests in parallel. You're using Node on the client side, but to keep the example complete, you can use the Python client below to see that it works. It sends 10 requests, which the four workers then handle in three batches (4, 4, and 2):

import zmq
import threading

context = zmq.Context()

def make_request(a):
    # Each request uses its own REQ socket, since sockets should not be shared between threads.
    socket = context.socket(zmq.REQ)
    socket.connect('tcp://localhost:5556')
    print(f'Sending request {a} ...')
    socket.send_string(str(a))
    message = socket.recv_string()  # blocks until a worker replies
    print(f'Received reply from request {a} [{message}]')

# Fire off all 10 requests at once, one thread per request.
for a in range(10):
    thread = threading.Thread(target=make_request, args=(a,))
    thread.start()
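
To try the server and client together without two terminals, the two scripts can be folded into one process. The following is only a sketch under a few assumptions that differ from the code above: it uses zmq.proxy in place of zmq.device, binds to a random local port instead of 5556, shortens the simulated work, runs the broker and workers as daemon threads so the script can exit, and has each client give up after a poll timeout rather than blocking in recv. The run_demo name and its parameters are made up for illustration.

```python
import threading
import time
import zmq


def run_demo(n_requests=10, n_workers=4, work_seconds=0.1):
    """Hypothetical single-process version of the broker, workers and clients."""
    context = zmq.Context()

    # Front end for clients (random local port, an assumption for this sketch).
    clients = context.socket(zmq.ROUTER)
    port = clients.bind_to_random_port('tcp://127.0.0.1')
    # Back end for the in-process workers.
    workers = context.socket(zmq.DEALER)
    workers.bind('inproc://workers')

    def worker():
        socket = context.socket(zmq.REP)
        socket.connect('inproc://workers')
        while True:
            msg = socket.recv_string()
            time.sleep(work_seconds)  # simulate work
            socket.send_string(msg)   # echo the request back

    # Daemon threads: they never return, so let the interpreter exit anyway.
    for _ in range(n_workers):
        threading.Thread(target=worker, daemon=True).start()
    threading.Thread(target=zmq.proxy, args=(clients, workers),
                     daemon=True).start()

    replies = {}
    lock = threading.Lock()

    def make_request(a):
        socket = context.socket(zmq.REQ)
        socket.connect(f'tcp://127.0.0.1:{port}')
        socket.send_string(str(a))
        # Wait at most 5 s for a reply instead of blocking forever in recv.
        if socket.poll(5000):
            reply = socket.recv_string()
            with lock:
                replies[a] = reply
        socket.close(linger=0)

    threads = [threading.Thread(target=make_request, args=(a,))
               for a in range(n_requests)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return replies


if __name__ == '__main__':
    print(run_demo())
```

A long-running service would want a clean shutdown instead of daemon threads; that is left out here to keep the sketch short.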
fuglede answered Sep 18 '22 16:09