I have the following problem.
I need to process a set of documents (bringing every word to its normal form, e.g. 'was' --> 'be', 'were' --> 'be', 'went' --> 'go').
That means I need to open each file in a directory, transform its contents, and save the result to another directory.
Since the process is time-consuming, I decided to parallelize it with joblib.
The code below works correctly (I mean, it does what it is supposed to do), but I ran into a huge problem with memory.
It keeps growing constantly, until there is no memory left on the server at all.
from joblib import delayed, Parallel


def process_text(text):
    # some function which processes
    # text and returns a new text
    return processed_text


def process_and_save(document_id):
    with open(path + document_id) as f:
        text = f.read()
    text = process_text(text)
    with open(other_path + document_id, 'w') as f:
        f.write(text)


all_doc_ids = # a list of document ids which I need to process

Parallel(n_jobs=10)(delayed(process_and_save)(doc_id) for doc_id in all_doc_ids)
I've also tried replacing joblib with multiprocessing:
from multiprocessing import Pool

pool = Pool(10)
pool.map(process_and_save, all_doc_ids)
But the situation turned out to be exactly the same.
Are there any ways to solve the problem? And, of course, the main question is, why is this even happening?
Thank you!
P.S. The documents are quite small and the process consumes very little memory when running without parallelism.
The delayed function is a simple trick that lets you build a (function, args, kwargs) tuple using function-call syntax. On Windows, using multiprocessing.Pool requires protecting the main block of code, otherwise subprocesses are spawned recursively when using joblib.
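A minimal sketch of that guard, assuming the process_and_save function and all_doc_ids list from the question (the function body and document ids below are hypothetical placeholders):

from joblib import delayed, Parallel


def process_and_save(document_id):
    # placeholder body; the real function is the one from the question
    print('processing', document_id)


if __name__ == '__main__':
    # Only the main process executes this block; on Windows, worker
    # processes re-import the module and would otherwise re-run the
    # Parallel call recursively.
    all_doc_ids = ['doc1.txt', 'doc2.txt']  # hypothetical ids for illustration
    Parallel(n_jobs=2)(delayed(process_and_save)(doc_id) for doc_id in all_doc_ids)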
Joblib provides three different backends: loky (default), threading, and multiprocessing.
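A small illustrative sketch of selecting a backend explicitly via the backend argument of Parallel (square is just a stand-in function, not anything from the question):

from joblib import delayed, Parallel


def square(x):
    return x * x


if __name__ == '__main__':
    # loky (default): separate worker processes with memory-leak safeguards
    print(Parallel(n_jobs=2, backend='loky')(delayed(square)(i) for i in range(5)))

    # threading: threads in the current process, useful when the work releases the GIL
    print(Parallel(n_jobs=2, backend='threading')(delayed(square)(i) for i in range(5)))

    # multiprocessing: the legacy process-based backend built on multiprocessing.Pool
    print(Parallel(n_jobs=2, backend='multiprocessing')(delayed(square)(i) for i in range(5)))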
Parameters: n_jobs: int, default: None. The maximum number of concurrently running jobs, such as the number of Python worker processes when backend='multiprocessing' or the size of the thread pool when backend='threading'.
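For illustration, a short sketch of what n_jobs controls (double is a made-up stand-in function):

from joblib import delayed, Parallel


def double(x):
    return 2 * x


if __name__ == '__main__':
    # n_jobs caps the number of workers: -1 means "use all available CPUs"
    # with the default process-based backend.
    print(Parallel(n_jobs=-1)(delayed(double)(i) for i in range(8)))
    # With backend='threading' the same number sets the thread-pool size.
    print(Parallel(n_jobs=4, backend='threading')(delayed(double)(i) for i in range(8)))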
It seems this memory leak issue has been resolved in the latest version of joblib.
They introduced the loky backend as a safeguard against memory leaks:
Parallel(n_jobs=10, backend='loky')(delayed(process_and_save)(doc_id) for doc_id in all_doc_ids)
source: Memory Release after parallel
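If upgrading joblib is not an option, one common workaround for per-task memory growth, not taken from the answer above, is to recycle worker processes with multiprocessing's maxtasksperchild, so each worker is replaced after a fixed number of tasks and its accumulated memory is returned to the OS (a sketch with a placeholder process_and_save and hypothetical document ids):

from multiprocessing import Pool


def process_and_save(document_id):
    # placeholder; use the real function from the question
    print('processing', document_id)


if __name__ == '__main__':
    all_doc_ids = ['doc1.txt', 'doc2.txt']  # hypothetical ids for illustration
    # maxtasksperchild=50 makes each worker exit and be replaced after 50
    # tasks, which releases any memory it has accumulated.
    with Pool(processes=10, maxtasksperchild=50) as pool:
        pool.map(process_and_save, all_doc_ids)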