Python multiprocess with pool workers - memory use optimization

I have a fuzzy string matching script that looks for some 30K needles in a haystack of 4 million company names. While the script works fine, my attempts at speeding things up via parallel processing on an AWS h1.xlarge failed because I ran out of memory.

Rather than trying to get more memory, as explained in response to my previous question, I'd like to find out how to optimize the workflow - I'm fairly new to this, so there should be plenty of room. Btw, I've already experimented with queues (they also worked but ran into the same MemoryError), and I've looked through a bunch of very helpful SO contributions, but I'm not quite there yet.

Here's the part of the code that seems most relevant. I hope it sufficiently clarifies the logic - happy to provide more info as needed:

## imports for this snippet (helpers like isnull, levi, and numProcesses are defined elsewhere)
from collections import defaultdict
from multiprocessing import Pool
import itertools

def getHayStack():
    ## loads a few million company names into id: name dict
    return hayCompanies

def getNeedles(*args):
    ## loads subset of 30K companies into id: name dict (for allocation to workers)
    return needleCompanies

def findNeedle(needle, haystack):
    """ Identify best match and return results with score """
    results = {}
    for hayID, hayCompany in haystack.iteritems():
        if not isnull(haystack[hayID]):
            results[hayID] = levi.setratio(needle.split(' '), 
                                           hayCompany.split(' '))
    scores = list(results.values())
    resultIDs = list(results.keys())
    needleID = resultIDs[scores.index(max(scores))]
    return [needleID, haystack[needleID], max(scores)]

def runMatch(args):
    """ Execute findNeedle and process results for poolWorker batch"""
    batch, first = args
    last = first + batch
    hayCompanies = getHayStack()
    needleCompanies = getNeedles(first, last)
    needles = defaultdict(list)
    current = first
    for needleID, needleCompany in needleCompanies.iteritems():
        current += 1
        needles[needleID] = findNeedle(needleCompany, hayCompanies)
    ## Then store results

if __name__ == '__main__':
    pool = Pool(processes = numProcesses)
    totalTargets = len(getNeedles('all'))
    targetsPerBatch = totalTargets / numProcesses
    pool.map_async(runMatch, 
                   itertools.izip(itertools.repeat(targetsPerBatch),
                                  xrange(0, 
                                         totalTargets,
                                         targetsPerBatch))).get(99999999)
    pool.close()
    pool.join()

So I guess the questions are: How can I avoid loading the haystack for all workers - e.g. by sharing the data or taking a different approach like dividing the much larger haystack across workers rather than the needles? How can I otherwise improve memory usage by avoiding or eliminating clutter?


1 Answer

Your design is a bit confusing. You're using a pool of N workers, and then breaking your M jobs up into N tasks of size M/N. In other words, if you get it all right, you're simulating worker processes on top of a pool that is itself built on top of worker processes. Why bother with that? If you want to manage the processes yourself, just use them directly. Alternatively, use the pool as a pool: send each job as its own task, and use the batching feature (the chunksize argument) to batch them up in some appropriate (and tweakable) way.

That means that runMatch just takes a single needleID and needleCompany, and all it does is call findNeedle and then do whatever that # Then store results part is. And then the main program gets a lot simpler:

if __name__ == '__main__':
    ## Pool isn't a context manager until Python 3.3, so close/join explicitly
    pool = Pool(processes=numProcesses)
    needleCompanies = getNeedles('all')  ## load all 30K needles once in the parent
    results = pool.map_async(runMatch, needleCompanies.iteritems(),
                             chunksize=NUMBER_TWEAKED_IN_TESTING).get()
    pool.close()
    pool.join()

Or, if the results are small, instead of having all of the processes (presumably) fighting over some shared result-storing thing, just return them. Then you don't need runMatch at all, just:

if __name__ == '__main__':
    pool = Pool(processes=numProcesses)
    needleCompanies = getNeedles('all')
    for result in pool.imap_unordered(findNeedle, needleCompanies.iteritems(),
                                      chunksize=NUMBER_TWEAKED_IN_TESTING):
        pass  ## store result here
    pool.close()
    pool.join()

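One wrinkle worth spelling out: findNeedle as defined above takes (needle, haystack), while imap_unordered hands it a single (needleID, needleCompany) pair. A minimal adapter, assuming the initializer-based _haystack global described further down (the name findNeedleTask is just for this sketch), could look like:

def findNeedleTask(item):
    """ Unpack one (needleID, needleCompany) pair and match it against
        the per-process haystack loaded by the pool initializer """
    needleID, needleCompany = item
    return [needleID] + findNeedle(needleCompany, _haystack)

You would then pass findNeedleTask to imap_unordered in place of findNeedle.
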
Or, alternatively, if you do want to do exactly N batches, just create a Process for each one:

if __name__ == '__main__':
    totalTargets = len(getNeedles('all'))
    ## assumes totalTargets divides evenly by numProcesses
    targetsPerBatch = totalTargets / numProcesses
    processes = [Process(target=runMatch,
                         args=((targetsPerBatch, first),))
                 for first in xrange(0, totalTargets, targetsPerBatch)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

Also, you seem to be calling getHayStack() once for each task (and getNeedles as well). I'm not sure how likely it is that multiple copies of it end up live at the same time, but considering that it's by far the largest data structure you have, that would be the first thing I'd try to rule out. In fact, even if it's not a memory-usage problem, repeatedly calling getHayStack could easily be a big performance hit, unless you're already doing some kind of caching (e.g., explicitly storing it in a global or in a mutable default parameter value the first time, and then just reusing it), so it may be worth fixing anyway.
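
For illustration, one simple way to do that caching (the name getHayStackCached and the module-level global are just for this sketch) is a memoizing wrapper, so repeated calls within the same process reuse the already-built dict:

_hayCache = None

def getHayStackCached():
    """ Build the haystack dict on first call, then reuse it """
    global _hayCache
    if _hayCache is None:
        _hayCache = getHayStack()
    return _hayCache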

One way to fix both potential problems at once is to use an initializer in the Pool constructor:

def initPool():
    ## runs once in each worker process when the pool starts up
    global _haystack
    _haystack = getHayStack()

def runMatch(args):
    global _haystack
    # ...
    hayCompanies = _haystack
    # ...

if __name__ == '__main__':
    pool = Pool(processes=numProcesses, initializer=initPool)
    # ...

Next, I notice that you're explicitly generating lists in multiple places where you don't actually need them. For example:

scores = list(results.values())
resultIDs = list(results.keys())
needleID = resultIDs[scores.index(max(scores))]
return [needleID, haystack[needleID], max(scores)]

If there's more than a handful of results, this is wasteful; just use the results.values() iterable directly. (In fact, it looks like you're using Python 2.x, in which case keys and values are already lists, so you're just making an extra copy for no good reason.)

But in this case, you can simplify the whole thing even farther. You're just looking for the key (resultID) and value (score) with the highest score, right? So:

import operator  ## needed for itemgetter

needleID, score = max(results.items(), key=operator.itemgetter(1))
return [needleID, haystack[needleID], score]

This also eliminates all the repeated searches over score, which should save some CPU.


This may not directly solve the memory problem, but it should hopefully make it easier to debug and/or tweak.

The first thing to try is just to use much smaller batches—instead of input_size/cpu_count, try 1. Does memory usage go down? If not, we've ruled that part out.
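
As a concrete sketch of that experiment (assuming the pool, runMatch, and needleCompanies from the snippets above), it just means dropping the chunk size to 1 and watching memory while it runs:

for result in pool.imap_unordered(runMatch, needleCompanies.iteritems(),
                                  chunksize=1):
    pass  ## ignore the results; we only care about memory behaviour here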

Next, get a rough idea of how big the haystack is in memory. sys.getsizeof(_haystack) only counts the dict structure itself, not the strings inside it, so sum up getsizeof over the keys and values as well. If it comes to, say, 1.6GB, then you're cutting things pretty fine trying to squeeze everything else into 0.4GB, so that's the way to attack it; for example, use a shelve database instead of a plain dict.

Also try dumping memory usage (with the resource module, getrusage(RUSAGE_SELF)) at the start and end of the initializer function. If the final haystack is only, say, 0.3GB, but you allocate another 1.3GB building it up, that's the problem to attack. For example, you might spin off a single child process to build and pickle the dict, then have the pool initializer just open it and unpickle it. Or combine the two—build a shelve db in the first child, and open it read-only in the initializer. Either way, this would also mean you're only doing the CSV-parsing/dict-building work once instead of 8 times.
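
A rough sketch of that build-once, open-read-only pattern (the file name and helper names here are placeholders; note that shelve keys must be strings, so haystack lookups would need string IDs too):

import shelve

def buildHayShelf(path='haystack.shelf'):
    """ Run once, e.g. in a single child process or as a separate step:
        parse the source data and persist it so workers never rebuild it """
    db = shelve.open(path, flag='n')
    try:
        for hayID, hayCompany in getHayStack().iteritems():
            db[str(hayID)] = hayCompany  ## shelve keys must be strings
    finally:
        db.close()

def initPool(path='haystack.shelf'):
    """ Pool initializer: open the prebuilt shelf read-only in each worker
        instead of building a big in-memory dict per process """
    global _haystack
    _haystack = shelve.open(path, flag='r')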

On the other hand, if your total VM usage is still low at the time the first task runs, the problem is with the tasks themselves. (Note that getrusage doesn't directly have any way to see your total VM size; ru_maxrss is often a useful approximation, especially if ru_nswap is 0.)
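
For reference, dumping that number takes only a couple of lines with the resource module (reportMemory is just a name for this sketch; on Linux, ru_maxrss is reported in kilobytes):

import resource

def reportMemory(label):
    """ Print the peak resident set size so far for this process """
    usage = resource.getrusage(resource.RUSAGE_SELF)
    print '%s: peak RSS %.1f MB' % (label, usage.ru_maxrss / 1024.0)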

First, getsizeof the arguments to the task function and the value you return. If they're large, especially if they either keep getting larger with each task or are wildly variable, it could just be that pickling and unpickling that data takes too much memory, and eventually 8 of them together are big enough to hit the limit.

Otherwise, the problem is most likely in the task function itself. Either you've got a memory leak (you can only have a real leak by using a buggy C extension module or ctypes, but if you keep any references around between calls, e.g., in a global, you could just be holding onto things forever unnecessarily), or some of the tasks themselves take too much memory. Either way, this should be something you can test more easily by pulling out the multiprocessing and just running the tasks directly, which is a lot easier to debug.
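
A minimal way to do that (assuming the initializer-style _haystack global from above) is to drop to a plain serial loop and watch memory with top or the resource snippet above:

initPool()
for needleID, needleCompany in getNeedles('all').iteritems():
    result = findNeedle(needleCompany, _haystack)
    ## store or just inspect result here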
