I am in the process of migrating from MATLAB to Python, mainly because of the vast number of interesting machine learning packages available in Python. But one of the issues that has been a source of confusion for me is parallel processing. In particular, I want to read thousands of text files from disk in a for loop and I want to do it in parallel. In MATLAB, using parfor instead of for will do the trick, but so far I haven't been able to figure out how to do this in Python.
Here is an example of what I want to do. I want to read N text files, shape each one into an N1 x N2 array, and save them all into an N x N1 x N2 NumPy array, which is what the function returns. Assuming the file names are file_0000.dat, file_0001.dat, etc., the code I would like to parallelise is as follows:
import numpy as np

N = 10000
N1 = 200
N2 = 100
result = np.empty([N, N1, N2])

for counter in range(N):
    t_str = "%.4d" % counter
    filename = 'file_' + t_str + '.dat'
    temp_array = np.loadtxt(filename)
    temp_array.shape = [N1, N2]
    result[counter, :, :] = temp_array
I run the code on a cluster, so I can use many processors for the job. Hence, any comment on which parallelisation method is most suitable for my task (if there is more than one) is most welcome.
NOTE: I am aware of this post, but in that post there are only the variables out1, out2, and out3 to worry about, and they are used explicitly as arguments of the function to be parallelised. Here, however, I have many 2D arrays that should be read from file and saved into one 3D array. So, the answer to that question is not general enough for my case (or that is how I understood it).
You still probably want to use multiprocessing, just structure it a bit differently:
from multiprocessing import Pool
import numpy as np

N = 10000
N1 = 200
N2 = 100
result = np.empty([N, N1, N2])

# generator of file names, so they are never all held in memory at once
filenames = ('file_%.4d.dat' % i for i in range(N))

# helper that loads one file and reshapes it to N1 x N2; a named function
# rather than a lambda, since multiprocessing must pickle the callable
def myshaper(fname):
    return np.loadtxt(fname).reshape([N1, N2])

pool = Pool()
for i, temp_array in enumerate(pool.imap(myshaper, filenames)):
    result[i, :, :] = temp_array
pool.close()
pool.join()
What this does is first build a generator for the file names in filenames. This means the file names are not all stored in memory, but you can still loop over them. Next, it defines a small helper function, myshaper, that loads and reshapes one file (a lambda, the equivalent of a MATLAB anonymous function, would read more compactly, but multiprocessing has to pickle the function it sends to the workers, and lambdas cannot be pickled). Then it applies that function to each file name using multiple processes and puts the results into the overall array. Finally, it closes the worker processes.
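If you prefer the standard-library concurrent.futures interface over multiprocessing.Pool, a roughly equivalent sketch looks like this. It is only an illustration of the same idea, assuming the same file_%.4d.dat naming scheme as above:
from concurrent.futures import ProcessPoolExecutor
import numpy as np

N, N1, N2 = 10000, 200, 100

def load_one(fname):
    # each worker process loads and reshapes a single file
    return np.loadtxt(fname).reshape([N1, N2])

filenames = ['file_%.4d.dat' % i for i in range(N)]
result = np.empty([N, N1, N2])

with ProcessPoolExecutor() as executor:
    # executor.map preserves input order, so results line up with filenames
    for i, temp_array in enumerate(executor.map(load_one, filenames)):
        result[i, :, :] = temp_array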
The Pool version above uses somewhat more idiomatic Python. However, an approach that is closer to your original code (although less idiomatic) might be easier to follow:
from multiprocessing import Pool
import numpy as np

N = 10000
N1 = 200
N2 = 100
result = np.empty([N, N1, N2])

def proccounter(counter):
    # load one file and return it together with its index,
    # so the parent process knows where to store it
    t_str = "%.4d" % counter
    filename = 'file_' + t_str + '.dat'
    temp_array = np.loadtxt(filename)
    temp_array.shape = [N1, N2]
    return counter, temp_array

pool = Pool()
for counter, temp_array in pool.imap(proccounter, range(N)):
    result[counter, :, :] = temp_array
pool.close()
pool.join()
This just splits the body of your for loop into a function, applies that function to each element of the range using multiple processes, then puts the results into the array. It is basically your original for loop split in two: the per-file work lives in proccounter, and the loop that fills result stays in the parent process.
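When N is in the thousands, the per-task overhead of imap can add up. One small tweak worth trying (a sketch, not part of the original answer; the value 100 is an arbitrary guess worth tuning) is to pass a chunksize so each worker receives batches of indices at once:
# same loop as above, but handing each worker batches of 100 indices at a time
for counter, temp_array in pool.imap(proccounter, range(N), chunksize=100):
    result[counter, :, :] = temp_array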
It can also be done using the joblib library as follows:
def par_func(N1, N2, counter):
    import numpy as np
    t_str = "%.4d" % counter
    filename = 'file_' + t_str + '.dat'
    temp_array = np.loadtxt(filename)
    temp_array.shape = [N1, N2]
    # temp_array = np.random.randn(N1, N2)  # use this line to test
    return temp_array

if __name__ == '__main__':
    import numpy as np
    from joblib import Parallel, delayed

    N = 1000
    N1 = 200
    N2 = 100

    num_jobs = 2
    output_list = Parallel(n_jobs=num_jobs)(
        delayed(par_func)(N1, N2, counter) for counter in range(N))
    output_array = np.array(output_list)
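If the cluster node has many cores, you can also let joblib pick them all up and report progress. n_jobs=-1 and verbose are both standard joblib.Parallel options, but the exact settings below are just an illustration:
# illustrative variant: use every available core and print coarse progress
output_list = Parallel(n_jobs=-1, verbose=5)(
    delayed(par_func)(N1, N2, counter) for counter in range(N))
output_array = np.array(output_list)   # shape (N, N1, N2)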