I am having performance issues with my code. Step # IIII consumes hours of time. I used to materialize the itertools.product beforehand with pro_data = product(array_b, array_a), but thanks to a user I don't do that anymore. That helped with my memory issues, but it is still heavily time consuming. I would like to parallelize it with multithreading or multiprocessing, whatever you can suggest; I am grateful.
Explanation: I have two arrays that contain the x and y values of particles. For each particle (defined by two coordinates) I want to calculate a function with every other one. For the combinations I use the itertools.product method and loop over every pair of particles. I run over 50000 particles in total, so I have N*N/2 combinations to calculate.
Thanks in advance.
import numpy as np
import matplotlib.pyplot as plt
from itertools import product,combinations_with_replacement
def func(ar1, ar2, ar3, ar4): # example func that takes four arguments
    return (ar1 * ar2**22 + np.sin(ar3) + ar4)

def newdist(a):
    return func(a[0][0], a[0][1], a[1][0], a[1][1])
x_edges = np.logspace(-3,1, num=25) #prepare x-axis for histogram
x_mean = 10**((np.log10(x_edges[:-1])+np.log10(x_edges[1:]))/2)
x_width=x_edges[1:]-x_edges[:-1]
hist_data=np.zeros([len(x_edges)-1])
array1=np.random.uniform(0.,10.,100)
array2=np.random.uniform(0.,10.,100)
array_a = np.dstack((array1,array1))[0]
array_b = np.dstack((array2,array2))[0]
# IIII
for i in product(array_a, array_b):
    (result, bins) = np.histogram(newdist(i), bins=x_edges)
    hist_data += result
hist_data = np.array(map(float, hist_data))
plt.bar(x_mean,hist_data,width=x_width,color='r')
plt.show()
-----EDIT----- I am now using this code:
def mp_dist(array_a, array_b, d, bins): # d chunks AND processes
    def worker(array_ab, out_q):
        """Push the result of one chunk into the queue."""
        # vec_chunk: vectorized histogram of one block (not shown here)
        out_q.put(vec_chunk(array_ab, bins))

    out_q = mp.Queue()
    a = np.swapaxes(array_a, 0, 1)
    b = np.swapaxes(array_b, 0, 1)
    array_size_a = len(array_a) - (len(array_a) % d)
    array_size_b = len(array_b) - (len(array_b) % d)
    a_chunk = array_size_a // d
    b_chunk = array_size_b // d
    procs = []
    # prepare arrays for mp
    array_ab = np.empty((4, a_chunk, b_chunk))
    for j in xrange(d):
        for k in xrange(d):
            array_ab[[0, 1]] = a[:, a_chunk * j:a_chunk * (j + 1), None]
            array_ab[[2, 3]] = b[:, None, b_chunk * k:b_chunk * (k + 1)]
            p = mp.Process(target=worker, args=(array_ab, out_q))
            procs.append(p)
            p.start()
    resultarray = np.zeros(len(bins) - 1) # zeros, not empty: we accumulate into it
    for i in range(d * d): # one result per spawned process
        resultarray += out_q.get()
    # Wait for all worker processes to finish
    for pro in procs:
        pro.join()
    print resultarray
    return resultarray
The problem here is that I cannot control the number of processes: this spawns all d*d workers at once. How can I use an mp.Pool() instead?
First, let's look at a straightforward vectorization of your problem. I have a feeling that you want your array_a and array_b to be the exact same, i.e. the coordinates of the particles, but I am keeping them separate here. I have turned your code into a function, to make timing easier:
def IIII(array_a, array_b, bins):
    hist_data = np.zeros([len(bins) - 1])
    for i in product(array_a, array_b):
        (result, bins) = np.histogram(newdist(i), bins=bins)
        hist_data += result
    hist_data = np.array(map(float, hist_data))
    return hist_data
You can, by the way, generate your sample data in a less convoluted way as follows:
n = 100
array_a = np.random.uniform(0, 10, size=(n, 2))
array_b = np.random.uniform(0, 10, size=(n, 2))
So first we need to vectorize your func. I have done it so it can take any array of shape (4, ...). To spare memory, it does the calculation in place and returns the first plane, i.e. array[0].
def func_vectorized(a):
    # every operation works in place on `a`; the result accumulates in a[0]
    a[1] **= 22
    np.sin(a[2], out=a[2])
    a[0] *= a[1]
    a[0] += a[2]
    a[0] += a[3]
    return a[0]
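As a quick sanity check (this snippet is mine, not part of the original timings), you can compare it against the scalar func from the question. Note that func_vectorized overwrites its input, so pass a copy if you still need the stacked arguments:

# sanity check: func_vectorized vs. the original func
args = np.random.uniform(0, 10, size=(4, 100)) # any shape (4, ...) works
expected = func(args[0], args[1], args[2], args[3])
result = func_vectorized(args.copy()) # copy: the input is modified in place
np.testing.assert_allclose(result, expected)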
With this function in place, we can write a vectorized version of IIII:
def IIII_vec(array_a, array_b, bins):
    array_ab = np.empty((4, len(array_a), len(array_b)))
    a = np.swapaxes(array_a, 0, 1)
    b = np.swapaxes(array_b, 0, 1)
    array_ab[[0, 1]] = a[:, :, None]
    array_ab[[2, 3]] = b[:, None, :]
    newdist = func_vectorized(array_ab)
    hist, _ = np.histogram(newdist, bins=bins)
    return hist
With n = 100 points, they both return the same:
In [2]: h1 = IIII(array_a, array_b, x_edges)
In [3]: h2 = IIII_vec(array_a, array_b, x_edges)
In [4]: np.testing.assert_almost_equal(h1, h2)
But the timing differences are already substantial:
In [5]: %timeit IIII(array_a, array_b, x_edges)
1 loops, best of 3: 654 ms per loop
In [6]: %timeit IIII_vec(array_a, array_b, x_edges)
100 loops, best of 3: 2.08 ms per loop
A 300x speedup! If you try it again with longer sample data, n = 1000, you can see that they both scale equally badly, as n**2, so the 300x stays there:
In [10]: %timeit IIII(array_a, array_b, x_edges)
1 loops, best of 3: 68.2 s per loop
In [11]: %timeit IIII_vec(array_a, array_b, x_edges)
1 loops, best of 3: 229 ms per loop
So for the full n = 50000 you are still looking at a good 10 min of processing (0.229 s * (50000 / 1000)**2 ≈ 570 s), which is not really that much compared to the more than 2 days your current solution would require.
Of course, for things to be so nice you would need to fit a (4, 50000, 50000) array of floats into memory, i.e. 4 * 50000**2 * 8 bytes = 80 GB, something that my system cannot handle. But you can still keep things relatively fast by processing it in chunks. The following version of IIII_vec divides each array into d chunks. As written, the length of the arrays should be divisible by d. It wouldn't be too hard to overcome that limitation (see the sketch at the end of this answer), but it would obfuscate the true purpose:
def IIII_vec_bis(array_a, array_b, bins, d=1):
    a = np.swapaxes(array_a, 0, 1)
    b = np.swapaxes(array_b, 0, 1)
    a_chunk = len(array_a) // d
    b_chunk = len(array_b) // d
    array_ab = np.empty((4, a_chunk, b_chunk))
    hist_data = np.zeros((len(bins) - 1,))
    for j in xrange(d):
        for k in xrange(d):
            array_ab[[0, 1]] = a[:, a_chunk * j:a_chunk * (j + 1), None]
            array_ab[[2, 3]] = b[:, None, b_chunk * k:b_chunk * (k + 1)]
            newdist = func_vectorized(array_ab)
            hist, _ = np.histogram(newdist, bins=bins)
            hist_data += hist
    return hist_data
First, let's check that it really works:
In [4]: h1 = IIII_vec(array_a, array_b, x_edges)
In [5]: h2 = IIII_vec_bis(array_a, array_b, x_edges, d=10)
In [6]: np.testing.assert_almost_equal(h1, h2)
And now some timings. With n = 100:
In [7]: %timeit IIII_vec(array_a, array_b, x_edges)
100 loops, best of 3: 2.02 ms per loop
In [8]: %timeit IIII_vec_bis(array_a, array_b, x_edges, d=10)
100 loops, best of 3: 12 ms per loop
But as you start needing a larger and larger array in memory, doing it in chunks starts to pay off. With n = 1000:
In [12]: %timeit IIII_vec(array_a, array_b, x_edges)
1 loops, best of 3: 223 ms per loop
In [13]: %timeit IIII_vec_bis(array_a, array_b, x_edges, d=10)
1 loops, best of 3: 208 ms per loop
With n = 10000 I can no longer call IIII_vec without an "array is too big" error, but the chunky version is still running:
In [18]: %timeit IIII_vec_bis(array_a, array_b, x_edges, d=10)
1 loops, best of 3: 21.8 s per loop
And just to show that it can be done, I have run it once with n = 50000:
In [23]: %timeit -n1 -r1 IIII_vec_bis(array_a, array_b, x_edges, d=50)
1 loops, best of 1: 543 s per loop
So a good 9 minutes of number crunching, which is not all that bad given it has computed 2.5 billion interactions.
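As an aside, the divisible-by-d restriction mentioned above can be lifted by computing the chunk boundaries explicitly, letting the last chunks be slightly smaller. A minimal sketch (the name IIII_vec_chunked and the boundary handling are mine, not part of the timed code above):

def IIII_vec_chunked(array_a, array_b, bins, d=1):
    # like IIII_vec_bis, but tolerates lengths that are not divisible by d
    a = np.swapaxes(array_a, 0, 1)
    b = np.swapaxes(array_b, 0, 1)
    # d+1 boundaries per axis; chunk sizes differ by at most one element
    a_edges = np.linspace(0, len(array_a), d + 1).astype(int)
    b_edges = np.linspace(0, len(array_b), d + 1).astype(int)
    hist_data = np.zeros((len(bins) - 1,))
    for j0, j1 in zip(a_edges[:-1], a_edges[1:]):
        for k0, k1 in zip(b_edges[:-1], b_edges[1:]):
            array_ab = np.empty((4, j1 - j0, k1 - k0))
            array_ab[[0, 1]] = a[:, j0:j1, None]
            array_ab[[2, 3]] = b[:, None, k0:k1]
            hist, _ = np.histogram(func_vectorized(array_ab), bins=bins)
            hist_data += hist
    return hist_data

The extra allocation of array_ab inside the loop is the price for variable chunk sizes; with equal-sized chunks it reduces to IIII_vec_bis.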
Use vectorized numpy operations. Replace the for-loop over product() with a single newdist() call, creating its arguments with meshgrid(). To parallelize the problem, compute newdist() on slices of array_a and array_b that correspond to subblocks of the meshgrid(). Here's an example using slices and multiprocessing.
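For the histogram problem specifically, a minimal sketch of the mp.Pool approach the question asks about might look like this. It combines the blockwise grid from the other answer with a pool of workers; func_vectorized is assumed to be defined at module level, and the names IIII_parallel, mp_init and mp_block are placeholders of mine:

import multiprocessing as mp
import numpy as np

def mp_init(a_sh, b_sh, n, edges):
    # runs once in each worker: wrap the shared buffers as arrays (no copy)
    global mp_a, mp_b, mp_edges
    mp_a = np.frombuffer(a_sh).reshape(2, n)
    mp_b = np.frombuffer(b_sh).reshape(2, n)
    mp_edges = edges

def mp_block(args):
    # histogram one (j0:j1) x (k0:k1) block of the full product grid
    j0, j1, k0, k1 = args
    ab = np.empty((4, j1 - j0, k1 - k0))
    ab[[0, 1]] = mp_a[:, j0:j1, None]
    ab[[2, 3]] = mp_b[:, None, k0:k1]
    hist, _ = np.histogram(func_vectorized(ab), bins=mp_edges)
    return hist

def IIII_parallel(array_a, array_b, bins, d=10, nprocs=None):
    # assumes len(array_a) == len(array_b) == n, as in the question
    n = len(array_a)
    # shared, lock-free buffers: workers read the data without pickling it
    a_sh, b_sh = [mp.RawArray('d', 2 * n) for _ in range(2)]
    np.frombuffer(a_sh).reshape(2, n)[:] = array_a.T
    np.frombuffer(b_sh).reshape(2, n)[:] = array_b.T
    step = -(-n // d) # ceiling division, so uneven lengths are fine
    blocks = [(j, min(j + step, n), k, min(k + step, n))
              for j in range(0, n, step) for k in range(0, n, step)]
    pool = mp.Pool(processes=nprocs, initializer=mp_init,
                   initargs=(a_sh, b_sh, n, bins))
    try:
        return sum(pool.imap_unordered(mp_block, blocks))
    finally:
        pool.close()
        pool.join()

Usage would be hist = IIII_parallel(array_a, array_b, x_edges, d=50); nprocs=None lets the pool default to the number of CPUs, and the RawArray buffers let every worker read the coordinates without copying them.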
Here's another example, demonstrating the steps: Python loop -> vectorized numpy version -> parallel:
#!/usr/bin/env python
from __future__ import division
import math
import multiprocessing as mp
import numpy as np

try:
    from itertools import izip as zip
except ImportError:
    zip = zip # Python 3

def pi_loop(x, y, npoints):
    """Compute pi using Monte-Carlo method."""
    # note: the method converges to pi very slowly.
    return 4 * sum(1 for xx, yy in zip(x, y) if (xx**2 + yy**2) < 1) / npoints

def pi_vectorized(x, y, npoints):
    return 4 * ((x**2 + y**2) < 1).sum() / npoints # or just .mean()

def mp_init(x_shared, y_shared):
    global mp_x, mp_y
    mp_x, mp_y = map(np.frombuffer, [x_shared, y_shared]) # no copy

def mp_pi(args):
    # perform computations on slices of mp_x, mp_y
    start, end = args
    x = mp_x[start:end] # no copy
    y = mp_y[start:end]
    return ((x**2 + y**2) < 1).sum()

def pi_parallel(x, y, npoints):
    # compute pi using multiple processes
    pool = mp.Pool(initializer=mp_init, initargs=[x, y])
    step = 100000
    slices = ((start, start + step) for start in range(0, npoints, step))
    return 4 * sum(pool.imap_unordered(mp_pi, slices)) / npoints

def main():
    npoints = 1000000
    # create shared arrays
    x_sh, y_sh = [mp.RawArray('d', npoints) for _ in range(2)]
    # initialize arrays
    x, y = map(np.frombuffer, [x_sh, y_sh])
    x[:] = np.random.uniform(size=npoints)
    y[:] = np.random.uniform(size=npoints)

    for f, a, b in [(pi_loop, x, y),
                    (pi_vectorized, x, y),
                    (pi_parallel, x_sh, y_sh)]:
        pi = f(a, b, npoints)
        precision = int(math.floor(math.log10(npoints)) / 2 - 1 + 0.5)
        print("%.*f %.1e" % (precision + 1, pi, abs(pi - math.pi)))

if __name__ == "__main__":
    main()
Time performance for npoints = 10_000_000:

pi_loop    pi_vectorized    pi_parallel
32.6       0.159            0.069        # seconds
This shows that the main performance benefit comes from converting the Python loop into its vectorized numpy analog.