 

How to utilize all cores with python multiprocessing

I have been fiddling with Python's multiprocessing functionality for upwards of an hour now, trying to parallelize a rather complex graph traversal function using multiprocessing.Process and multiprocessing.Manager:

import networkx as nx
import csv
import time
from operator import itemgetter
import os
import multiprocessing as mp

cutoff = 1
exclusionlist = ["cpd:C00024"]

DG = nx.read_gml("KeggComplete.gml", relabel=True)
for exclusion in exclusionlist:
    DG.remove_node(exclusion)

# checks if 'memorizedPaths' exists, and if not, creates it
fn = os.path.join(os.path.dirname(__file__),
                  'memorizedPaths' + str(cutoff + 1))
if not os.path.exists(fn):
    os.makedirs(fn)

manager = mp.Manager()
memorizedPaths = manager.dict()
filepaths = manager.dict()
degreelist = sorted(DG.degree_iter(),
                    key=itemgetter(1),
                    reverse=True)

def _all_simple_paths_graph(item, DG, cutoff, memorizedPaths, filepaths):
    source = item[0]
    uniqueTreePaths = []

    if cutoff < 1:
        return

    visited = [source]
    stack = [iter(DG[source])]

    while stack:
        children = stack[-1]
        child = next(children, None)

        if child is None:
            stack.pop()
            visited.pop()
        elif child in memorizedPaths:
            for path in memorizedPaths[child]:
                newPath = (tuple(visited) + tuple(path))
                if ((len(newPath) <= cutoff) and
                        (len(set(visited) & set(path)) == 0)):
                    uniqueTreePaths.append(newPath)
            continue
        elif len(visited) < cutoff:
            if child not in visited:
                visited.append(child)
                stack.append(iter(DG[child]))

                if visited not in uniqueTreePaths:
                    uniqueTreePaths.append(tuple(visited))
        else:  # len(visited) == cutoff
            if ((visited not in uniqueTreePaths) and
                    (child not in visited)):
                uniqueTreePaths.append(tuple(visited + [child]))
            stack.pop()
            visited.pop()

    # writes the absolute path of the node path file into the hash table
    filepaths[source] = str(fn) + "/" + str(source) + "path.txt"
    with open(filepaths[source], "wb") as csvfile2:
        writer = csv.writer(csvfile2, delimiter=" ", quotechar="|")
        for path in uniqueTreePaths:
            writer.writerow(path)

    memorizedPaths[source] = uniqueTreePaths

############################################################################

if __name__ == '__main__':
    start = time.clock()

    for item in degreelist:
        test = mp.Process(target=_all_simple_paths_graph,
                          args=(item, DG, cutoff, memorizedPaths, filepaths))
        test.start()
        test.join()

    end = time.clock()
    print(end - start)

Currently - through luck and magic - it works (sort of). My problem is that I'm only using 12 of my 24 cores.

Can someone explain why this might be the case? Is my code simply a poor multiprocessing solution, or is it down to my hardware: an Intel Xeon CPU E5-2640 @ 2.50GHz x18, running Ubuntu 13.04 x64?

EDIT:

I managed to get:

p = mp.Pool()
for item in degreelist:
    p.apply_async(_all_simple_paths_graph,
                  args=(item, DG, cutoff, memorizedPaths, filepaths))
p.close()
p.join()

It works; however, it's VERY SLOW! So I assume I'm using the wrong function for the job. Hopefully this helps clarify exactly what I'm trying to accomplish!

EDIT2: .map attempt:

from functools import partial

partialfunc = partial(_all_simple_paths_graph,
                      DG=DG,
                      cutoff=cutoff,
                      memorizedPaths=memorizedPaths,
                      filepaths=filepaths)
p = mp.Pool()
for item in processList:
    processVar = p.map(partialfunc, xrange(len(processList)))
p.close()
p.join()

It works, but it's slower than a single core. Time to optimize!

asked Sep 30 '13 by Darkstarone



1 Answer

Too much is piling up here to address in comments, so, where mp is multiprocessing:

mp.cpu_count() should return the number of processors. But test it. Some platforms are funky, and this info isn't always easy to get. Python does the best it can.
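A quick way to test it (just a one-line check, nothing specific to your code):

import multiprocessing as mp

if __name__ == "__main__":
    # compare what Python reports against the 24 cores you expect to see
    print(mp.cpu_count())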

If you start 24 processes, they'll do exactly what you tell them to do ;-) Looks like mp.Pool() would be most convenient for you. You pass the number of processes you want to create to its constructor. mp.Pool(processes=None) will use mp.cpu_count() for the number of processors.

Then you can use, for example, .imap_unordered(...) on your Pool instance to spread your degreelist across processes. Or maybe some other Pool method would work better for you - experiment.

If you can't bash the problem into Pool's view of the world, you could instead create an mp.Queue to create a work queue, .put()'ing nodes (or slices of nodes, to reduce overhead) to work on in the main program, and write the workers to .get() work items off that queue. Ask if you need examples. Note that you need to put sentinel values (one per process) on the queue, after all the "real" work items, so that worker processes can test for the sentinel to know when they're done.
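Here's a minimal sketch of that queue-plus-sentinel pattern (the per-node work and the node list are placeholders, not your real traversal code):

import multiprocessing as mp

SENTINEL = None  # marker telling a worker there is no more work

def worker(q):
    while True:
        node = q.get()
        if node is SENTINEL:
            break  # one sentinel per worker, so each process stops exactly once
        pass  # do the real per-node work here

if __name__ == "__main__":
    nprocs = mp.cpu_count()
    q = mp.Queue()
    procs = [mp.Process(target=worker, args=(q,)) for _ in range(nprocs)]
    for p in procs:
        p.start()
    for node in range(1000):   # stand-in for your real node list
        q.put(node)
    for _ in procs:
        q.put(SENTINEL)        # one sentinel per worker, after all real items
    for p in procs:
        p.join()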

FYI, I like queues because they're more explicit. Many others like Pools better because they're more magical ;-)

Pool Example

Here's an executable prototype for you. This shows one way to use imap_unordered with Pool and chunksize that doesn't require changing any function signatures. Of course you'll have to plug in your real code ;-) Note that the init_worker approach allows passing "most of" the arguments only once per worker process, not once for every item in your degreelist. Cutting the amount of inter-process communication can be crucial for speed.

import multiprocessing as mp

def init_worker(mps, fps, cut):
    global memorizedPaths, filepaths, cutoff
    global DG

    print "process initializing", mp.current_process()
    memorizedPaths, filepaths, cutoff = mps, fps, cut
    DG = 1  ## nx.read_gml("KeggComplete.gml", relabel=True)

def work(item):
    _all_simple_paths_graph(DG, cutoff, item, memorizedPaths, filepaths)

def _all_simple_paths_graph(DG, cutoff, item, memorizedPaths, filepaths):
    pass  # print "doing " + str(item)

if __name__ == "__main__":
    m = mp.Manager()
    memorizedPaths = m.dict()
    filepaths = m.dict()
    cutoff = 1  ##
    # use all available CPUs
    p = mp.Pool(initializer=init_worker, initargs=(memorizedPaths,
                                                   filepaths,
                                                   cutoff))
    degreelist = range(100000)  ##
    for _ in p.imap_unordered(work, degreelist, chunksize=500):
        pass
    p.close()
    p.join()

I strongly advise running this exactly as-is, so you can see that it's blazing fast. Then add things to it a bit at a time, to see how that affects the time. For example, just adding

   memorizedPaths[item] = item 

to _all_simple_paths_graph() slows it down enormously. Why? Because the dict gets bigger and bigger with each addition, and this process-safe dict has to be synchronized (under the covers) among all the processes. The unit of synchronization is "the entire dict" - there's no internal structure the mp machinery can exploit to do incremental updates to the shared dict.

If you can't afford this expense, then you can't use a Manager.dict() for this. Opportunities for cleverness abound ;-)
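For example, a rough sketch of one such workaround (an assumption on my part about one possible approach, not necessarily what's meant here): have each worker return its paths, and let the main process fill an ordinary dict:

import multiprocessing as mp

def find_paths(item):
    # hypothetical stand-in: compute and *return* this node's paths
    # instead of writing them into a shared Manager dict
    return item, [(item,)]

if __name__ == "__main__":
    memorizedPaths = {}  # ordinary dict, touched only by the main process
    p = mp.Pool()
    for node, paths in p.imap_unordered(find_paths, range(1000), chunksize=50):
        memorizedPaths[node] = paths  # no cross-process synchronization here
    p.close()
    p.join()

The trade-off is that workers no longer see each other's memoized results while they run, so whether this wins depends on how much the shared memoization was actually saving.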

answered Oct 05 '22 by Tim Peters