Here's an MWE of a much larger piece of code I'm using. Basically, it performs a Monte Carlo integration over a KDE (kernel density estimate) for all values located below a certain threshold (the integration method was suggested in this question, BTW: Integrate 2D kernel density estimate).
import numpy as np
from scipy import stats
import time
# Generate some random two-dimensional data:
def measure(n):
    "Measurement model, return two coupled measurements."
    m1 = np.random.normal(size=n)
    m2 = np.random.normal(scale=0.5, size=n)
    return m1 + m2, m1 - m2
# Get data.
m1, m2 = measure(20000)
# Define limits.
xmin = m1.min()
xmax = m1.max()
ymin = m2.min()
ymax = m2.max()
# Perform a kernel density estimate on the data.
x, y = np.mgrid[xmin:xmax:100j, ymin:ymax:100j]
values = np.vstack([m1, m2])
kernel = stats.gaussian_kde(values)
# Define point below which to integrate the kernel.
x1, y1 = 0.5, 0.5
# Get kernel value for this point.
tik = time.time()
iso = kernel((x1,y1))
print 'iso: ', time.time()-tik
# Sample from KDE distribution (Monte Carlo process).
tik = time.time()
sample = kernel.resample(size=1000)
print 'resample: ', time.time()-tik
# Filter the sample leaving only values for which
# the kernel evaluates to less than what it does for
# the (x1, y1) point defined above.
tik = time.time()
insample = kernel(sample) < iso
print 'filter/sample: ', time.time()-tik
# Integrate for all values below iso.
tik = time.time()
integral = insample.sum() / float(insample.shape[0])
print 'integral: ', time.time()-tik
The output looks something like this:
iso: 0.00259208679199
resample: 0.000817060470581
filter/sample: 2.10829401016
integral: 4.2200088501e-05
which clearly shows that the filter/sample call is consuming almost all of the running time. I have to run this block of code iteratively several thousand times, so it gets pretty time consuming.
Is there any way to speed up the filtering/sampling process?
Here's a slightly more realistic MWE of my actual code, with Ophion's multiprocessing solution written into it:
import numpy as np
from scipy import stats
from multiprocessing import Pool
def kde_integration(m_list):
    m1, m2 = [], []
    for item in m_list:
        # Color data.
        m1.append(item[0])
        # Magnitude data.
        m2.append(item[1])
    # Define limits.
    xmin, xmax = min(m1), max(m1)
    ymin, ymax = min(m2), max(m2)
    # Perform a kernel density estimate on the data:
    x, y = np.mgrid[xmin:xmax:100j, ymin:ymax:100j]
    values = np.vstack([m1, m2])
    kernel = stats.gaussian_kde(values)
    out_list = []
    for point in m_list:
        # Compute the point below which to integrate.
        iso = kernel((point[0], point[1]))
        # Sample from the KDE distribution.
        sample = kernel.resample(size=1000)
        # Create definition.
        def calc_kernel(samp):
            return kernel(samp)
        # Choose number of cores and split input array.
        cores = 4
        torun = np.array_split(sample, cores, axis=1)
        # Calculate.
        pool = Pool(processes=cores)
        results = pool.map(calc_kernel, torun)
        # Reintegrate and calculate results.
        insample_mp = np.concatenate(results) < iso
        # Integrate for all values below iso.
        integral = insample_mp.sum() / float(insample_mp.shape[0])
        out_list.append(integral)
    return out_list
# Generate some random two-dimensional data:
def measure(n):
    "Measurement model, return two coupled measurements."
    m1 = np.random.normal(size=n)
    m2 = np.random.normal(scale=0.5, size=n)
    return m1 + m2, m1 - m2
# Create list to pass.
m_list = []
for i in range(60):
    m1, m2 = measure(5)
    m_list.append(m1.tolist())
    m_list.append(m2.tolist())
# Call KDE integration function.
print 'Integral result: ', kde_integration(m_list)
The solution presented by Ophion works great on the original code I presented, but fails with the following error in this version:
Integral result: Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 504, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
I tried moving the calc_kernel function around, since one of the answers to this question (Multiprocessing: How to use Pool.map on a function defined in a class?) states that "the function that you give to map() must be accessible through an import of your module", but I still can't get this code to work.
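As I understand it, Pool.map pickles the callable it sends to the worker processes, and a function defined inside another function (like calc_kernel above, which is created inside kde_integration) can't be pickled. A minimal sketch of what I think the linked answer means, with the worker at module level (calc_chunk is just an illustrative name, not part of my actual code), would be:
import numpy as np
from scipy import stats
from multiprocessing import Pool

def calc_chunk(args):
    # Evaluate a (picklable) gaussian_kde instance on one chunk of points.
    kernel, chunk = args
    return kernel(chunk)

if __name__ == '__main__':
    m1, m2 = np.random.normal(size=1000), np.random.normal(size=1000)
    kernel = stats.gaussian_kde(np.vstack([m1, m2]))
    sample = kernel.resample(size=1000)
    cores = 4
    chunks = np.array_split(sample, cores, axis=1)
    pool = Pool(processes=cores)
    # Each task carries the kernel itself plus its chunk of sample points,
    # so the worker function needs no closure over local state.
    results = pool.map(calc_chunk, [(kernel, c) for c in chunks])
    pool.close()
    pool.join()
    print 'evaluated points:', np.concatenate(results).shape[0]
Here the kernel is passed along with each chunk instead of being captured in a closure, since the gaussian_kde instance itself appears to pickle without trouble.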
Any help will be very much appreciated.
Implementing Ophion's suggestion to remove the calc_kernel function and simply use:
results = pool.map(kernel, torun)
works to get rid of the PicklingError, but now I see that if I create an initial m_list of anything more than around 62-63 items I get this error:
Traceback (most recent call last):
File "~/gauss_kde_temp.py", line 67, in <module>
print 'Integral result: ', kde_integration(m_list)
File "~/gauss_kde_temp.py", line 38, in kde_integration
pool = Pool(processes=cores)
File "/usr/lib/python2.7/multiprocessing/__init__.py", line 232, in Pool
return Pool(processes, initializer, initargs, maxtasksperchild)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 161, in __init__
self._result_handler.start()
File "/usr/lib/python2.7/threading.py", line 494, in start
_start_new_thread(self.__bootstrap, ())
thread.error: can't start new thread
Since the list in my real implementation of this code can have up to 2000 items, this issue renders the code unusable. Line 38 is this one:
pool = Pool(processes=cores)
so apparently it has something to do with the number of cores I'm using?
This question ("Can't start a new thread error" in Python) suggests using:
threading.active_count()
to check the number of threads I have going when I get that error. I checked, and it always crashes when it reaches 374 threads. How can I code around this issue?
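A possible workaround (assuming the problem is that a brand-new Pool is created on every loop iteration and never released, which is just my guess) would be to create the Pool once outside the loop, reuse it for every point, and close/join it only at the end. Roughly:
# Sketch: reuse a single pool instead of creating one per iteration
# (assumes the thread error comes from pools that are never released).
pool = Pool(processes=cores)
out_list = []
for point in m_list:
    iso = kernel((point[0], point[1]))
    sample = kernel.resample(size=1000)
    torun = np.array_split(sample, cores, axis=1)
    results = pool.map(kernel, torun)
    insample_mp = np.concatenate(results) < iso
    out_list.append(insample_mp.sum() / float(insample_mp.shape[0]))
# Release the worker processes once all points are processed.
pool.close()
pool.join()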
Here's the new question dealing with this last issue: Thread error: can't start new thread
Probably the easiest way to speed this up is to parallelize kernel(sample):
Taking this code fragment:
tik = time.time()
insample = kernel(sample) < iso
print 'filter/sample: ', time.time()-tik
#filter/sample: 1.94065904617
Change this to use multiprocessing
:
from multiprocessing import Pool
tik = time.time()
# Create definition.
def calc_kernel(samp):
    return kernel(samp)
# Choose number of cores and split input array.
cores = 4
torun = np.array_split(sample, cores, axis=1)
# Calculate.
pool = Pool(processes=cores)
results = pool.map(calc_kernel, torun)
# Reintegrate and calculate results.
insample_mp = np.concatenate(results) < iso
print 'multiprocessing filter/sample: ', time.time()-tik
#multiprocessing filter/sample: 0.496874094009
Double check they are returning the same answer:
print np.all(insample==insample_mp)
#True
That's a 3.9x improvement on 4 cores. I'm not sure what you are running this on, but after about 6 processes your input array size is not large enough to get considerable gains. For example, using 20 processes it's only about 5.8x faster.
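If you want to see where the scaling flattens out on your own machine and sample size, a quick timing loop along these lines (purely illustrative, reusing the kernel and sample defined above) should do it:
import time
for cores in (1, 2, 4, 8):
    pool = Pool(processes=cores)
    torun = np.array_split(sample, cores, axis=1)
    tik = time.time()
    # Mapping the kernel object directly avoids any pickling issues
    # with locally defined helper functions.
    results = pool.map(kernel, torun)
    pool.close()
    pool.join()
    print cores, 'processes:', time.time() - tik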