 

Overhead of python multiprocessing initialization is worse than benefits

I want to use a trie search with Python 3.7 to match a string against a set of given words. The trie search algorithm itself is quite fast, but I also want to use all the cores my CPU has. Let's assume my PC has 8 cores and I want to use 7 of them. So I split my word database into 7 equally sized lists and built a trie from each one. (That's the basic idea for parallelizing the code.)

However, when I use Process() from the multiprocessing module, the Process().start() call can take a couple of seconds on the real database, while the search itself takes only on the order of microseconds.

To be honest, I'm not a professional programmer yet, so I have probably made some major mistake in the code. Does anyone see why starting the processes is so damn slow?

Please note that I tested the script with a much bigger database than the trie below. I also tested the script starting only 1 process each time, and that was also significantly slower. I would have liked to post less code, but I think it helps to have a runnable example of the problem. I can provide additional info if needed.

import string
import sys
import time

from multiprocessing import Process, Manager
from itertools import combinations_with_replacement


class TrieNode:

    def __init__(self):

        self.isString = False
        self.children = {}

    def insertString(self, word, root):
        currentNode = root
        for char in word:
            if char not in currentNode.children:
                currentNode.children[char] = TrieNode()
            currentNode = currentNode.children[char]
        currentNode.isString = True

    def findStrings(self, prefix, node, results):
        # Append the result when an end-of-word node is reached
        if node.isString:
            results.append(prefix)

        for char in node.children:
            self.findStrings(prefix + char, node.children[char], results)

    def findSubStrings(self, start_prefix, root, results):
        currentNode = root
        for char in start_prefix:

            # Stop the loop when the prefix (or one of its children) is missing
            if char not in currentNode.children:
                break
            # Otherwise descend to the child node
            else:
                currentNode = currentNode.children[char]

        # Use findStrings recursively to collect all end nodes
        self.findStrings(start_prefix, currentNode, results)

        return results


def gen_word_list(num_words, min_word_len=4, max_word_len=10):
    wordList = []
    total_words = 0
    for long_word in combinations_with_replacement(string.ascii_lowercase, max_word_len):
        wordList.append(long_word)
        total_words += 1

        if total_words >= num_words:
            break

        for cut_length in range(1, max_word_len-min_word_len+1):
            wordList.append(long_word[:-cut_length])
            total_words += 1
            if total_words >= num_words:
                break

    return wordList


if __name__ == '__main__':
    # Sample word list

    wordList = gen_word_list(1.5 * 10**5)

    # Configs
    try:
        n_cores = int(sys.argv[-1] or 7)
    except ValueError:
        n_cores = 7

    # Repetitions to do in order to estimate the runtime of a single run
    num_repeats = 20
    real_num_repeats = n_cores * num_repeats

    # Creating Trie
    root = TrieNode()

    # Adding words
    for word in wordList:
        root.insertString(word, root)

    # Extending trie to use it on multiple cores at once
    multiroot = [root] * n_cores

    # Measure time
    print('Single process ...')
    t_0 = time.time()
    for i in range(real_num_repeats):
        r = []
        root.findSubStrings('he', root, r)
    single_proc_time = (time.time()-t_0)
    print(single_proc_time/real_num_repeats)

    # using multicore to speed up the process
    man = Manager()

    # Loop to test the multicore Solution
    # (Fewer repetitions are done so the timings are comparable to the single-core solution)
    print('\nMultiprocess ...')
    t_00 = time.time()
    p_init_time = 0
    procs_append_time = 0
    p_start_time = 0
    for i in range(num_repeats):

        # Create Share-able list
        res = man.list()

        procs = []

        for i in range(n_cores):
            t_0 = time.time()
            p = Process(target=multiroot[i].findSubStrings, args=('a', multiroot[i], res))
            t_1 = time.time()
            p_init_time += t_1 - t_0
            procs.append(p)
            t_2 = time.time()
            procs_append_time += t_2 - t_1
            p.start()
            p_start_time += time.time() - t_2

        for p in procs:
            p.join()
    multi_proc_time = time.time() - t_00
    print(multi_proc_time / real_num_repeats)
    init_overhead = p_init_time / single_proc_time
    append_overhead = procs_append_time / single_proc_time
    start_overhead = p_start_time / single_proc_time
    total_overhead = (multi_proc_time - single_proc_time) / single_proc_time
    print(f"Process(...) overhead: {init_overhead:.1%}")
    print(f"procs.append(p) overhead: {append_overhead:.1%}")
    print(f"p.start() overhead: {start_overhead:.1%}")
    print(f"Total overhead: {total_overhead:.1%}")


Single process ...
0.007229958261762347

Multiprocess ...
0.7615800397736686
Process(...) overhead: 0.9%
procs.append(p) overhead: 0.0%
p.start() overhead: 8.2%
Total overhead: 10573.8%

Dennis asked May 11 '19 22:05


2 Answers

General idea

There are many things to consider, and most of them are already described in the multiprocessing documentation under "Programming guidelines". The most important thing to remember is that you are actually working with multiple processes, so there are 3 (or 4) ways in which variables are handled:

  • Synchronized wrappers over ctypes shared-state variables (like multiprocessing.Value). The actual variable is always "one object" in memory, and the wrapper by default uses locking to set/get the real value.

  • Proxies (like Manager().list()). These variables are similar to shared-state variables, but they live in a special "server process", and every operation on them sends pickled values between the manager process and the active process:

    • results.append(x) pickles x in the active process that makes the call and sends it to the manager process, where it is unpickled.

    • Any other access to results (like len(results) or iterating over results) involves the same pickling/sending/unpickling round trip.

    So proxies are generally much slower than any other approach for common variables, and in many cases using a manager for "local" parallelization gives worse performance even than a single-process run. But a manager server can be used remotely, so it's reasonable to use one when you want to parallelize work across workers distributed on multiple machines.

  • Objects available when the subprocess is created. With the "fork" start method, all objects that exist when the subprocess is created are still available in it but are "not shared": changing them only changes them locally in the subprocess. Until they are changed, however, each process really does share the memory of each such object, so:

    • If they are used "read-only", nothing is copied or "communicated".

    • If they are changed, they are copied inside the subprocess and the copy is changed. This is called copy-on-write (COW). Note that making a new reference to an object (e.g. assigning it to a variable or appending it to a list) increases the object's reference count, and that counts as "a change".

Behavior may also vary depending on the start method: e.g. with "spawn"/"forkserver", mutable global variables are not really "the same objects", and the value seen by a subprocess may not be the same as in the parent process.
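To make the difference between inherited objects and synchronized wrappers concrete, here is a minimal sketch. It assumes the "fork" start method is available (Linux or another POSIX system); WORDS, worker and run_demo are names made up for this illustration, not part of the question's code.

```python
import multiprocessing as mp

# Assumption: the "fork" start method is available (POSIX only).
ctx = mp.get_context("fork")

# Plain module-level object: inherited by forked children, but not shared.
WORDS = ["help", "hello"]


def worker(counter):
    # This only changes the child's private copy of WORDS.
    WORDS.append("added in child")
    # The synchronized Value lives in shared memory, so the parent sees this.
    with counter.get_lock():
        counter.value += len(WORDS)


def run_demo(n_procs=3):
    counter = ctx.Value("i", 0)
    procs = [ctx.Process(target=worker, args=(counter,)) for _ in range(n_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value, list(WORDS)


if __name__ == "__main__":
    total, parent_words = run_demo()
    print(total)         # each child saw 3 words, so 3 children add up to 9
    print(parent_words)  # the parent's list is unchanged
```

The appends made in the children never reach the parent, while the increments of the Value do. A Manager().list() proxy would also be visible to the parent, but every append would go through the manager process.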

So the initial values of multiroot[i] (used in Process(target=..., args=(..., multiroot[i], ...))) are shared, but:

  • If you are not using the 'fork' start method (Windows does not support it and uses 'spawn' instead), then all the args are pickled at least once for each subprocess, so start() may take a long time if multiroot[i].children is huge.

  • Even if you are using fork: initially multiroot[i] seems to be shared and not copied, but I'm not sure what happens when variables are assigned inside the findSubStrings method (e.g. currentNode = ...). Maybe it causes copy-on-write (COW), so that the whole TrieNode instance is copied.
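To get a feeling for the pickling cost mentioned in the first point, the following sketch measures how long it takes to pickle a trie once. This only approximates what start() has to do per argument under 'spawn'. The nested-dict trie, the "$" end marker and the generated word list are assumptions chosen to keep the example short; they are not the TrieNode class from the question.

```python
import pickle
import time


def build_trie(words):
    """Build a nested-dict trie; the key "$" marks the end of a word."""
    root = {}
    for word in words:
        node = root
        for char in word:
            node = node.setdefault(char, {})
        node["$"] = True
    return root


def pickle_cost(obj):
    """Return (size in bytes, seconds) for pickling obj once."""
    t_0 = time.perf_counter()
    blob = pickle.dumps(obj)
    return len(blob), time.perf_counter() - t_0


if __name__ == "__main__":
    # Hypothetical word list, much smaller than a real database.
    words = [f"word{i:06d}" for i in range(50_000)]
    size, seconds = pickle_cost(build_trie(words))
    print(f"{size} bytes pickled in {seconds:.3f} s, paid once per process")
```

With a non-fork start method this cost is paid again for every Process that receives the trie as an argument, plus the matching unpickling in the child.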

What can be done to improve the situation:

  • If you are using the fork start method, make sure that the "database" objects (TrieNode instances) are truly read-only and don't even have methods with variable assignments in them. For example, you can move findSubStrings to another class, and make sure to finish all the instance.insertString calls before starting subprocesses.

  • You are passing a man.list() instance as the results argument to findSubStrings. This means that a different "wrapper" is created for each subprocess, and every results.append(prefix) pickles prefix and sends it to the server process. If you are using a Pool with a limited number of processes, that's not a big deal. If you are spawning a huge number of subprocesses, it might affect performance. I also think that by default they all use locking, so concurrent appends might be relatively slow. If the order of items in results does not matter (I'm not experienced with prefix trees and don't remember the theory behind them), you can avoid the overhead of concurrent results.append entirely:

    • Create a new results list inside the findSubStrings method. Don't use res = man.list() at all.
    • To get the "final" results, iterate over every result object returned by pool.apply_async(), get its results, and merge them.
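The last two points can be sketched in a few lines. This is a minimal illustration, assuming the "fork" start method; SHARDS, search_shard and parallel_search are hypothetical names, and plain word lists stand in for the per-core tries.

```python
import multiprocessing as mp

# Assumption: "fork" start method, so workers inherit SHARDS without pickling.
ctx = mp.get_context("fork")

# Hypothetical stand-in for per-core tries: one plain word list per shard.
SHARDS = [
    ["help", "hello", "apple"],
    ["helmet", "zebra"],
    ["heap", "help"],
]


def search_shard(prefix, shard_index):
    # Build the results locally; only the final list is pickled and returned.
    return [word for word in SHARDS[shard_index] if word.startswith(prefix)]


def parallel_search(prefix, n_procs=3):
    with ctx.Pool(processes=n_procs) as pool:
        pending = [
            pool.apply_async(search_shard, (prefix, i)) for i in range(len(SHARDS))
        ]
        # Merge the per-shard lists in the parent process.
        merged = []
        for async_result in pending:
            merged.extend(async_result.get())
    return merged


if __name__ == "__main__":
    print(sorted(parallel_search("hel")))
```

In a real program the pool would be created once and reused across queries, since creating it is the expensive part; it is created inside the function here only to keep the sketch self-contained.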

Using weak references

Using currentNode = root in findSubStrings creates a new reference to root, which changes its reference count and so results in COW for root. That's why weak references (currentNodeRef = weakref.ref(root)) may give a little extra benefit.
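The effect on the reference count can be observed directly. The sketch below is specific to CPython, where the count is stored inside the object itself, so changing it writes to the memory page that holds the object. Node and refcount_deltas are hypothetical names used only for this illustration.

```python
import sys
import weakref


class Node:
    """Tiny stand-in for TrieNode (hypothetical, for illustration only)."""


def refcount_deltas():
    node = Node()
    base = sys.getrefcount(node)

    alias = node  # a new strong reference
    strong_delta = sys.getrefcount(node) - base
    del alias

    ref = weakref.ref(node)  # a weak reference
    weak_delta = sys.getrefcount(node) - base

    target = ref()  # dereferencing hands back a strong reference again
    deref_delta = sys.getrefcount(node) - base
    del target

    return strong_delta, weak_delta, deref_delta


if __name__ == "__main__":
    print(refcount_deltas())
```

On CPython the strong alias raises the count by one and the weak reference leaves it unchanged, but calling the weak reference to get at the node raises it again for as long as the returned object is held. So any benefit from weak references is likely to be small and should be measured.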

Example

import string
import sys
import time
import weakref

from copy import deepcopy
from multiprocessing import Pool
from itertools import combinations_with_replacement


class TrieNode:

    def __init__(self):

        self.isString = False
        self.children = {}

    def insertString(self, word, root):
        current_node = root
        for char in word:
            if char not in current_node.children:
                current_node.children[char] = TrieNode()
            current_node = current_node.children[char]
        current_node.isString = True


# findStrings: not a method of TrieNode anymore, and works with reference to node.
def findStrings(prefix, node_ref, results):
    # Append the result when an end-of-word node is reached
    if node_ref().isString:
        results.append(prefix)

    for char in node_ref().children:
        findStrings(prefix + char, weakref.ref(node_ref().children[char]), results)


# findSubStrings: not a method of TrieNode anymore, and works with reference to node.
def findSubStrings(start_prefix, node_ref, results=None):

    if results is None:
        results = []
    current_node_ref = node_ref

    for char in start_prefix:

        # Stop the loop when the prefix (or one of its children) is missing
        if char not in current_node_ref().children:
            break
        # Otherwise descend to the child node
        else:
            current_node_ref = weakref.ref(current_node_ref().children[char])

    # Use findStrings recursively to collect all end nodes
    findStrings(start_prefix, current_node_ref, results)

    return results


def gen_word_list(num_words, min_word_len=4, max_word_len=10):
    wordList = []
    total_words = 0
    for long_word in combinations_with_replacement(string.ascii_lowercase, max_word_len):
        wordList.append(long_word)
        total_words += 1

        if total_words >= num_words:
            break

        for cut_length in range(1, max_word_len-min_word_len+1):
            wordList.append(long_word[:-cut_length])
            total_words += 1
            if total_words >= num_words:
                break

    return wordList


if __name__ == '__main__':
    # Sample word list

    wordList = gen_word_list(1.5 * 10**5)

    # Configs
    try:
        n_cores = int(sys.argv[-1] or 7)
    except ValueError:
        n_cores = 7

    # Repetitions to do in order to estimate the runtime of a single run
    real_num_repeats = 420
    simulated_num_repeats = real_num_repeats // n_cores

    # Creating Trie
    root = TrieNode()

    # Adding words
    for word in wordList:
        root.insertString(word, root)

    # Create tries for subprocesses:
    multiroot = [deepcopy(root) for _ in range(n_cores)]
    # NOTE: actually all subprocesses can use the same `root`, but let's copy them to simulate
    # that we are using different tries when splitting job to sub-jobs

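    # localFindSubStrings: defined after `multiroot`, so `multiroot` can be used as a "shared" variable.
    # NOTE: this relies on the 'fork' start method, where workers inherit `root`, `multiroot` and this function.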
    def localFindSubStrings(start_prefix, root_index=None, results=None):
        if root_index is None:
            root_ref = weakref.ref(root)
        else:
            root_ref = weakref.ref(multiroot[root_index])
        return findSubStrings(start_prefix, root_ref, results)

    # Measure time
    print('Single process ...')
    single_proc_num_results = None
    t_0 = time.time()
    for i in range(real_num_repeats):
        iteration_results = localFindSubStrings('help')
        if single_proc_num_results is None:
            single_proc_num_results = len(iteration_results)
    single_proc_time = (time.time()-t_0)
    print(single_proc_time/real_num_repeats)

    # Loop to test the multicore Solution
    # (Fewer repetitions are done so the timings are comparable to the single-core solution)
    print('\nMultiprocess ...')
    p_init_time = 0
    apply_async_time = 0
    results_join_time = 0

    # Should processes be joined between repeats (simulate single job on multiple cores) or not (simulate multiple jobs running simultaneously)
    PARALLEL_REPEATS = True

    if PARALLEL_REPEATS:
        t_0 = time.time()
        pool = Pool(processes=n_cores)
        t_1 = time.time()
        p_init_time += t_1 - t_0
        async_results = []

    final_results = []

    t_00 = time.time()

    for repeat_num in range(simulated_num_repeats):

        final_result = []
        final_results.append(final_result)

        if not PARALLEL_REPEATS:
            t_0 = time.time()
            pool = Pool(processes=n_cores)
            t_1 = time.time()
            p_init_time += t_1 - t_0
            async_results = []
        else:
            t_1 = time.time()

        async_results.append(
            (
                final_result,
                pool.starmap_async(
                    localFindSubStrings,
                    [('help', core_num) for core_num in range(n_cores)],
                )
            )
        )
        t_2 = time.time()
        apply_async_time += t_2 - t_1

        if not PARALLEL_REPEATS:
            for _, a_res in async_results:
                for result_part in a_res.get():
                    t_3 = time.time()
                    final_result.extend(result_part)
                    results_join_time += time.time() - t_3
            pool.close()
            pool.join()

    if PARALLEL_REPEATS:
        for final_result, a_res in async_results:
            for result_part in a_res.get():
                t_3 = time.time()
                final_result.extend(result_part)
                results_join_time += time.time() - t_3
        pool.close()
        pool.join()

    multi_proc_time = time.time() - t_00

    # Work is not really parallelized, instead it's just 'duplicated' over cores,
    # and so we divide using `real_num_repeats` (not `simulated_num_repeats`)
    print(multi_proc_time / real_num_repeats)
    init_overhead = p_init_time / single_proc_time
    apply_async_overhead = apply_async_time / single_proc_time
    results_join_percent = results_join_time / single_proc_time
    total_overhead = (multi_proc_time - single_proc_time) / single_proc_time
    print(f"Pool(...) overhead: {init_overhead:.1%}")
    print(f"pool.starmap_async(...) overhead: {apply_async_overhead:.1%}")
    print(f"Results join time percent: {results_join_percent:.1%}")
    print(f"Total overhead: {total_overhead:.1%}")

    for iteration_results in final_results:
        num_results = len(iteration_results) / n_cores
        if num_results != single_proc_num_results:
            raise AssertionError(f'length of results should not change! {num_results} != {single_proc_num_results}')

NOTES:

  • PARALLEL_REPEATS=True simulates running multiple jobs (for example, each job would be started for a different prefix, but in the example I use the same prefix to get a consistent "load" for each run), with each job "parallelized" over all cores.
  • PARALLEL_REPEATS=False simulates running a single job parallelized over all cores, and it is slower than the single-process solution.
  • It seems that parallelism only pays off when each worker in the pool is issued tasks more than once.

Example output:

Single process ...
0.007109369550432477

Multiprocess ...
0.002928720201764788
Pool(...) overhead: 1.3%
pool.starmap_async(...) overhead: 1.5%
Results join time percent: 1.8%
Total overhead: -58.8%

imposeren answered Sep 28 '22 07:09


First, I want to thank everyone who participated, as every answer contributed to the solution.

As the first comments pointed out, creating a new process every time makes Python move the needed data into the process each time. This can take a couple of seconds and leads to an undesired delay.

What finally solved it for me was creating the processes (one per core) only once, during the startup of the program, using the Process class of the multiprocessing library.

You can then communicate with the process using the Pipe class of the same module.
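A minimal sketch of this setup could look as follows. It is not the code I ended up with, only an illustration under some assumptions: the "fork" start method is used, plain word lists stand in for the tries, None is used as a stop signal, and worker_loop, start_workers, search and stop_workers are made-up names.

```python
import multiprocessing as mp

# Assumption: "fork" start method (POSIX); each worker inherits its shard.
ctx = mp.get_context("fork")


def worker_loop(conn, shard):
    # Runs until the parent sends None as a stop signal.
    while True:
        prefix = conn.recv()
        if prefix is None:
            break
        conn.send([word for word in shard if word.startswith(prefix)])
    conn.close()


def start_workers(shards):
    workers = []
    for shard in shards:
        parent_conn, child_conn = ctx.Pipe()
        proc = ctx.Process(target=worker_loop, args=(child_conn, shard), daemon=True)
        proc.start()  # the expensive part, paid only once per worker
        child_conn.close()  # the parent only needs its own end of the pipe
        workers.append((proc, parent_conn))
    return workers


def search(workers, prefix):
    for _, conn in workers:
        conn.send(prefix)  # ask all workers first so they run concurrently
    results = []
    for _, conn in workers:
        results.extend(conn.recv())
    return results


def stop_workers(workers):
    for proc, conn in workers:
        conn.send(None)
        proc.join()
        conn.close()


if __name__ == "__main__":
    workers = start_workers([["help", "hello"], ["heap", "zebra"]])
    print(search(workers, "he"))
    stop_workers(workers)
```

Each worker has its own pipe and only one process talks to each end, so after startup a query costs just the time to send the prefix and receive the matches.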

I found the ping-pong example here really helpful: https://www.youtube.com/watch?v=s1SkCYMnfbY&t=900s

It is still not optimal, as multiple pipes trying to talk to the process at the same time cause the process to crash.

However, I should be able to solve this issue using queues. If someone is interested in the solution, feel free to ask.
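The queue-based version is not written yet, so the following is only a guess at what it could look like: one task queue per worker and a single result queue shared by all workers, which is safe to write to from several processes. It assumes the "fork" start method; worker_loop and run_queries are hypothetical names and plain word lists again stand in for the tries.

```python
import multiprocessing as mp

# Assumption: "fork" start method (POSIX).
ctx = mp.get_context("fork")


def worker_loop(tasks, results, shard):
    while True:
        task = tasks.get()
        if task is None:  # stop signal
            break
        request_id, prefix = task
        matches = [word for word in shard if word.startswith(prefix)]
        results.put((request_id, matches))


def run_queries(shards, prefixes):
    results = ctx.Queue()  # shared by all workers
    task_queues, procs = [], []
    for shard in shards:
        tasks = ctx.Queue()  # one task queue per worker
        proc = ctx.Process(target=worker_loop, args=(tasks, results, shard))
        proc.start()
        task_queues.append(tasks)
        procs.append(proc)

    # Every worker gets every query, tagged with a request id.
    for request_id, prefix in enumerate(prefixes):
        for tasks in task_queues:
            tasks.put((request_id, prefix))

    # Collect all answers before joining, in whatever order they arrive.
    merged = {prefix: [] for prefix in prefixes}
    for _ in range(len(prefixes) * len(shards)):
        request_id, matches = results.get()
        merged[prefixes[request_id]].extend(matches)

    for tasks in task_queues:
        tasks.put(None)
    for proc in procs:
        proc.join()
    return {prefix: sorted(words) for prefix, words in merged.items()}


if __name__ == "__main__":
    print(run_queries([["help", "hello"], ["heap", "zebra"]], ["he", "z"]))
```

The request id is what lets the parent match answers to queries, because results from different workers can arrive in any order.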

Dennis answered Sep 28 '22 06:09