
python multiprocessing apply_async only uses one process

I have a script that opens each file in a list and does something with the text in that file. I'm using Python's multiprocessing Pool to try to parallelize this operation. An abstraction of the script is below:

import os
from multiprocessing import Pool

results = []
def testFunc(files):
    for file in files:
        print "Working in Process #%d" % (os.getpid())
        #This is just an illustration of some logic. This is not what I'm actually doing.
        for line in file:
            if 'dog' in line:
                results.append(line)

if __name__=="__main__":
    p = Pool(processes=2)
    files = ['/path/to/file1.txt', '/path/to/file2.txt']
    results = p.apply_async(testFunc, args = (files,))
    results2 = results.get()

When I run this, the process ID printed is the same for every iteration. What I'm trying to do is hand each element of the input list off to a separate process, but it seems like one process is doing all of the work.

asked Sep 18 '12 by user1074057

People also ask

Is Python a single process?

Python is not a single-threaded language, and a Python program is not limited to a single process. A Python process typically executes bytecode on one thread at a time because of the GIL (global interpreter lock). Despite the GIL, libraries that perform computationally heavy tasks, such as numpy, scipy and pytorch, use C-based implementations under the hood, allowing multiple cores to be used.
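
As a rough illustration (a minimal sketch, not from the original page), a pure-Python CPU-bound loop gains nothing from extra threads because of the GIL, but it does scale across processes:

import time
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool

def cpu_bound(n):
    # Pure-Python arithmetic loop; it holds the GIL the whole time.
    total = 0
    for i in range(n):
        total += i * i
    return total

if __name__ == "__main__":
    work = [5_000_000] * 4
    for label, pool in (("threads", ThreadPool(4)), ("processes", Pool(4))):
        start = time.time()
        pool.map(cpu_bound, work)
        pool.close()
        pool.join()
        # Expect the process pool to be noticeably faster on a multi-core machine.
        print("%s: %.2fs" % (label, time.time() - start))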

What is the difference between pool and process in multiprocessing?

With the Process class you create one process per task, so all of the tasks sit in memory at once, while a Pool keeps only a fixed number of worker processes in memory and feeds tasks to them. So when the number of tasks is large, use Pool; when it is small, the Process class is fine.
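
A minimal sketch of the difference (illustrative, not part of the original answer): Process creates one OS process per task, while Pool keeps a fixed set of workers alive and feeds tasks to them:

from multiprocessing import Pool, Process

def work(n):
    print(n * n)

if __name__ == "__main__":
    # Process: one OS process per task -- reasonable for a handful of tasks.
    procs = [Process(target=work, args=(n,)) for n in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()

    # Pool: two workers shared across a hundred tasks.
    with Pool(processes=2) as pool:
        pool.map(work, range(100))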

How does Python multiprocessing work?

The multiprocessing package supports spawning processes: each child process loads and executes code independently of its parent. To wait for a child to terminate, or to coordinate concurrent work with it, the parent uses an API similar to that of the threading module.
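
For example, a bare-bones sketch of spawning one child process and waiting for it, much as you would with threading.Thread:

import os
from multiprocessing import Process

def child():
    print("child pid: %d" % os.getpid())

if __name__ == "__main__":
    p = Process(target=child)
    p.start()   # spawn the child process
    p.join()    # block until it terminates, as with threading.Thread.join()
    print("child exit code: %d" % p.exitcode)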

How does Python multiprocessing queue work?

The multiprocessing module provides a Queue class that is exactly a first-in, first-out (FIFO) data structure. Queues can store any picklable Python object (though simple ones are best) and are extremely useful for sharing data between processes.
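
For instance (a minimal sketch): a child process can put a picklable object on the queue and the parent can get it back in FIFO order:

from multiprocessing import Process, Queue

def worker(q):
    # Any picklable object can go on the queue.
    q.put(["some", "result", 42])

if __name__ == "__main__":
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
    print(q.get())   # blocks until the child has put something
    p.join()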


2 Answers

  • apply_async farms out one task to the pool. You would need to call apply_async many times to exercise more processors.
  • Don't let both processes try to write to the same list, results. Since the pool workers are separate processes, the two won't actually be writing to the same list. One way to work around this is to use an output Queue. You could set it up yourself, or use apply_async's callback to set up the Queue for you. apply_async will call the callback once the function completes.
  • You could use map_async instead of apply_async, but then you'd get a list of lists, which you would have to flatten.

So, perhaps try instead something like:

import os
import multiprocessing as mp

results = []   

def testFunc(file):
    result = []
    print "Working in Process #%d" % (os.getpid())
    # This is just an illustration of some logic. This is not what I'm
    # actually doing.
    with open(file, 'r') as f:
        for line in f:
            if 'dog' in line:
                result.append(line)
    return result


def collect_results(result):
    results.extend(result)

if __name__ == "__main__":
    p = mp.Pool(processes=2)
    files = ['/path/to/file1.txt', '/path/to/file2.txt']
    for f in files:
        p.apply_async(testFunc, args=(f, ), callback=collect_results)
    p.close()
    p.join()
    print(results)
answered Sep 19 '22 by unutbu


Maybe in this case you should use map_async:

import os
from multiprocessing import Pool

def testFunc(file):
    print("Working in Process #%d" % os.getpid())
    # This is just an illustration of some logic. This is not what I'm actually doing.
    matches = []
    with open(file) as f:
        for line in f:
            if 'dog' in line:
                matches.append(line)
    # Return the matches to the parent; a global list would not be
    # shared between worker processes.
    return matches

if __name__ == "__main__":
    p = Pool(processes=2)
    files = ['/path/to/file1.txt', '/path/to/file2.txt']
    results = p.map_async(testFunc, files)
    print(results.get())  # one list of matching lines per file
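
Since map_async here returns one list of matches per file, you may want a single flat list; a small follow-up sketch (not part of the original answer):

# Flatten the per-file lists into one list of matching lines.
flat_results = [line for per_file in results.get() for line in per_file]
print(flat_results)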
answered Sep 18 '22 by Odomontois