How to implement parallel, delayed in such a way that the parallelized for loop stops when output goes below a threshold?

Suppose I have the following code:

from numpy import arange, asarray
import multiprocessing as mp
from joblib import Parallel, delayed
import matplotlib.pyplot as plt

num_cores = mp.cpu_count()

def func(x, y):
    return y / x

def main(y, xmin, xmax, dx):
    x = arange(xmin, xmax, dx)
    output = Parallel(n_jobs=num_cores)(delayed(func)(i, y) for i in x)
    return x, asarray(output)

def demo():
    x, z = main(2., 1., 30., .1)
    plt.plot(x, z, label='All values')
    plt.plot(x[z > .1], z[z > .1], label='desired range')  # better done in main()
    plt.show()

demo()

I want to calculate output only while output > a given number (the elements of output can be assumed to decrease monotonically as x increases) and then stop, NOT calculate for all values of x and filter afterwards, which is inefficient for my purpose. Is there any way to do that using Parallel and delayed, or any other multiprocessing approach?


People also ask

What does joblib delayed do?

The delayed function is a simple trick that lets you create a tuple (function, args, kwargs) with ordinary function-call syntax. Under Windows, the use of multiprocessing.Pool requires protecting the main loop of the code to avoid recursive spawning of subprocesses when using joblib.
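
A minimal sketch of what that tuple looks like (assuming joblib's delayed, which simply records the call rather than running it):

from joblib import delayed

def add(a, b):
    return a + b

# Calling the wrapped function records the call instead of executing it,
# producing a (function, args, kwargs) tuple:
task = delayed(add)(2, 3)
print(task)  # (<function add at 0x...>, (2, 3), {})

# Parallel later unpacks and executes each recorded call:
f, args, kwargs = task
print(f(*args, **kwargs))  # 5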

How does joblib parallel work?

joblib provides a function named cpu_count() which returns the number of cores on the machine. Parallel then creates a pool with that many worker processes. You call the Parallel object with a sequence of delayed function calls, and it distributes them across the pool.
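
A short sketch of the usual pattern (using joblib's own cpu_count):

from joblib import Parallel, delayed, cpu_count

def square(i):
    return i * i

# One worker per core; Parallel consumes the generator of delayed
# calls and returns the results in input order.
results = Parallel(n_jobs=cpu_count())(delayed(square)(i) for i in range(10))
print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]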

What is joblib in Python?

Joblib is a set of tools to provide lightweight pipelining in Python, in particular transparent disk-caching of functions and lazy re-evaluation (the memoize pattern).
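
The disk-caching side looks roughly like this (a sketch; the cache directory ./joblib_cache is an arbitrary choice):

from joblib import Memory

memory = Memory('./joblib_cache', verbose=0)

@memory.cache
def slow_square(x):
    print('computing...')  # only printed on a cache miss
    return x * x

slow_square(4)  # computed and written to the on-disk cache
slow_square(4)  # returned from the cache without recomputation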


1 Answer

No threshold for output > a given number was specified, so I just made one up. After testing, I had to reverse the condition for proper operation: stop when output < the given number.

I would use a pool and launch the processes with a callback function to check the stop condition, then terminate the pool when ready. However, that introduces a race condition: results may be omitted from running processes that were not allowed to finish. This method requires minimal modification to your code and is very easy to read. Note that the order of the output list is NOT guaranteed.

Pros: very little overhead
Cons: could have missing results.

Method 1)

from numpy import arange, asarray
import multiprocessing

import matplotlib.pyplot as plt


def stop_condition_callback(ret):
    output.append(ret)
    if ret < stop_condition:
        worker_pool.terminate()


def func(x, y):
    return y / x


def main(y, xmin, xmax, dx):
    x = arange(xmin, xmax, dx)
    print("Number of calculations: %d" % (len(x)))

    # add calculations to the pool
    for i in x:
        worker_pool.apply_async(func, (i, y), callback=stop_condition_callback)

    # wait for the pool to finish/terminate
    worker_pool.close()
    worker_pool.join()

    print("Number of results: %d" % (len(output)))
    return x, asarray(output)


def demo():
    x, z_list = main(2., 1., 30., .1)
    plt.plot(z_list, label='desired range')
    plt.show()


output = []
stop_condition = 0.1

if __name__ == '__main__':
    worker_pool = multiprocessing.Pool()
    demo()

This method has more overhead but allows processes that have already started to run to completion.

Method 2)

from numpy import arange, asarray
import multiprocessing

import matplotlib.pyplot as plt


def stop_condition_callback(ret):
    if ret is not None:
        if ret < stop_condition:
            worker_stop.value = 1
        else:
            output.append(ret)


def func(x, y):
    if worker_stop.value != 0:
        return None
    return y / x


def main(y, xmin, xmax, dx):
    x = arange(xmin, xmax, dx)
    print("Number of calculations: %d" % (len(x)))

    # add calculations to the pool
    for i in x:
        worker_pool.apply_async(func, (i, y), callback=stop_condition_callback)

    # wait for the pool to finish/terminate
    worker_pool.close()
    worker_pool.join()

    print("Number of results: %d" % (len(output)))
    return x, asarray(output)


def demo():
    x, z_list = main(2., 1., 30., .1)
    plt.plot(z_list, label='desired range')
    plt.show()


output = []
worker_stop = multiprocessing.Value('i', 0)
stop_condition = 0.1

if __name__ == '__main__':
    worker_pool = multiprocessing.Pool()
    demo()

Method 3)

Pros: no results will be left out.
Cons: this steps way outside what you would normally do.

Take Method 1 and add:

def stopPoolButLetRunningTaskFinish(pool):
    # Prevent new tasks from being started by draining the queue
    # that all worker processes pull their work from
    while pool._task_handler.is_alive() and pool._inqueue._reader.poll():
        pool._inqueue._reader.recv()
    # Send sentinels to all worker processes so they exit cleanly
    for a in range(len(pool._pool)):
        pool._inqueue.put(None)

Then change stop_condition_callback to:

def stop_condition_callback(ret):
    # func returns a scalar here (as in Method 1), so compare ret directly
    if ret < stop_condition:
        # worker_pool.terminate()
        stopPoolButLetRunningTaskFinish(worker_pool)
    else:
        output.append(ret)
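
For comparison, here is a simpler alternative sketch (my own, not part of the answer above): since the outputs decrease monotonically, the main process can consume in-order results from multiprocessing.Pool.imap and stop as soon as the threshold is crossed; leaving the with block terminates the pool, so tasks that never started are discarded.

import multiprocessing
from numpy import arange, asarray

def func(args):
    x, y = args  # imap passes a single argument, so pack x and y together
    return y / x

def main(y, xmin, xmax, dx, threshold=0.1):
    x = arange(xmin, xmax, dx)
    output = []
    with multiprocessing.Pool() as pool:
        # imap yields results lazily, in input order
        for val in pool.imap(func, ((i, y) for i in x)):
            if val < threshold:
                break  # stop consuming once below the threshold
            output.append(val)
    # exiting the with block calls pool.terminate(), cancelling leftover tasks
    return x[:len(output)], asarray(output)

if __name__ == '__main__':
    x, z = main(2., 1., 30., .1)
    print(len(z), 'results kept')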