Suppose I have the following code:
from scipy import *
import multiprocessing as mp
num_cores = mp.cpu_count()
from joblib import Parallel, delayed
import matplotlib.pyplot as plt
def func(x, y):
    return y / x

def main(y, xmin, xmax, dx):
    x = arange(xmin, xmax, dx)
    output = Parallel(n_jobs=num_cores)(delayed(func)(i, y) for i in x)
    return x, asarray(output)

def demo():
    x, z = main(2., 1., 30., .1)
    plt.plot(x, z, label='All values')
    plt.plot(x[z > .1], z[z > .1], label='desired range')  # This is better to do in main()
    plt.show()

demo()
I want to calculate output only until output > a given number (it can be assumed that the elements of output decrease monotonically as x increases) and then stop, NOT calculating output for every value of x and then sorting, which is inefficient for my purpose. Is there any way to do that using Parallel, delayed, or any other multiprocessing tool?
The delayed function is a simple trick to be able to create a tuple (function, args, kwargs) with a function-call syntax. Under Windows, using multiprocessing.Pool requires protecting the entry point of the code to avoid recursive spawning of subprocesses when using joblib.
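For example, a minimal guard for the question's script would look like this (this is the standard multiprocessing idiom, not something specific to this answer):

if __name__ == '__main__':
    # Only the parent process runs demo(); child processes merely
    # re-import the module without spawning further subprocesses.
    demo()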
joblib provides a cpu_count() method that returns the number of cores on the machine. Parallel then creates a pool with that many worker processes, and we call that Parallel object with the list of delayed calls created above. More broadly, joblib is a set of tools providing lightweight pipelining in Python, in particular transparent disk-caching of functions and lazy re-evaluation (the memoize pattern).
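As a minimal sketch of that pattern (func here mirrors the question's function; the inputs are made up):

from joblib import Parallel, delayed, cpu_count

def func(x, y):
    return y / x

# One delayed (function, args, kwargs) tuple per input; Parallel runs them
# on a pool with one worker per CPU core and returns results in input order.
results = Parallel(n_jobs=cpu_count())(delayed(func)(i, 2.0) for i in range(1, 10))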
No concrete value for output > a given number was specified, so I just made one up; after testing, I had to reverse the condition to output < a given number for proper operation.
I would use a pool and launch the processes with a callback function that checks the stop condition, then terminate the pool when ready. However, that causes a race condition which allows results to be omitted from running processes that were not allowed to finish. This method needs minimal modification to your code and is very easy to read. Note that the order of the output list is NOT guaranteed.
Pros: very little overhead
Cons: could have missing results.
Method 1)
from scipy import *
import multiprocessing
import matplotlib.pyplot as plt

def stop_condition_callback(ret):
    output.append(ret)
    if ret < stop_condition:
        worker_pool.terminate()

def func(x, y):
    return y / x

def main(y, xmin, xmax, dx):
    x = arange(xmin, xmax, dx)
    print("Number of calculations: %d" % (len(x)))

    # add calculations to the pool
    for i in x:
        worker_pool.apply_async(func, (i, y), callback=stop_condition_callback)

    # wait for the pool to finish/terminate
    worker_pool.close()
    worker_pool.join()

    print("Number of results: %d" % (len(output)))
    return x, asarray(output)

def demo():
    x, z_list = main(2., 1., 30., .1)
    plt.plot(z_list, label='desired range')
    plt.show()

output = []
stop_condition = 0.1
worker_pool = multiprocessing.Pool()

demo()
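Because the callbacks fire in completion order, the values in output will not necessarily line up with x. If the pairing matters, one variation (my addition, not part of the original answer) is to return the input alongside the result and re-sort afterwards:

def func(x, y):
    # Return the input together with the result so the unordered
    # callback results can be re-paired with x afterwards.
    return x, y / x

def stop_condition_callback(ret):
    output.append(ret)
    if ret[1] < stop_condition:  # ret is now an (x, y/x) tuple
        worker_pool.terminate()

# then, in main() after worker_pool.join():
#     x_done, z = map(asarray, zip(*sorted(output)))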
The next method has more overhead but will allow processes which have started to finish.
Method 2)
from scipy import *
import multiprocessing
import matplotlib.pyplot as plt

def stop_condition_callback(ret):
    if ret is not None:
        if ret < stop_condition:
            worker_stop.value = 1
        else:
            output.append(ret)

def func(x, y):
    # workers check the shared flag and bail out without computing
    if worker_stop.value != 0:
        return None
    return y / x

def main(y, xmin, xmax, dx):
    x = arange(xmin, xmax, dx)
    print("Number of calculations: %d" % (len(x)))

    # add calculations to the pool
    for i in x:
        worker_pool.apply_async(func, (i, y), callback=stop_condition_callback)

    # wait for the pool to finish/terminate
    worker_pool.close()
    worker_pool.join()

    print("Number of results: %d" % (len(output)))
    return x, asarray(output)

def demo():
    x, z_list = main(2., 1., 30., .1)
    plt.plot(z_list, label='desired range')
    plt.show()

output = []
worker_stop = multiprocessing.Value('i', 0)
stop_condition = 0.1
worker_pool = multiprocessing.Pool()

demo()
Method 3)
Pros: No results will be left out
Cons: This steps way outside what you would normally do.
Take Method 1 and add:
def stopPoolButLetRunningTaskFinish(pool):
    # Stop new tasks from being started by emptying the queue
    # that all worker processes draw from
    while pool._task_handler.is_alive() and pool._inqueue._reader.poll():
        pool._inqueue._reader.recv()
    # Send sentinels to all worker processes
    for a in range(len(pool._pool)):
        pool._inqueue.put(None)
Then change stop_condition_callback:
def stop_condition_callback(ret):
    if ret < stop_condition:  # ret is the scalar y/x returned by Method 1's func
        # worker_pool.terminate()
        stopPoolButLetRunningTaskFinish(worker_pool)
    else:
        output.append(ret)
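As a final aside (my addition, not part of the answer above): because the question states that output decreases monotonically with x, multiprocessing.Pool.imap, which yields results in input order, also allows a clean early stop without touching Pool internals. A minimal sketch:

import multiprocessing
from functools import partial
from numpy import arange, asarray

def func(x, y):
    return y / x

def main(y, xmin, xmax, dx, stop_condition=0.1):
    x = arange(xmin, xmax, dx)
    output = []
    with multiprocessing.Pool() as pool:
        # imap yields results in input order, so the first value below the
        # threshold is the true cutoff; terminate() discards pending work.
        for z in pool.imap(partial(func, y=y), x):
            output.append(z)
            if z < stop_condition:
                pool.terminate()
                break
    return x[:len(output)], asarray(output)

if __name__ == '__main__':
    x_done, z = main(2., 1., 30., .1)
    print("Number of results: %d" % len(z))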