I have a script that includes opening a file from a list and then doing something to the text within that file. I'm using python multiprocessing and Pool to try to parallelize this operation. A abstraction of the script is below:
import os
from multiprocessing import Pool
results = []
def testFunc(files):
for file in files:
print "Working in Process #%d" % (os.getpid())
#This is just an illustration of some logic. This is not what I'm actually doing.
for line in file:
if 'dog' in line:
results.append(line)
if __name__=="__main__":
p = Pool(processes=2)
files = ['/path/to/file1.txt', '/path/to/file2.txt']
results = p.apply_async(testFunc, args = (files,))
results2 = results.get()
When I run this the print out of the process id is the same for each iteration. Basically what I'm trying to do is take each element of the input list and fork it out to a separate process, but it seems like one process is doing all of the work.
Python is NOT a single-threaded language. Python processes typically use a single thread because of the GIL. Despite the GIL, libraries that perform computationally heavy tasks like numpy, scipy and pytorch utilise C-based implementations under the hood, allowing the use of multiple cores.
As we have seen, the Process allocates all the tasks in memory and Pool allocates only executing processes in memory, so when the task numbers is large, we can use Pool and when the task number is small, we can use Process class.
The multiprocessing package supports spawning processes. It refers to a function that loads and executes a new child processes. For the child to terminate or to continue executing concurrent computing,then the current process hasto wait using an API, which is similar to threading module.
Python Multiprocessing modules provides Queue class that is exactly a First-In-First-Out data structure. They can store any pickle Python object (though simple ones are best) and are extremely useful for sharing data between processes.
apply_async
farms out one task to the pool. You would need to call
apply_async
many times to exercise more processors.results
. Since the pool workers are separate processes, the two
won't be writing to the same list. One way to work around this is to use an ouput Queue. You could set it up yourself, or use apply_async
's callback to setup the Queue for you. apply_async
will call the callback once the function completes. map_async
instead of apply_async
, but then you'd
get a list of lists, which you'd then have to flatten.So, perhaps try instead something like:
import os
import multiprocessing as mp
results = []
def testFunc(file):
result = []
print "Working in Process #%d" % (os.getpid())
# This is just an illustration of some logic. This is not what I'm
# actually doing.
with open(file, 'r') as f:
for line in f:
if 'dog' in line:
result.append(line)
return result
def collect_results(result):
results.extend(result)
if __name__ == "__main__":
p = mp.Pool(processes=2)
files = ['/path/to/file1.txt', '/path/to/file2.txt']
for f in files:
p.apply_async(testFunc, args=(f, ), callback=collect_results)
p.close()
p.join()
print(results)
Maybe in this case you should use map_async
:
import os
from multiprocessing import Pool
results = []
def testFunc(file):
message = ("Working in Process #%d" % (os.getpid()))
#This is just an illustration of some logic. This is not what I'm actually doing.
for line in file:
if 'dog' in line:
results.append(line)
return message
if __name__=="__main__":
print("saddsf")
p = Pool(processes=2)
files = ['/path/to/file1.txt', '/path/to/file2.txt']
results = p.map_async(testFunc, files)
print(results.get())
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With