Python: Yield in multiprocessing Pool

I have to parallelize a function that contains a yield. This is only a simplified replica of the whole program I have to work on, but it sums up the problems I'm facing. Here I'm trying to understand multiprocessing, apply_async and yield for my project. In this example I've used a multiprocessing.Pool and apply_async to parallelize. I've put some print statements in the "parallel" function, but they aren't getting printed. When I replace yield with return, the print statements do get printed. I'm not certain about the nature of yield. I know it creates a generator, which can be used only once after it is returned. Please advise on how to get this working.

import multiprocessing as mp
results=[]

def parallel(x, y, z):
    print "aim in parallel"
    count=0
    result=[]
    for line in range(10000):
        count+=1
    result.append(count)
    p=x**3+y+z
    print " result"
    print result
    print p
    if p > 0:
       return result
#      yield result, p
#      count += 1
#      yield p, result
#      count += 1

def collect_results(result):
   print "aim in callback"
   results.append(result)
   #print results


def apply_async_with_callback():
    pool    = mp.Pool(processes=10)
    r = range(10)
    [pool.apply_async(parallel, args=(2,5, 7),callback=collect_results) for i in r ]
    pool.close()
    pool.join()
    print "length"
    print len(results)
    print results

if __name__ == "__main__":
    apply_async_with_callback()
Shiva Chetan asked Jun 03 '15 08:06



1 Answer

When a function containing a yield statement is called, it doesn't actually run the code but returns a generator instead:

>>> p = parallel(1, 2, 3)
>>> p
<generator object parallel at 0x7fde9c1daf00>

Then, when the next value is required, the code will run until a value is yielded:

>>> next(p)
([10000], 6)
>>> next(p)
(6, [10000])
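
The same behaviour can be reproduced in a small standalone script. This is only a sketch, written in Python 3 syntax (the question uses Python 2); the log parameter is a hypothetical addition used to record when the function body actually runs, and the function is shortened compared with the one in the question.

```python
def parallel(x, y, z, log):
    # `log` is a hypothetical list, added only to record when the body runs.
    log.append("body started")
    p = x ** 3 + y + z
    yield p
    log.append("resumed after first yield")
    yield p + 1


log = []
gen = parallel(1, 2, 3, log)    # creates the generator; the body has not run
calls_before = list(log)        # snapshot of the log before any next() call

first = next(gen)               # runs the body up to the first yield
calls_after_first = list(log)   # snapshot after the first value

rest = list(gen)                # drains the remaining values
```

Nothing is appended to log until next() is called for the first time, which matches the missing print output described in the question.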

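In your case, each apply_async call only creates a generator inside a worker process; its body never runs, which is why the print statements produce no output. A generator object also cannot be pickled, so it cannot be sent back to the parent process as a result, and the callback is not called.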
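
A related point, sketched in Python 3: pool results are pickled on their way back to the parent process, and a generator object cannot be pickled, whereas a list built from it can. The names gen_func and try_pickle are made up for this illustration.

```python
import pickle


def gen_func():
    # Hypothetical stand-in for any function that contains a yield.
    yield 1
    yield 2


def try_pickle(obj):
    """Return True if obj can be pickled, False if pickle raises TypeError."""
    try:
        pickle.dumps(obj)
    except TypeError:
        return False
    return True


generator_ok = try_pickle(gen_func())     # the generator object itself
list_ok = try_pickle(list(gen_func()))    # the values it produces
```

This is why the worker needs to hand back the materialized values and not the generator.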

If you want to use a generator, you could change your code a bit to target a function that creates a list from the generator:

def parallel2(x, y, z):
    return list(parallel(x, y, z))

def collect_results(lst):
   results.extend(lst)

def apply_async_with_callback():
    pool = mp.Pool()
    for _ in range(10):
        pool.apply_async(parallel2, args=(2, 5, 7),
                         callback=collect_results)
    # Wait for all tasks to finish before reading results
    pool.close()
    pool.join()
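
As a variant, the values can be collected with AsyncResult.get() in place of a callback, which has the side benefit that an exception raised in a worker is re-raised in the parent. The following is a minimal sketch in Python 3 syntax, not the code from the question: run_pool, its n_tasks and timeout parameters, and the choice of two worker processes are assumptions made for the example, and the print statements from the original function are left out.

```python
import multiprocessing as mp


def parallel(x, y, z):
    """Generator version of the question's function (Python 3 syntax)."""
    count = 0
    for _ in range(10000):
        count += 1
    result = [count]
    p = x ** 3 + y + z
    if p > 0:
        yield result, p
        yield p, result


def parallel2(x, y, z):
    # Exhaust the generator inside the worker so that a plain,
    # picklable list travels back to the parent process.
    return list(parallel(x, y, z))


def run_pool(n_tasks=10, timeout=60):
    # n_tasks, timeout and processes=2 are arbitrary choices for this sketch.
    with mp.Pool(processes=2) as pool:
        pending = [pool.apply_async(parallel2, args=(2, 5, 7))
                   for _ in range(n_tasks)]
        results = []
        for async_result in pending:
            # get() blocks until the task is done and re-raises
            # any exception that occurred in the worker.
            results.extend(async_result.get(timeout=timeout))
    return results


if __name__ == "__main__":
    results = run_pool()
    print(len(results))
```

Each task contributes the two tuples yielded by parallel, so results holds two entries per task.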
Vincent answered Sep 21 '22 16:09
