Why does this Python 0MQ script for distributed computing hang at a fixed input size?

I recently started learning 0MQ. Earlier today, I came across a blog post, Python Multiprocessing with ZeroMQ. It discussed the ventilator pattern from the 0MQ Guide, which I had read about, so I decided to give it a try.

Instead of having the workers just calculate products of numbers as the original code does, I decided to make the ventilator send large arrays to the workers via 0MQ messages. The following is the code that I have been using for my "experiments".

As noted in a comment in the code below, any time I increase the variable string_length to a value larger than 3MB, the code hangs.

Typical symptom: let's say we set string_length to 4MB (i.e. 4194304); the result manager may get the result from one worker, and then the code just pauses. htop shows the two cores not doing much, and the EtherApe network traffic monitor shows no traffic on the lo interface either.

So far, after hours of looking around, I have not been able to figure out what's causing this, and would appreciate a hint or two as to why it happens and how to resolve it. Thanks!

I am running Ubuntu 11.04 64-bit on a Dell notebook with an Intel Core Duo CPU, 8GB RAM, an 80GB Intel X25-M G2 SSD, Python 2.7.1+, libzmq1 2.1.10-1chl1~natty1, and python-pyzmq 2.1.10-1chl1~natty1.

import time
import zmq
from multiprocessing import Process, cpu_count

np = cpu_count() 
pool_size = np
number_of_elements = 128
# Odd: why does the code hang once slen is bumped to 3MB or above?
string_length = 1024 * 1024 * 3

def create_inputs(nelem, slen, pb=True):
    '''
    Generates an array that contains nelem fixed-size (slen bytes each)
    random strings and an accompanying array of hexdigests of the 
    former's elements.  Both are returned in a tuple.

    :type nelem: int
    :param nelem: The desired number of elements in the array to be
                  generated.
    :type slen: int
    :param slen: The desired number of bytes of each array element.
    :type pb: bool
    :param pb: If True, displays a text progress bar during input array
               generation.
    '''
    from os import urandom
    import sys
    import hashlib

    if pb:
        if nelem <= 64:
            toolbar_width = nelem
            chunk_size = 1
        else:
            toolbar_width = 64
            chunk_size = nelem // toolbar_width
        description = '%d random strings of %d bytes. ' % (nelem, slen) 
        s = ''.join(('Generating an array of ', description, '...\n'))
        sys.stdout.write(s)
        # create an ASCII progress bar
        sys.stdout.write("[%s]" % (" " * toolbar_width))
        sys.stdout.flush()
        sys.stdout.write("\b" * (toolbar_width+1)) 
    array   = list()
    hash4a  = list()
    try:
        for i in range(nelem):
            e = urandom(int(slen))
            array.append(e)
            h = hashlib.md5()
            h.update(e)
            he = h.hexdigest()
            hash4a.append(he)
            i += 1
            if pb and i and i % chunk_size == 0:
                sys.stdout.write("-")
                sys.stdout.flush()
        if pb:
            sys.stdout.write("\n")
    except MemoryError:
        print('Memory Error: discarding existing arrays')
        array  = list()
        hash4a = list()
    finally:
        return array, hash4a

# The "ventilator" function generates an array of nelem fix-sized (of slen
# bytes long) random strings, and sends the array down a zeromq "PUSH"
# connection to be processed by listening workers, in a round robin load
# balanced fashion.

def ventilator():
    # Initialize a zeromq context
    context = zmq.Context()

    # Set up a channel to send work
    ventilator_send = context.socket(zmq.PUSH)
    ventilator_send.bind("tcp://127.0.0.1:5557")

    # Give everything a second to spin up and connect
    time.sleep(1)

    # Create the input array
    nelem = number_of_elements
    slen = string_length
    payloads = create_inputs(nelem, slen)

    # Send an array to each worker
    for num in range(np):
        work_message = { 'num' : payloads }
        ventilator_send.send_pyobj(work_message)

    time.sleep(1)

# The "worker" functions listen on a zeromq PULL connection for "work"
# (array to be processed) from the ventilator, get the length of the array
# and send the results down another zeromq PUSH connection to the results
# manager.

def worker(wrk_num):
    # Initialize a zeromq context
    context = zmq.Context()

    # Set up a channel to receive work from the ventilator
    work_receiver = context.socket(zmq.PULL)
    work_receiver.connect("tcp://127.0.0.1:5557")

    # Set up a channel to send result of work to the results reporter
    results_sender = context.socket(zmq.PUSH)
    results_sender.connect("tcp://127.0.0.1:5558")

    # Set up a channel to receive control messages over
    control_receiver = context.socket(zmq.SUB)
    control_receiver.connect("tcp://127.0.0.1:5559")
    control_receiver.setsockopt(zmq.SUBSCRIBE, "")

    # Set up a poller to multiplex the work receiver and control receiver channels
    poller = zmq.Poller()
    poller.register(work_receiver, zmq.POLLIN)
    poller.register(control_receiver, zmq.POLLIN)

    # Loop and accept messages from both channels, acting accordingly
    while True:
        socks = dict(poller.poll())

        # If the message came from work_receiver channel, get the length
        # of the array and send the answer to the results reporter
        if socks.get(work_receiver) == zmq.POLLIN:
            #work_message = work_receiver.recv_json()
            work_message = work_receiver.recv_pyobj()
            length = len(work_message['num'][0])
            answer_message = { 'worker' : wrk_num, 'result' : length }
            results_sender.send_json(answer_message)

        # If the message came over the control channel, shut down the worker.
        if socks.get(control_receiver) == zmq.POLLIN:
            control_message = control_receiver.recv()
            if control_message == "FINISHED":
                print("Worker %i received FINSHED, quitting!" % wrk_num)
                break

# The "results_manager" function receives each result from multiple workers,
# and prints those results.  When all results have been received, it signals
# the worker processes to shut down.

def result_manager():
    # Initialize a zeromq context
    context = zmq.Context()

    # Set up a channel to receive results
    results_receiver = context.socket(zmq.PULL)
    results_receiver.bind("tcp://127.0.0.1:5558")

    # Set up a channel to send control commands
    control_sender = context.socket(zmq.PUB)
    control_sender.bind("tcp://127.0.0.1:5559")

    for task_nbr in range(np):
        result_message = results_receiver.recv_json()
        print "Worker %i answered: %i" % (result_message['worker'], result_message['result'])

    # Signal to all workers that we are finished
    control_sender.send("FINISHED")
    time.sleep(5)

if __name__ == "__main__":

    # Create a pool of workers to distribute work to
    for wrk_num in range(pool_size):
        Process(target=worker, args=(wrk_num,)).start()

    # Fire up our result manager...
    result_manager = Process(target=result_manager, args=())
    result_manager.start()

    # Start the ventilator!
    ventilator = Process(target=ventilator, args=())
    ventilator.start()
asked Jan 18 '12 by user183394


1 Answer

The problem is that your ventilator (PUSH) socket is closing before it's done sending. You have a 1-second sleep at the end of the ventilator function, which is not enough time to send 384MB messages (128 strings × 3MB ≈ 384MB per send_pyobj call, and one such message goes to each worker). That's why you have the threshold you have: if the sleep were shorter, the threshold would be lower.

That said, LINGER is supposed to prevent this sort of thing, so I would bring this up with zeromq: PUSH does not appear to respect LINGER.
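
For reference, here is a minimal sketch of what an explicit shutdown at the end of the ventilator would look like in principle: LINGER is set explicitly to -1 (infinite, which is also the default), so context.term() should block until all queued messages have been sent. Given the LINGER behaviour just described, this alone may not help in your setup, but it shows the intent:

# Sketch only: explicit teardown that LINGER should, in principle, make safe.
# (As noted above, PUSH did not appear to respect LINGER here.)
ventilator_send.setsockopt(zmq.LINGER, -1)  # -1 = linger indefinitely on close
for num in range(np):
    work_message = { 'num' : payloads }
    ventilator_send.send_pyobj(work_message)
ventilator_send.close()  # close the socket explicitly
context.term()           # should block until pending messages are flushed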

A fix for your particular example (without adding an indeterminately long sleep) would be to have the ventilator wait for the same FINISHED signal that terminates your workers. This way, you guarantee that the ventilator survives as long as it needs to.

Revised ventilator:

def ventilator():
    # Initialize a zeromq context
    context = zmq.Context()

    # Set up a channel to send work
    ventilator_send = context.socket(zmq.PUSH)
    ventilator_send.bind("tcp://127.0.0.1:5557")

    # Set up a channel to receive control messages
    control_receiver = context.socket(zmq.SUB)
    control_receiver.connect("tcp://127.0.0.1:5559")
    control_receiver.setsockopt(zmq.SUBSCRIBE, "")

    # Give everything a second to spin up and connect
    time.sleep(1)

    # Create the input array
    nelem = number_of_elements
    slen = string_length
    payloads = create_inputs(nelem, slen)

    # Send an array to each worker
    for num in range(np):
        work_message = { 'num' : payloads }
        ventilator_send.send_pyobj(work_message)

    # Poll for FINISH message, so we don't shutdown too early
    poller = zmq.Poller()
    poller.register(control_receiver, zmq.POLLIN)

    while True:
        socks = dict(poller.poll())

        if socks.get(control_receiver) == zmq.POLLIN:
            control_message = control_receiver.recv()
            if control_message == "FINISHED":
                print("Ventilator received FINSHED, quitting!")
                break
            # else: unhandled message
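
With this change, the __main__ block stays the same; the ventilator process now lives until the result manager publishes FINISHED, so its PUSH socket is not torn down while the large messages are still being flushed.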
answered Oct 02 '22 by minrk