I have a fuzzy string matching script that looks for some 30K needles in a haystack of 4 million company names. While the script works fine, my attempts at speeding up things via parallel processing on an AWS h1.xlarge failed as I'm running out of memory.
Rather than trying to get more memory as explained in response to my previous question, I'd like to find out how to optimize the workflow - I'm fairly new to this so there should be plenty of room. Btw, I've already experimented with queues (also worked but ran into the same MemoryError
, plus looked through a bunch of very helpful SO contributions, but not quite there yet.
Here's what seems most relevant of the code. I hope it sufficiently clarifies the logic - happy to provide more info as needed:
def getHayStack():
## loads a few million company names into id: name dict
return hayCompanies
def getNeedles(*args):
## loads subset of 30K companies into id: name dict (for allocation to workers)
return needleCompanies
def findNeedle(needle, haystack):
""" Identify best match and return results with score """
results = {}
for hayID, hayCompany in haystack.iteritems():
if not isnull(haystack[hayID]):
results[hayID] = levi.setratio(needle.split(' '),
hayCompany.split(' '))
scores = list(results.values())
resultIDs = list(results.keys())
needleID = resultIDs[scores.index(max(scores))]
return [needleID, haystack[needleID], max(scores)]
def runMatch(args):
""" Execute findNeedle and process results for poolWorker batch"""
batch, first = args
last = first + batch
hayCompanies = getHayStack()
needleCompanies = getTargets(first, last)
needles = defaultdict(list)
current = first
for needleID, needleCompany in needleCompanies.iteritems():
current += 1
needles[targetID] = findNeedle(needleCompany, hayCompanies)
## Then store results
if __name__ == '__main__':
pool = Pool(processes = numProcesses)
totalTargets = len(getTargets('all'))
targetsPerBatch = totalTargets / numProcesses
pool.map_async(runMatch,
itertools.izip(itertools.repeat(targetsPerBatch),
xrange(0,
totalTargets,
targetsPerBatch))).get(99999999)
pool.close()
pool.join()
So I guess the questions are: How can I avoid loading the haystack for all workers - e.g. by sharing the data or taking a different approach like dividing the much larger haystack across workers rather than the needles? How can I otherwise improve memory usage by avoiding or eliminating clutter?
Your design is a bit confusing. You're using a pool of N workers, and then breaking your M jobs work up into N tasks of size M/N. In other words, if you get that all correct, you're simulating worker processes on top of a pool built on top of worker processes. Why bother with that? If you want to use processes, just use them directly. Alternatively, use a pool as a pool, sends each job as its own task, and use the batching feature to batch them up in some appropriate (and tweakable) way.
That means that runMatch
just takes a single needleID and needleCompany, and all it does is call findNeedle
and then do whatever that # Then store results
part is. And then the main program gets a lot simpler:
if __name__ == '__main__':
with Pool(processes=numProcesses) as pool:
results = pool.map_async(runMatch, needleCompanies.iteritems(),
chunkSize=NUMBER_TWEAKED_IN_TESTING).get()
Or, if the results are small, instead of having all of the processes (presumably) fighting over some shared resulting-storing thing, just return them. Then you don't need runMatch
at all, just:
if __name__ == '__main__':
with Pool(processes=numProcesses) as pool:
for result in pool.imap_unordered(findNeedle, needleCompanies.iteritems(),
chunkSize=NUMBER_TWEAKED_IN_TESTING):
# Store result
Or, alternatively, if you do want to do exactly N batches, just create a Process for each one:
if __name__ == '__main__':
totalTargets = len(getTargets('all'))
targetsPerBatch = totalTargets / numProcesses
processes = [Process(target=runMatch,
args=(targetsPerBatch,
xrange(0,
totalTargets,
targetsPerBatch)))
for _ in range(numProcesses)]
for p in processes:
p.start()
for p in processes:
p.join()
Also, you seem to be calling getHayStack()
once for each task (and getNeedles
as well). I'm not sure how easy it would be to end up with multiple copies of this live at the same time, but considering that it's the largest data structure you have by far, that would be the first thing I try to rule out. In fact, even if it's not a memory-usage problem, getHayStack
could easily be a big performance hit, unless you're already doing some kind of caching (e.g., explicitly storing it in a global or a mutable default parameter value the first time, and then just using it), so it may be worth fixing anyway.
One way to fix both potential problems at once is to use an initializer in the Pool
constructor:
def initPool():
global _haystack
_haystack = getHayStack()
def runMatch(args):
global _haystack
# ...
hayCompanies = _haystack
# ...
if __name__ == '__main__':
pool = Pool(processes=numProcesses, initializer=initPool)
# ...
Next, I notice that you're explicitly generating lists in multiple places where you don't actually need them. For example:
scores = list(results.values())
resultIDs = list(results.keys())
needleID = resultIDs[scores.index(max(scores))]
return [needleID, haystack[needleID], max(scores)]
If there's more than a handful of results, this is wasteful; just use the results.values()
iterable directly. (In fact, it looks like you're using Python 2.x, in which case keys
and values
are already lists, so you're just making an extra copy for no good reason.)
But in this case, you can simplify the whole thing even farther. You're just looking for the key (resultID) and value (score) with the highest score, right? So:
needleID, score = max(results.items(), key=operator.itemgetter(1))
return [needleID, haystack[needleID], score]
This also eliminates all the repeated searches over score
, which should save some CPU.
This may not directly solve the memory problem, but it should hopefully make it easier to debug and/or tweak.
The first thing to try is just to use much smaller batches—instead of input_size/cpu_count, try 1. Does memory usage go down? If not, we've ruled that part out.
Next, try sys.getsizeof(_haystack)
and see what it says. If it's, say, 1.6GB, then you're cutting things pretty fine trying to squeeze everything else into 0.4GB, so that's the way to attack it—e.g., use a shelve
database instead of a plain dict
.
Also try dumping memory usage (with the resource
module, getrusage(RUSAGE_SELF)
) at the start and end of the initializer function. If the final haystack is only, say, 0.3GB, but you allocate another 1.3GB building it up, that's the problem to attack. For example, you might spin off a single child process to build and pickle the dict, then have the pool initializer just open it and unpickle it. Or combine the two—build a shelve
db in the first child, and open it read-only in the initializer. Either way, this would also mean you're only doing the CSV-parsing/dict-building work once instead of 8 times.
On the other hand, if your total VM usage is still low (note that getrusage
doesn't directly have any way to see your total VM size—ru_maxrss
is often a useful approximation, especially if ru_nswap
is 0) at time the first task runs, the problem is with the tasks themselves.
First, getsizeof
the arguments to the task function and the value you return. If they're large, especially if they either keep getting larger with each task or are wildly variable, it could just be pickling and unpickling that data takes too much memory, and eventually 8 of them are together big enough to hit the limit.
Otherwise, the problem is most likely in the task function itself. Either you've got a memory leak (you can only have a real leak by using a buggy C extension module or ctypes
, but if you keep any references around between calls, e.g., in a global, you could just be holding onto things forever unnecessarily), or some of the tasks themselves take too much memory. Either way, this should be something you can test more easily by pulling out the multiprocessing and just running the tasks directly, which is a lot easier to debug.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With