 

System error while running subprocesses using Multiprocessing

I am getting a system error (shown below) while performing some simple numpy-based matrix algebra calculations in parallel using the multiprocessing package (Python 2.7.3 with NumPy 1.7.0 on Ubuntu 12.04 on Amazon EC2). My code works fine for smaller matrix sizes but crashes for larger ones, even with plenty of available memory.

The matrices I use are substantial: the code runs fine for 1000000x10 dense float matrices but crashes for 1000000x500 ones (I am passing these matrices to/from the subprocesses, by the way). 10 vs 500 is a run-time parameter; everything else stays the same (input data, other run-time parameters, etc.).

I've also tried running the same (ported) code with Python 3 - for larger matrices the subprocesses go idle (instead of crashing as in Python 2.7) and the program/subprocesses just hang there doing nothing. For smaller matrices the code runs fine with Python 3.

Any suggestions would be highly appreciated (I am running out of ideas here).

Error message:

Exception in thread Thread-5:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks
    put(task)
SystemError: NULL result without error in PyObject_Call

The multiprocessing code I use:

import multiprocessing as mp

def runProcessesInParallelAndReturn(proc, listOfInputs, nParallelProcesses):
    if len(listOfInputs) == 0:
        return
    # Add the shared result queue to each argument tuple.
    resultQueue = mp.Manager().Queue()
    listOfInputsNew = [(argumentTuple, resultQueue) for argumentTuple in listOfInputs]
    # Create the pool of workers and run the tasks (map blocks until done).
    pool = mp.Pool(processes=nParallelProcesses)
    pool.map(proc, listOfInputsNew)
    pool.close()
    pool.join()
    # Collect one result per input from the queue.
    return [resultQueue.get() for i in range(len(listOfInputs))]
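For context, here is a minimal, self-contained way to exercise this helper with a toy worker (`squareWorker` is a hypothetical stand-in for `proc`, just to illustrate the calling convention):

```python
import multiprocessing as mp

def squareWorker(param):
    # Each worker receives (argumentTuple, resultQueue), mirroring solveForLFV below.
    (chunkI, x), queue = param
    queue.put((chunkI, x * x))

def runProcessesInParallelAndReturn(proc, listOfInputs, nParallelProcesses):
    if len(listOfInputs) == 0:
        return
    resultQueue = mp.Manager().Queue()
    listOfInputsNew = [(argumentTuple, resultQueue) for argumentTuple in listOfInputs]
    pool = mp.Pool(processes=nParallelProcesses)
    pool.map(proc, listOfInputsNew)
    pool.close()
    pool.join()
    return [resultQueue.get() for i in range(len(listOfInputs))]

if __name__ == '__main__':
    results = runProcessesInParallelAndReturn(squareWorker, [(0, 2), (1, 3)], 2)
    print(sorted(results))  # [(0, 4), (1, 9)]
```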

Below is the "proc" that gets executed in each subprocess. Basically, it solves many systems of linear equations using numpy (constructing the required matrices inside the subprocess) and returns the results as another matrix. Once again, it works fine for smaller values of one run-time parameter but crashes (or hangs in Python 3) for larger ones.

import sys
import time

import numpy as np

def solveForLFV(param):
    startTime = time.time()
    # Unpack the argument tuple and the shared result queue.
    (chunkI, LFVin, XY, sumLFVinOuterProductLFVallPlusPenaltyTerm, indexByIndexPurch, outerProductChunkSize, confWeight), queue = param
    LFoutChunkSize = XY.shape[0]
    nLFdim = LFVin.shape[1]
    sumLFVinOuterProductLFVpurch = np.zeros((nLFdim, nLFdim))
    LFVoutChunk = np.zeros((LFoutChunkSize, nLFdim))
    for LFVoutIndex in xrange(LFoutChunkSize):
        LFVInIndexListPurch = indexByIndexPurch[LFVoutIndex]
        sumLFVinOuterProductLFVpurch[:, :] = 0.
        # getChunkBoundaries (a helper defined elsewhere) splits the index list into chunks.
        LFVInIndexChunkLow, LFVInIndexChunkHigh = getChunkBoundaries(len(LFVInIndexListPurch), outerProductChunkSize)
        for LFVInIndexChunkI in xrange(len(LFVInIndexChunkLow)):
            LFVinSlice = LFVin[LFVInIndexListPurch[LFVInIndexChunkLow[LFVInIndexChunkI] : LFVInIndexChunkHigh[LFVInIndexChunkI]], :]
            # Accumulate the sum of outer products of the rows in this slice.
            sumLFVinOuterProductLFVpurch += sum(LFVinSlice[:, :, np.newaxis] * LFVinSlice[:, np.newaxis, :])
        LFVoutChunk[LFVoutIndex, :] = np.linalg.solve(confWeight * sumLFVinOuterProductLFVpurch + sumLFVinOuterProductLFVallPlusPenaltyTerm, XY[LFVoutIndex, :])
    queue.put((chunkI, LFVoutChunk))
    print 'solveForLFV: ', time.time() - startTime, 'sec'
    sys.stdout.flush()
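As an aside, the inner sum of outer products in solveForLFV can be written as a single matrix product, which avoids allocating the intermediate 3-D array. A small sketch with made-up data (not part of the original code):

```python
import numpy as np

rng = np.random.RandomState(0)
LFVinSlice = rng.rand(50, 8)

# Broadcasting form used above: builds a temporary (50, 8, 8) array.
viaBroadcast = sum(LFVinSlice[:, :, np.newaxis] * LFVinSlice[:, np.newaxis, :])

# Equivalent single BLAS call: the sum of row outer products is A^T A.
viaDot = LFVinSlice.T.dot(LFVinSlice)

print(np.allclose(viaBroadcast, viaDot))  # True
```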
asked Feb 27 '13 by Yevgeny
1 Answer

1000000x500 is 500,000,000 elements - pretty big: with float64 that's 4 billion bytes, or about 4 GB. (The 10,000,000-element array would be 80 million bytes, or about 80 MB - much smaller.) I expect the problem is multiprocessing pickling the arrays to send them to the subprocesses over a pipe.
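You can measure the serialized payload directly; a small sketch with scaled-down shapes (1000 rows instead of 1000000, to keep it fast):

```python
import pickle

import numpy as np

# Scaled-down stand-ins for the 1000000x10 and 1000000x500 matrices.
small = np.zeros((1000, 10))
large = np.zeros((1000, 500))

# multiprocessing pickles every task before writing it to the worker pipe,
# so the payload grows linearly with the number of elements.
print(len(pickle.dumps(small, protocol=2)))  # a bit over 80,000 bytes
print(len(pickle.dumps(large, protocol=2)))  # a bit over 4,000,000 bytes
```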

Since you're on a Unix platform, you can avoid this behavior by exploiting the memory-inheritance behavior of fork() (which multiprocessing uses to create its workers). I've had great success with this hack (ripped out of this project), described in its docstring.

import itertools
import os
import random
import string

### A helper for letting the forked processes use data without pickling.
_data_name_cands = (
    '_data_' + ''.join(random.sample(string.ascii_lowercase, 10))
    for _ in itertools.count())
class ForkedData(object):
    '''
    Class used to pass data to child processes in multiprocessing without
    really pickling/unpickling it. Only works on POSIX.

    Intended use:
        - The master process makes the data somehow, and does e.g.
            data = ForkedData(the_value)
        - The master makes sure to keep a reference to the ForkedData object
          until the children are all done with it, since the global reference
          is deleted to avoid memory leaks when the ForkedData object dies.
        - Master process constructs a multiprocessing.Pool *after*
          the ForkedData construction, so that the forked processes
          inherit the new global.
        - Master calls e.g. pool.map with data as an argument.
        - Child gets the real value through data.value, and uses it read-only.
    '''
    # TODO: does data really need to be used read-only? don't think so...
    # TODO: more flexible garbage collection options
    def __init__(self, val):
        g = globals()
        self.name = next(n for n in _data_name_cands if n not in g)
        g[self.name] = val
        self.master_pid = os.getpid()

    @property
    def value(self):
        return globals()[self.name]

    def __del__(self):
        if os.getpid() == self.master_pid:
            del globals()[self.name]
answered Nov 04 '22 by Danica