Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Why am I not seeing speed up via multiprocessing in Python?

I am trying to parallelize an embarrassingly parallel for loop (previously asked here) and settled on this implementation that fit my parameters:

    with Manager() as proxy_manager:
        shared_inputs = proxy_manager.list([datasets, train_size_common, feat_sel_size, train_perc,
                                            total_test_samples, num_classes, num_features, label_set,
                                            method_names, pos_class_index, out_results_dir, exhaustive_search])
        partial_func_holdout = partial(holdout_trial_compare_datasets, *shared_inputs)

        with Pool(processes=num_procs) as pool:
            cv_results = pool.map(partial_func_holdout, range(num_repetitions))

The reason I need to use a proxy object (shared between processes) is the first element in the shared proxy list datasets that is a list of large objects (each about 200-300MB). This datasets list usually has 5-25 elements. I typically need to run this program on a HPC cluster.

Here is the question, when I run this program with 32 processes and 50GB of memory (num_repetitions=200, with datasets being a list of 10 objects, each 250MB), I do not see a speedup even by factor of 16 (with 32 parallel processes). I do not understand why - any clues? Any obvious mistakes, or bad choices? Where can I improve this implementation? Any alternatives?

I am sure this has been discussed before, and the reasons can be varied and very specific to implementation - hence I request you to provide me your 2 cents. Thanks.

Update: I did some profiling with cProfile to get a better idea - here is some info, sorted by cumulative time.

In [19]: p.sort_stats('cumulative').print_stats(50)
Mon Oct 16 16:43:59 2017    profiling_log.txt

         555404 function calls (543552 primitive calls) in 662.201 seconds

   Ordered by: cumulative time
   List reduced from 4510 to 50 due to restriction <50>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
    897/1    0.044    0.000  662.202  662.202 {built-in method builtins.exec}
        1    0.000    0.000  662.202  662.202 test_rhst.py:2(<module>)
        1    0.001    0.001  661.341  661.341 test_rhst.py:70(test_chance_classifier_binary)
        1    0.000    0.000  661.336  661.336 /Users/Reddy/dev/neuropredict/neuropredict/rhst.py:677(run)
        4    0.000    0.000  661.233  165.308 /Users/Reddy/anaconda/envs/py36/lib/python3.6/threading.py:533(wait)
        4    0.000    0.000  661.233  165.308 /Users/Reddy/anaconda/envs/py36/lib/python3.6/threading.py:263(wait)
       23  661.233   28.749  661.233   28.749 {method 'acquire' of '_thread.lock' objects}
        1    0.000    0.000  661.233  661.233 /Users/Reddy/anaconda/envs/py36/lib/python3.6/multiprocessing/pool.py:261(map)
        1    0.000    0.000  661.233  661.233 /Users/Reddy/anaconda/envs/py36/lib/python3.6/multiprocessing/pool.py:637(get)
        1    0.000    0.000  661.233  661.233 /Users/Reddy/anaconda/envs/py36/lib/python3.6/multiprocessing/pool.py:634(wait)
    866/8    0.004    0.000    0.868    0.108 <frozen importlib._bootstrap>:958(_find_and_load)
    866/8    0.003    0.000    0.867    0.108 <frozen importlib._bootstrap>:931(_find_and_load_unlocked)
    720/8    0.003    0.000    0.865    0.108 <frozen importlib._bootstrap>:641(_load_unlocked)
    596/8    0.002    0.000    0.865    0.108 <frozen importlib._bootstrap_external>:672(exec_module)
   1017/8    0.001    0.000    0.863    0.108 <frozen importlib._bootstrap>:197(_call_with_frames_removed)
   522/51    0.001    0.000    0.765    0.015 {built-in method builtins.__import__}

The profiling info now sorted by time

In [20]: p.sort_stats('time').print_stats(20)
Mon Oct 16 16:43:59 2017    profiling_log.txt

         555404 function calls (543552 primitive calls) in 662.201 seconds

   Ordered by: internal time
   List reduced from 4510 to 20 due to restriction <20>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
       23  661.233   28.749  661.233   28.749 {method 'acquire' of '_thread.lock' objects}
   115/80    0.177    0.002    0.211    0.003 {built-in method _imp.create_dynamic}
      595    0.072    0.000    0.072    0.000 {built-in method marshal.loads}
        1    0.045    0.045    0.045    0.045 {method 'acquire' of '_multiprocessing.SemLock' objects}
    897/1    0.044    0.000  662.202  662.202 {built-in method builtins.exec}
        3    0.042    0.014    0.042    0.014 {method 'read' of '_io.BufferedReader' objects}
2037/1974    0.037    0.000    0.082    0.000 {built-in method builtins.__build_class__}
      286    0.022    0.000    0.061    0.000 /Users/Reddy/anaconda/envs/py36/lib/python3.6/site-packages/scipy/misc/doccer.py:12(docformat)
     2886    0.021    0.000    0.021    0.000 {built-in method posix.stat}
       79    0.016    0.000    0.016    0.000 {built-in method posix.read}
      597    0.013    0.000    0.021    0.000 <frozen importlib._bootstrap_external>:830(get_data)
      276    0.011    0.000    0.013    0.000 /Users/Reddy/anaconda/envs/py36/lib/python3.6/sre_compile.py:250(_optimize_charset)
      108    0.011    0.000    0.038    0.000 /Users/Reddy/anaconda/envs/py36/lib/python3.6/site-packages/scipy/stats/_distn_infrastructure.py:626(_construct_argparser)
     1225    0.011    0.000    0.050    0.000 <frozen importlib._bootstrap_external>:1233(find_spec)
     7179    0.009    0.000    0.009    0.000 {method 'splitlines' of 'str' objects}
       33    0.008    0.000    0.008    0.000 {built-in method posix.waitpid}
      283    0.008    0.000    0.015    0.000 /Users/Reddy/anaconda/envs/py36/lib/python3.6/site-packages/scipy/misc/doccer.py:128(indentcount_lines)
        3    0.008    0.003    0.008    0.003 {method 'poll' of 'select.poll' objects}
     7178    0.008    0.000    0.008    0.000 {method 'expandtabs' of 'str' objects}
      597    0.007    0.000    0.007    0.000 {method 'read' of '_io.FileIO' objects}

More profiling info sorted by percall info: profiling info sorted by percall

Update 2

The elements in the large list datasets I mentioned earlier are not usually as big - they are typically 10-25MB each. But depending on the floating point precision used, number of samples and features, this can easily grow to 500MB-1GB per element also. hence I'd prefer a solution that can scale.

Update 3:

The code inside holdout_trial_compare_datasets uses method GridSearchCV of scikit-learn, which internally uses joblib library if we set n_jobs > 1 (or whenever we even set it). This might lead to some bad interactions between multiprocessing and joblib. So trying another config where I do not set n_jobs at all (which should to default no parallelism within scikit-learn). Will keep you posted.

like image 727
Pradeep Reddy Raamana Avatar asked Oct 16 '17 03:10

Pradeep Reddy Raamana


1 Answers

Based on discussion in the comments, I did a mini experiment, compared three versions of implementation:

  • v1: basically as same as your approach, in fact, as partial(f1, *shared_inputs) will unpack proxy_manager.list immediately, Manager.List not involved here, data passed to worker with the internal queue of Pool.
  • v2: v2 made use Manager.List, work function will receive a ListProxy object, it fetches shared data via a internal connection to a server process.
  • v3: child process share data from the parent, take advantage of fork(2) system call.

def f1(*args):
    for e in args[0]: pow(e, 2)

def f2(*args):
    for e in args[0][0]: pow(e, 2)

def f3(n):
    for i in datasets: pow(i, 2)

def v1(np):
    with mp.Manager() as proxy_manager:
        shared_inputs = proxy_manager.list([datasets,])
        pf = partial(f1, *shared_inputs)
        with mp.Pool(processes=np) as pool:
            r = pool.map(pf, range(16))

def v2(np):
    with mp.Manager() as proxy_manager:
        shared_inputs = proxy_manager.list([datasets,])
        pf = partial(f2, shared_inputs)
        with mp.Pool(processes=np) as pool:
            r = pool.map(pf, range(16))

def v3(np):
    with mp.Pool(processes=np) as pool:
        r = pool.map(f3, range(16))

datasets = [2.0 for _ in range(10 * 1000 * 1000)]
for f in (v1, v2, v3):
    print(f.__code__.co_name)
    for np in (2, 4, 8, 16):
        s = time()
        f(np)
        print("%s %.2fs" % (np, time()-s))

results taken on a 16 core E5-2682 VPC, it is obvious that v3 scales better: result

like image 119
georgexsh Avatar answered Oct 09 '22 01:10

georgexsh