
0mq one-to-many connection

What is the right way to establish two-way communication between processes using 0mq? I need to create several background processes that wait for commands from the main process, perform some calculations, and return the results to the main process.

Ivan Gromov asked Dec 27 '22 15:12



1 Answer

There are a few ways to do this. The most straightforward approach is probably REQ/REP sockets. Each background process (worker) binds a REP socket, and the main process uses a REQ socket to communicate with it:

import zmq

def worker(addr):
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind(addr)
    while True:
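        # block until the boss sends a request
        msg = socket.recv()
        # ... do some work with msg here
        # a REP socket must answer each request before it can receive the next;
        # this example just echoes the message back
        socket.send(msg)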

if __name__ == '__main__':
    # spawn 5 workers
    from multiprocessing import Process
    for i in range(5):
        Process(target=worker, args=('tcp://127.0.0.1:500%d' % i,)).start()

You'd have to connect to each worker to send them a message, and get back results:

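worker_addr = 'tcp://127.0.0.1:5000'  # e.g. the first worker spawned above

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect(worker_addr)
socket.send(b'message')  # pyzmq on Python 3 requires bytes, not str
msg = socket.recv()      # blocks until the worker replies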
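
To talk to all five workers rather than one, the main process can open one REQ socket per worker, send every request first, and then collect the replies. The following is only a sketch under some assumptions: `worker_addresses` and `ask_all` are hypothetical helper names, the addresses mirror the ports used by the workers above, and the one-second timeout is an arbitrary choice.

```python
def worker_addresses(count, host='127.0.0.1', base_port=5000):
    """Addresses the workers above bind to (ports 5000, 5001, ...)."""
    return ['tcp://%s:%d' % (host, base_port + i) for i in range(count)]


def ask_all(addresses, payload, timeout_ms=1000):
    """Send payload (bytes) to every worker; return {address: reply or None}."""
    import zmq  # pyzmq; imported here so worker_addresses() is usable without it

    context = zmq.Context()
    sockets = {}
    for addr in addresses:
        sock = context.socket(zmq.REQ)
        sock.setsockopt(zmq.LINGER, 0)  # do not hang on close if a worker is down
        sock.connect(addr)
        sock.send(payload)              # all requests go out before we wait
        sockets[addr] = sock

    replies = {}
    for addr, sock in sockets.items():
        # poll() returns 0 if nothing arrived within timeout_ms
        replies[addr] = sock.recv() if sock.poll(timeout_ms) else None
        sock.close()
    context.term()
    return replies
```

With the workers running, `ask_all(worker_addresses(5), b'message')` would return one entry per worker. Because the requests are sent before any reply is awaited, the workers can compute in parallel; the timeout is applied per socket, so the worst case is roughly `count * timeout_ms`.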

Another approach would be to use PUB/SUB to fire off messages to the workers and PUSH/PULL to harvest results:

import zmq

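def worker(worker_id, publisher_addr, results_addr):
    # frames and subscription filters must be bytes on Python 3
    topic = worker_id.encode('ascii')
    context = zmq.Context()
    sub = context.socket(zmq.SUB)
    sub.connect(publisher_addr)
    # only receive messages whose first frame starts with this worker's ID
    sub.setsockopt(zmq.SUBSCRIBE, topic)
    push = context.socket(zmq.PUSH)
    push.connect(results_addr)

    while True:
        # frame 0 is the topic (worker ID), frame 1 is the command
        msg = sub.recv_multipart()[1]
        # do smth, send off results
        push.send_multipart([topic, msg])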

if __name__ == '__main__':
    publisher_addr = 'tcp://127.0.0.1:5000'
    results_addr = 'tcp://127.0.0.1:5001'

    # launch some workers into space
    from multiprocessing import Process
    for i in range(5):
        Process(target=worker, args=('worker-%d' % i, publisher_addr, results_addr,)).start()

To send a command to a specific worker, publish it with that worker's ID as the first frame:

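context = zmq.Context()
pub = context.socket(zmq.PUB)
pub.bind(publisher_addr)
# a PUB socket silently drops messages sent before subscribers have
# connected, so give the workers a moment first
time.sleep(1)  # needs `import time`
# send message to worker-1 (frames must be bytes on Python 3)
pub.send_multipart([b'worker-1', b'hello'])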
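
One caveat with addressing workers this way: a SUB socket filters by prefix, so a subscription to `worker-1` would also match messages addressed to `worker-10`. That does not matter with five workers, but it would with more than ten. The plain-Python sketch below only emulates the filter to show the effect; `matches` and `topic_for` are hypothetical helpers, and the trailing `/` delimiter is just one possible convention.

```python
def matches(subscription, frames):
    """Emulate the SUB-side filter: prefix match against the first frame."""
    return frames[0].startswith(subscription)


def topic_for(worker_id):
    """Append a delimiter so that no topic is a prefix of another topic."""
    return worker_id.encode('ascii') + b'/'


# Bare IDs: worker-1 would also receive worker-10's command.
print(matches(b'worker-1', [b'worker-10', b'hello']))

# Delimited topics: only the intended worker matches.
print(matches(topic_for('worker-1'), [topic_for('worker-10'), b'hello']))
print(matches(topic_for('worker-1'), [topic_for('worker-1'), b'hello']))
```

If such a convention is used, the worker would subscribe with `topic_for(worker_id)` and the publisher would send the same value as the first frame.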

Pull in results:

context = zmq.Context()
pull = context.socket(zmq.PULL)
pull.bind(results_addr)

while True:
    worker_id, result = pull.recv_multipart()
    print(worker_id, result)
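
Since every frame is a byte string, commands and results that carry structure (arguments, numbers, status) need to be serialized. Here is a minimal sketch using JSON from the standard library; `encode_frames` and `decode_frames` are hypothetical names, and the command layout in the example is made up for illustration.

```python
import json


def encode_frames(worker_id, payload):
    """Build the two frames for send_multipart: [worker ID, JSON body]."""
    return [worker_id.encode('utf-8'), json.dumps(payload).encode('utf-8')]


def decode_frames(frames):
    """Inverse of encode_frames, for the output of recv_multipart."""
    worker_id, body = frames
    return worker_id.decode('utf-8'), json.loads(body.decode('utf-8'))


# Round trip without any sockets involved.
frames = encode_frames('worker-1', {'op': 'square', 'x': 3})
print(decode_frames(frames))
```

The list returned by `encode_frames` can be passed directly to `send_multipart`, and the list returned by `recv_multipart` can be passed to `decode_frames`, as long as both sides agree on the two-frame layout.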
zeekay answered Jan 09 '23 02:01
