What is the most correct way to establish a two-way communication between processes using 0mq? I need to create several background processes that will wait for commands from the main process, perform some calculations and return the result back to the main process.
There are a few ways to do this. The most straight forward approach might be to use REQ
/REP
sockets. Each background process/worker would have a REP
socket, and you would use a REQ
socket to communicate with them:
import zmq
def worker(addr):
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind(addr)
while True:
# get message from boss
msg = socket.recv()
# ...do smth
# send back results
socket.send(msg)
if __name__ == '__main__':
# spawn 5 workers
from multiprocessing import Process
for i in range(5):
Process(target=worker, args=('tcp://127.0.0.1:500%d' % i,)).start()
You'd have to connect to each worker to send them a message, and get back results:
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect(worker_addr)
socket.send('message')
msg = socket.recv()
Another approach would be to use PUB
/SUB
to fire off messages to the workers and PUSH
/PULL
to harvest results:
import zmq
def worker(worker_id, publisher_addr, results_addr):
context = zmq.Context()
sub = context.socket(zmq.SUB)
sub.connect(publisher_addr)
sub.setsockopt(zmq.SUBSCRIBE, worker_id)
push = context.socket(zmq.PUSH)
push.connect(results_addr)
while True:
msg = sub.recv_multipart()[1]
# do smth, send off results
push.send_multipart([worker_id, msg])
if __name__ == '__main__':
publisher_addr = 'tcp://127.0.0.1:5000'
results_addr = 'tcp://127.0.0.1:5001'
# launch some workers into space
from multiprocessing import Process
for i in range(5):
Process(target=worker, args=('worker-%d' % i, publisher_addr, results_addr,)).start()
To broadcast a command to a specific worker, you'd do something like:
context = zmq.Context()
pub = context.socket(zmq.PUB)
pub.bind(publisher_addr)
# send message to worker-1
pub.send_multipart(['worker-1', 'hello'])
Pull in results:
context = zmq.Context()
pull = context.socket(zmq.PULL)
pull.bind(results_addr)
while True:
worker_id, result = pull.recv_multipart()
print worker_id, result
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With