I am learning Python multiprocessing, and I am trying to use it to populate a list with all the files present on a filesystem. However, the code I wrote only executes sequentially.
#!/usr/bin/python
import os
import multiprocessing

tld = [os.path.join("/", f) for f in os.walk("/").next()[1]]  # Gets the top-level directory names inside "/"
manager = multiprocessing.Manager()
files = manager.list()

def get_files(x):
    for root, dir, file in os.walk(x):
        for name in file:
            files.append(os.path.join(root, name))

mp = [multiprocessing.Process(target=get_files, args=(tld[x],))
      for x in range(len(tld))]

for i in mp:
    i.start()
    i.join()

print len(files)
When I checked the process tree, I could see only a single child process spawned. (man pstree says names in {} denote threads spawned by the parent.)
---bash(10949)---python(12729)-+-python(12730)---{python}(12752)
                               `-python(12750)
What I was expecting was to spawn a process for each tld directory and populate the shared list files; that would be around 10-15 processes depending on the number of directories. What am I doing wrong?
EDIT::
I used multiprocessing.Pool to create the worker processes, and this time the processes are spawned, but it gives errors when I try to use multiprocessing.Pool.map(). I was referring to the following example from the Python docs:
from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    p = Pool(5)
    print(p.map(f, [1, 2, 3]))
Following that example, I rewrote the code as
import os
import multiprocessing

tld = [os.path.join("/", f) for f in os.walk("/").next()[1]]
manager = multiprocessing.Manager()
pool = multiprocessing.Pool(processes=len(tld))
print pool
files = manager.list()

def get_files(x):
    for root, dir, file in os.walk(x):
        for name in file:
            files.append(os.path.join(root, name))

pool.map(get_files, [x for x in tld])
pool.close()
pool.join()

print len(files)
and it is forking multiple processes.
---bash(10949)---python(12890)-+-python(12967)
                               |-python(12968)
                               |-python(12970)
                               |-python(12971)
                               |-python(12972)
---snip---
But the code errors out with:
Process PoolWorker-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
    task = get()
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
    return recv()
AttributeError: 'module' object has no attribute 'get_files'
(the same traceback is printed by every pool worker; the output above is de-interleaved for readability)
What am I doing wrong here, and why does the get_files() function error out?
It's simply because you instantiate your pool before defining the function get_files:
import os
import multiprocessing

tld = [os.path.join("/", f) for f in os.walk("/").next()[1]]
manager = multiprocessing.Manager()
files = manager.list()

def get_files(x):
    for root, dir, file in os.walk(x):
        for name in file:
            files.append(os.path.join(root, name))

pool = multiprocessing.Pool(processes=len(tld))  # Instantiate the pool here
pool.map(get_files, [x for x in tld])
pool.close()
pool.join()

print len(files)
The overall idea of a process is that the instant you start it, the memory of the main process is forked; so any definition made in the main process after the fork will not exist in the subprocess.
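For instance, here is a minimal sketch of that ordering effect (my own illustration, not from the question), assuming the default fork start method on Linux:

import multiprocessing

def defined_before(x):      # exists in the workers: defined before the fork
    return x * 2

if __name__ == '__main__':
    pool = multiprocessing.Pool(2)   # the worker processes are forked here

    def defined_after(x):            # the already-forked workers never see this name
        return x * 3

    print(pool.map(defined_before, [1, 2]))   # works: prints [2, 4]
    # pool.map(defined_after, [1, 2])  # fails in the workers with an AttributeError like the one above
    pool.close()
    pool.join()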
If you want shared memory, you can use the threading library instead, but you will have some issues with it (cf. the Global Interpreter Lock).
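As an aside, a minimal sketch of that threading alternative could look like this (my own illustration; os.walk() is mostly I/O-bound, so the GIL hurts less here than it would for CPU-bound work):

import os
import threading

tld = [os.path.join("/", d) for d in next(os.walk("/"))[1]]
files = []                    # ordinary list: threads share the parent's memory
lock = threading.Lock()       # guard appends explicitly rather than relying on the GIL

def get_files(top):
    for root, dirs, names in os.walk(top):
        for name in names:
            with lock:
                files.append(os.path.join(root, name))

threads = [threading.Thread(target=get_files, args=(d,)) for d in tld]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(files))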
I ran across this and tried the accepted answer on Python 3.x; it doesn't work, for a couple of reasons. Here is a modified version that does work (as of this writing, on Python 3.10.1):
import multiprocessing
import os

def get_files(x, files_):
    proc = multiprocessing.current_process()  # the worker running this task (used in the debug print below)
    for root, dirs, filenames in os.walk(x):
        for name in filenames:
            full_path = os.path.join(root, name)
            # print(f"worker:{proc.name} path:{full_path}")
            files_.append(full_path)

if __name__ == '__main__':
    # See https://docs.python.org/3/library/multiprocessing.html
    with multiprocessing.Manager() as manager:
        # The code will count the number of files under the specified root:
        root = '/'
        # Create the top-level list of folders which will be walked (and their files counted)
        tld = [os.path.join(root, filename) for filename in next(os.walk(root))[1]]
        # Create the result list in the manager; it is passed to the workers to collect results into.
        result_files = manager.list()
        # Create a pool of workers, sized to the number of top-level folders:
        pool = multiprocessing.Pool(processes=len(tld))
        # Use starmap() instead of map() to pass multiple arguments (the folder and the result_files list).
        pool.starmap(get_files, [(folder, result_files) for folder in tld])
        pool.close()
        pool.join()
        # The result: the total number of files found.
        print(len(result_files))
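A possible variation (my own suggestion, not part of the answer above): if all you need is the aggregated result, you can drop the Manager entirely and let each worker return its own list, which map() then hands back to the parent:

import multiprocessing
import os

def list_files(top):
    # Each worker builds and returns its own plain list; no shared state needed.
    found = []
    for root, dirs, names in os.walk(top):
        for name in names:
            found.append(os.path.join(root, name))
    return found

if __name__ == '__main__':
    root = '/'
    tld = [os.path.join(root, d) for d in next(os.walk(root))[1]]
    with multiprocessing.Pool(processes=len(tld)) as pool:
        per_folder = pool.map(list_files, tld)
    print(sum(len(chunk) for chunk in per_folder))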