Python Multiprocessing concurrency using Manager, Pool and a shared list not working

I am learning Python multiprocessing, and I am trying to use it to populate a list with all the files present on the filesystem. However, the code that I wrote executes only sequentially.

#!/usr/bin/python
import os
import multiprocessing
tld = [os.path.join("/", f) for f in os.walk("/").next()[1]]  # Gets the top-level directory names inside "/"
manager = multiprocessing.Manager()
files = manager.list()


def get_files(x):
    for root, dir, file in os.walk(x):
        for name in file:
            files.append(os.path.join(root, name))

mp = [multiprocessing.Process(target=get_files, args=(tld[x],))
      for x in range(len(tld))]

for i in mp:
    i.start()
    i.join()
print len(files)

When I checked the process tree, I could see only a single child process spawned at a time. (man pstree says that names in {} are threads spawned by the parent.)

---bash(10949)---python(12729)-+-python(12730)---{python}(12752)
                               `-python(12750)

What I was expecting was to spawn a process for each tld directory and populate the shared list files, which would be around 10-15 processes depending on the number of directories. What am I doing wrong?

EDIT:

I used multiprocessing.Pool to create worker processes, and this time the processes are spawned, but it gives errors when I try to use multiprocessing.Pool.map(). I was referring to the following example from the Python docs:

from multiprocessing import Pool
def f(x):
    return x*x

if __name__ == '__main__':
    p = Pool(5)
    print(p.map(f, [1, 2, 3])) 

Following that example, I rewrote the code as

import os
import multiprocessing
tld = [os.path.join("/", f) for f in os.walk("/").next()[1]]
manager = multiprocessing.Manager()
pool = multiprocessing.Pool(processes=len(tld))
print pool
files = manager.list()
def get_files(x):
    for root, dir, file in os.walk(x):
        for name in file:
            files.append(os.path.join(root, name))
pool.map(get_files, [x for x in tld])
pool.close()
pool.join()
print len(files)

and this time it forks multiple processes:

---bash(10949)---python(12890)-+-python(12967)
                               |-python(12968)
                               |-python(12970)
                               |-python(12971)
                               |-python(12972)
                               ---snip---

But the code errors out with:

Several PoolWorker processes fail at the same time, so their tracebacks come out interleaved; deduplicated, each one reads:

Process PoolWorker-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
    task = get()
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
    return recv()
AttributeError: 'module' object has no attribute 'get_files'

What am I doing wrong here, and why does the get_files() function error out?

asked Oct 08 '15 by nohup


2 Answers

It's simply because you instantiate your pool before defining the function get_files:

import os
import multiprocessing

tld = [os.path.join("/", f) for f in os.walk("/").next()[1]]
manager = multiprocessing.Manager()

files = manager.list()
def get_files(x):
    for root, dir, file in os.walk(x):
        for name in file:
            files.append(os.path.join(root, name))

pool = multiprocessing.Pool(processes=len(tld)) # Instantiate the pool here

pool.map(get_files, [x for x in tld])
pool.close()
pool.join()
print len(files)

The overall idea is that at the instant you start a process, the memory of the main process is forked. So any definition made in the main process after the fork will not exist in the subprocess.
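
A minimal repro of that pitfall (a hypothetical square function, assuming the default fork start method on Linux):

import multiprocessing

pool = multiprocessing.Pool(2)  # the worker processes are forked here...

def square(x):                  # ...so this definition never reaches them
    return x * x

# map() sends the function to the workers by name; each worker's copy of
# this module was snapshotted before square was defined, so every worker
# dies with: AttributeError: 'module' object has no attribute 'square'
print(pool.map(square, [1, 2, 3]))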

If you want shared memory, you can use the threading library instead, but you will have some other issues with it (cf. the Global Interpreter Lock).
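
For example, a minimal thread-based sketch of the same walk (Python 3 syntax; a plain list works because threads share the parent's memory, though the GIL means only one thread runs Python bytecode at a time):

import os
import threading

files = []               # an ordinary list: threads share the parent's memory
lock = threading.Lock()  # guards the shared list while a worker merges results

def get_files(x):
    local = []
    for root, dirs, names in os.walk(x):
        for name in names:
            local.append(os.path.join(root, name))
    with lock:
        files.extend(local)

tld = [os.path.join("/", f) for f in next(os.walk("/"))[1]]
threads = [threading.Thread(target=get_files, args=(d,)) for d in tld]
for t in threads:
    t.start()
for t in threads:  # join in a second loop so the threads actually run concurrently
    t.join()
print(len(files))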

answered Oct 14 '22 by FunkySayu


I ran across this and tried the accepted answer on Python 3.x; it doesn't work there for a couple of reasons (among them: generators no longer have a .next() method, print is a function, and on start methods that spawn rather than fork the worker code must sit under an if __name__ == '__main__' guard). Here then is a modified version that does work (as of this writing, on Python 3.10.1):

import multiprocessing
import os


def get_files(x, files_):
    proc = multiprocessing.current_process()  # the worker process handling this folder
    for root, dirs, filenames in os.walk(x):
        for name in filenames:
            full_path = os.path.join(root, name)
            # print(f"worker:{proc.name} path:{full_path}")
            files_.append(full_path)


if __name__ == '__main__':
    # See https://docs.python.org/3/library/multiprocessing.html
    with multiprocessing.Manager() as manager:
        # The code will count the number of files under the specified root:
        root = '/'

        # Create the top-level list of folders which will be walked (and files counted)
        tld = [os.path.join(root, filename) for filename in next(os.walk(root))[1]]

        # Creates result list object in the manager, which is passed to the workers to collect results into.
        result_files = manager.list()

        # Create a pool of workers, with the size being equal to the number of top level folders:
        pool = multiprocessing.Pool(processes=len(tld))

        # Use starmap() instead of map() to allow passing multiple arguments (e.g. the folder and the result_files list).
        pool.starmap(get_files, [(folder, result_files) for folder in tld])

        pool.close()
        pool.join()

        # The result: the count of the number of files found.
        print(len(result_files))
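
As a design note, the managed list is only needed because the workers mutate shared state; a variation (my own sketch, not from the answer) has each worker return its own list, so the parent can concatenate the results and skip the proxy round-trip on every append:

import multiprocessing
import os

def get_files(folder):
    # Each worker builds and returns a private list; no shared state needed.
    found = []
    for root, dirs, names in os.walk(folder):
        for name in names:
            found.append(os.path.join(root, name))
    return found

if __name__ == '__main__':
    root = '/'
    tld = [os.path.join(root, name) for name in next(os.walk(root))[1]]
    with multiprocessing.Pool(processes=len(tld)) as pool:
        results = pool.map(get_files, tld)  # one list per top-level folder
    print(sum(len(r) for r in results))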
answered Oct 14 '22 by W.Prins