I want to use a trie search with python 3.7 in order to match a string with some given words. The trie search algorithm is actually quite fast, however I also want to use all of the cores my CPU has. Lets assume my pc has 8 cores and I want to use 7 of them. So I split my word database into 7 equally big lists and created a trie off every one. (That's the basic Idea for parallizing the code)
However, when I call Process() off the multiprocessing module, the Process().start() method can take up a couple of seconds on the real database. (the search itself takes about a microseconds).
To be honest, I'm not yet a professional programmer, which means I probably have put in some major mistake in the code. Does someone see the reason the start of the process is so damn slow?
Please consider that I tested the script with a way bigger database than the trie below. I also tested the script with calling only 1 process each time and that was also significantly slower. I wanted to provide less code, however I think It's nice to see the running problem. I can also provide additional info if needed.
import string
import sys
import time
from multiprocessing import Process, Manager
from itertools import combinations_with_replacement
class TrieNode:
def __init__(self):
self.isString = False
self.children = {}
def insertString(self, word, root):
currentNode = root
for char in word:
if char not in currentNode.children:
currentNode.children[char] = TrieNode()
currentNode = currentNode.children[char]
currentNode.isString = True
def findStrings(self, prefix, node, results):
# Hänge das Ergebnis an, wenn ein Ende gefunden wurde
if node.isString:
results.append(prefix)
for char in node.children:
self.findStrings(prefix + char, node.children[char], results)
def findSubStrings(self, start_prefix, root, results):
currentNode = root
for char in start_prefix:
# Beende Schleife auf fehlende Prefixes oder deren Kinder
if char not in currentNode.children:
break
# Wechsle zu Kindern in anderem Falle
else:
currentNode = currentNode.children[char]
# Verwende findStrings Rekursiv zum auffinden von End-Knoten
self.findStrings(start_prefix, currentNode, results)
return results
def gen_word_list(num_words, min_word_len=4, max_word_len=10):
wordList = []
total_words = 0
for long_word in combinations_with_replacement(string.ascii_lowercase, max_word_len):
wordList.append(long_word)
total_words += 1
if total_words >= num_words:
break
for cut_length in range(1, max_word_len-min_word_len+1):
wordList.append(long_word[:-cut_length])
total_words += 1
if total_words >= num_words:
break
return wordList
if __name__ == '__main__':
# Sample word list
wordList = gen_word_list(1.5 * 10**5)
# Configs
try:
n_cores = int(sys.argv[-1] or 7)
except ValueError:
n_cores = 7
# Repetitions to do in order to estimate the runtime of a single run
num_repeats = 20
real_num_repeats = n_cores * num_repeats
# Creating Trie
root = TrieNode()
# Adding words
for word in wordList:
root.insertString(word, root)
# Extending trie to use it on multiple cores at once
multiroot = [root] * n_cores
# Measure time
print('Single process ...')
t_0 = time.time()
for i in range(real_num_repeats):
r = []
root.findSubStrings('he', root, r)
single_proc_time = (time.time()-t_0)
print(single_proc_time/real_num_repeats)
# using multicore to speed up the process
man = Manager()
# Loop to test the multicore Solution
# (Less repetitions are done to compare the timings to the single-core solution)
print('\nMultiprocess ...')
t_00 = time.time()
p_init_time = 0
procs_append_time = 0
p_start_time = 0
for i in range(num_repeats):
# Create Share-able list
res = man.list()
procs = []
for i in range(n_cores):
t_0 = time.time()
p = Process(target=multiroot[i].findSubStrings, args=('a', multiroot[i], res))
t_1 = time.time()
p_init_time += t_1 - t_0
procs.append(p)
t_2 = time.time()
procs_append_time += t_2 - t_1
p.start()
p_start_time += time.time() - t_2
for p in procs:
p.join()
multi_proc_time = time.time() - t_00
print(multi_proc_time / real_num_repeats)
init_overhead = p_init_time / single_proc_time
append_overhead = procs_append_time / single_proc_time
start_overhead = p_start_time / single_proc_time
total_overhead = (multi_proc_time - single_proc_time) / single_proc_time
print(f"Process(...) overhead: {init_overhead:.1%}")
print(f"procs.append(p) overhead: {append_overhead:.1%}")
print(f"p.start() overhead: {start_overhead:.1%}")
print(f"Total overhead: {total_overhead:.1%}")
Single process ...
0.007229958261762347
Multiprocess ...
0.7615800397736686
Process(...) overhead: 0.9%
procs.append(p) overhead: 0.0%
p.start() overhead: 8.2%
Total overhead: 10573.8%
There are many things to consider and most of them are already described in Multiprocessing > Programming guidelines. The most important thing is to remember that you are actually working with multiple processes and so there are 3 (or 4) ways of how variables are handled:
Synchronized wrappers over ctypes shared-state variables (like
multiprocessing.Value
). Actual variable is always "one object" in
memory, and wrapper by default is using "locking" to set/get real
value.
Proxies (like Manager().list()
). These variables are similar to shared-state variables, but are placed in the special "server process", and all the operations over them are actually sending pickled values between manager-process and active-process:
results.append(x)
pickles x
and sends it from manager process to active process that makes this call,
then it's unpickled
Any other access to results
(like len(results)
, iteration over results) involves the same pickling/sending/unpickling process.
So generally proxies are much slower than any other approach for common variables and in many cases using manager for "local" parallelization will give worse performance even compared to single-process runs. But a manager-server can be used remotely, so it's reasonable to use them when you want to parallelize the work using workers distributed on multiple machines
Objects available during subprocess create. For "fork" start method all the objects available during creation of subprocess are still available and "not shared", so changing them only changes it "locally for the subprocess". But before they are changed each process really "shares" the memory for each such object, so:
If they are used "read-only", then nothing is copied or "communicated".
If they are changed then they are copied inside the subprocess and, the copy is being changed. This is called Copy-On-Write or COW. Please note that making a new reference to object, e.g. assigning a variable to reference it, or appending it to a list increases ref_count of object, and that is considered to be "a change".
Behavior may also vary depending on "start method": e.g. for "spawn"/"forkserver" method changeable global variables are not really "the same objects" value seen by subprocess may not be the same as in parent process.
So initial values of multiroot[i]
(used in Process(target=..., args=(..., multiroot[i], ...))
) are shared but:
if you are not using 'fork' start method (and by default Windows is not using it), then all the args are pickled at least once for each subprocess. And so start
may be taking a long time if multiroot[i].children
is huge.
Even if you are using fork: initially multiroot[i]
seems to be shared and not copied, but I'm not sure what happens
when variables are assigned inside of findSubStrings
method (e.g. currentNode = ...
) — maybe it's causing copy-on-write (COW) and so whole instance of TrieNode
is being copied.
What can be done to improve the situation:
If you are using fork
start method, then make sure that "database" objects (TrieNode
instances) are truly
readonly and don't event have methods with variables assignments in them. For example you can move findSubStrings
to another class, and make sure to call all the instance.insertString
before starting subprocesses.
You are using man.list()
instance as a results
argument to findSubStrings
. This means that for each subprocess
a different "wrapper" is created, and all the results.append(prefix)
actions are pickling prefix
, and then sending it
to server process. If you are using Pool
with limited number of processes, then it's not a big deal. If you are spawning
huge amount of subprocesses, then it might affect performance. And I think that by default they all use "locking" so concurrent appends migth be relatively slow. If order of items in results
does not matter (I'm not experienced with prefix-trees and don't remember theory behind it), then you can fully avoid any overheads related to concurrent
results.append
:
results
list inside the findSubStrings
method. Don't use res = man.list()
at all.pool.apply_async()
);
get the results; "merge them".Using currentNode = root
in findSubStrings will result in COW of root
. That's why weak references (currentNodeRef = weakref.ref(root)
) can give a little extra benefit.
import string
import sys
import time
import weakref
from copy import deepcopy
from multiprocessing import Pool
from itertools import combinations_with_replacement
class TrieNode:
def __init__(self):
self.isString = False
self.children = {}
def insertString(self, word, root):
current_node = root
for char in word:
if char not in current_node.children:
current_node.children[char] = TrieNode()
current_node = current_node.children[char]
current_node.isString = True
# findStrings: not a method of TrieNode anymore, and works with reference to node.
def findStrings(prefix, node_ref, results):
# Hänge das Ergebnis an, wenn ein Ende gefunden wurde
if node_ref().isString:
results.append(prefix)
for char in node_ref().children:
findStrings(prefix + char, weakref.ref(node_ref().children[char]), results)
# findSubStrings: not a method of TrieNode anymore, and works with reference to node.
def findSubStrings(start_prefix, node_ref, results=None):
if results is None:
results = []
current_node_ref = node_ref
for char in start_prefix:
# Beende Schleife auf fehlende Prefixes oder deren Kinder
if char not in current_node_ref().children:
break
# Wechsle zu Kindern in anderem Falle
else:
current_node_ref = weakref.ref(current_node_ref().children[char])
# Verwende findStrings Rekursiv zum auffinden von End-Knoten
findStrings(start_prefix, current_node_ref, results)
return results
def gen_word_list(num_words, min_word_len=4, max_word_len=10):
wordList = []
total_words = 0
for long_word in combinations_with_replacement(string.ascii_lowercase, max_word_len):
wordList.append(long_word)
total_words += 1
if total_words >= num_words:
break
for cut_length in range(1, max_word_len-min_word_len+1):
wordList.append(long_word[:-cut_length])
total_words += 1
if total_words >= num_words:
break
return wordList
if __name__ == '__main__':
# Sample word list
wordList = gen_word_list(1.5 * 10**5)
# Configs
try:
n_cores = int(sys.argv[-1] or 7)
except ValueError:
n_cores = 7
# Repetitions to do in order to estimate the runtime of a single run
real_num_repeats = 420
simulated_num_repeats = real_num_repeats // n_cores
# Creating Trie
root = TrieNode()
# Adding words
for word in wordList:
root.insertString(word, root)
# Create tries for subprocesses:
multiroot = [deepcopy(root) for _ in range(n_cores)]
# NOTE: actually all subprocesses can use the same `root`, but let's copy them to simulate
# that we are using different tries when splitting job to sub-jobs
# localFindSubStrings: defined after `multiroot`, so `multiroot` can be used as "shared" variable
def localFindSubStrings(start_prefix, root_index=None, results=None):
if root_index is None:
root_ref = weakref.ref(root)
else:
root_ref = weakref.ref(multiroot[root_index])
return findSubStrings(start_prefix, root_ref, results)
# Measure time
print('Single process ...')
single_proc_num_results = None
t_0 = time.time()
for i in range(real_num_repeats):
iteration_results = localFindSubStrings('help', )
if single_proc_num_results is None:
single_proc_num_results = len(iteration_results)
single_proc_time = (time.time()-t_0)
print(single_proc_time/real_num_repeats)
# Loop to test the multicore Solution
# (Less repetitions are done to compare the timings to the single-core solution)
print('\nMultiprocess ...')
p_init_time = 0
apply_async_time = 0
results_join_time = 0
# Should processes be joined between repeats (simulate single job on multiple cores) or not (simulate multiple jobs running simultaneously)
PARALLEL_REPEATS = True
if PARALLEL_REPEATS:
t_0 = time.time()
pool = Pool(processes=n_cores)
t_1 = time.time()
p_init_time += t_1 - t_0
async_results = []
final_results = []
t_00 = time.time()
for repeat_num in range(simulated_num_repeats):
final_result = []
final_results.append(final_result)
if not PARALLEL_REPEATS:
t_0 = time.time()
pool = Pool(processes=n_cores)
t_1 = time.time()
p_init_time += t_1 - t_0
async_results = []
else:
t_1 = time.time()
async_results.append(
(
final_result,
pool.starmap_async(
localFindSubStrings,
[('help', core_num) for core_num in range(n_cores)],
)
)
)
t_2 = time.time()
apply_async_time += t_2 - t_1
if not PARALLEL_REPEATS:
for _, a_res in async_results:
for result_part in a_res.get():
t_3 = time.time()
final_result.extend(result_part)
results_join_time += time.time() - t_3
pool.close()
pool.join()
if PARALLEL_REPEATS:
for final_result, a_res in async_results:
for result_part in a_res.get():
t_3 = time.time()
final_result.extend(result_part)
results_join_time += time.time() - t_3
pool.close()
pool.join()
multi_proc_time = time.time() - t_00
# Work is not really parallelized, instead it's just 'duplicated' over cores,
# and so we divide using `real_num_repeats` (not `simulated_num_repeats`)
print(multi_proc_time / real_num_repeats)
init_overhead = p_init_time / single_proc_time
apply_async_overhead = apply_async_time / single_proc_time
results_join_percent = results_join_time / single_proc_time
total_overhead = (multi_proc_time - single_proc_time) / single_proc_time
print(f"Pool(...) overhead: {init_overhead:.1%}")
print(f"pool.starmap_async(...) overhead: {apply_async_overhead:.1%}")
print(f"Results join time percent: {results_join_percent:.1%}")
print(f"Total overhead: {total_overhead:.1%}")
for iteration_results in final_results:
num_results = len(iteration_results) / n_cores
if num_results != single_proc_num_results:
raise AssertionError(f'length of results should not change! {num_results} != {single_proc_num_results}')
NOTES:
PARALLEL_REPEATS=True
simulates running of the multiple jobs (for example each job should be started for different prefixes, but in example I use the same prefix to have consistent "load" for each run), and each job is "parallelized" over all cores. PARALLEL_REPEATS=False
simulates running of a single job
parallelized over all cores and it's slower than single-process
solution.apply_async
more than 1 time.Example output:
Single process ...
0.007109369550432477
Multiprocess ...
0.002928720201764788
Pool(...) overhead: 1.3%
pool.apply_async(...) overhead: 1.5%
Results join time percent: 1.8%
Total overhead: -58.8%
at first I want to thank everyone who was participating as every answer contributed to the solution.
As the first comments pointed out, creating a new process every time leads to python shifting the needed data into the process. This can take a couple of seconds and is leading to a non-desired delay.
What brought the ultimate solution for me is creating the processes (one per core) once using the Process class of the multiprocessing library once during the startup of the program.
You can then communicate with the process using the Pipe class of the same module.
I found the ping-pong example here really helping: https://www.youtube.com/watch?v=s1SkCYMnfbY&t=900s
It is still not optimal as multiple pipes trying to talk to the process during the same time causes the process to crash.
However, I should be able solving this issue using queues. If someone is interested in the solution feel free to ask.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With