Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Mulitprocess Pools with different functions

Most examples of the Multiprocess Worker Pools execute a single function in different processes, f.e.

def foo(args):
   pass

if __name__ == '__main__':
   pool = multiprocessing.Pool(processes=30)
   res=pool.map_async(foo,args)

Is there a way to handle two different and independent functions within the pool? So that you could assign f.e. 15 processes for foo() and 15 processes for bar() or is a pool bounded to a single function? Or du you have to create different processes for different functions manually with

 p = Process(target=foo, args=(whatever,))
 q = Process(target=bar, args=(whatever,))
 q.start()
 p.start()

and forget about the worker pool?

like image 727
dorvak Avatar asked Aug 07 '11 22:08

dorvak


4 Answers

To pass different functions, you can simply call map_async multiple times.

Here is an example to illustrate that,

from multiprocessing import Pool
from time import sleep

def square(x):
    return x * x

def cube(y):
    return y * y * y

pool = Pool(processes=20)

result_squares = pool.map_async(f, range(10))
result_cubes = pool.map_async(g, range(10))

The result will be:

>>> print result_squares.get(timeout=1)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

>>> print result_cubes.get(timeout=1)
[0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
like image 187
Ocaj Nires Avatar answered Oct 18 '22 13:10

Ocaj Nires


You can use map or some lambda function (edit: actually you can't use a lambda function). You can use a simple map function:

def smap(f, *args):
    return f(*args)

pool = multiprocessing.Pool(processes=30)
res=pool.map(smap, function_list, args_list1, args_list2,...)

The normal map function takes iterables as inputs, which is inconvenient.

like image 30
Rayamon Avatar answered Oct 18 '22 13:10

Rayamon


They will not run in parallel. See following code:

def updater1(q,i):    
    print "UPDATER 1:", i
    return

def updater2(q,i):    
    print "UPDATER2:", i
    return

if __name__=='__main__':
    a = range(10)
    b=["abc","def","ghi","jkl","mno","pqr","vas","dqfq","grea","qfwqa","qwfsa","qdqs"]


    pool = multiprocessing.Pool()

    func1 = partial(updater1,q)
    func2 = partial(updater2,q)
    pool.map_async(func1, a)
    pool.map_async(func2, b)

    pool.close()
    pool.join()

The above code yields the following printout:

UPDATER 1: 1
UPDATER 1: 0
UPDATER 1: 2
UPDATER 1: 3
UPDATER 1: 4
UPDATER 1: 5
UPDATER 1: 6
UPDATER 1: 7
UPDATER 1: 8
UPDATER 1: 9
UPDATER2: abc
UPDATER2: def
UPDATER2: ghi
UPDATER2: jkl
UPDATER2: mno
UPDATER2: pqr
UPDATER2: vas
UPDATER2: dqfq
UPDATER2: grea
UPDATER2: qfwqa
UPDATER2: qwfsa
UPDATER2: qdqs
like image 40
Sam Avatar answered Oct 18 '22 13:10

Sam


Here is a working example of the idea shared by @Rayamon:

import functools

from multiprocessing import Pool


def a(param1, param2, param3):
    return param1 + param2 + param3


def b(param1, param2):
    return param1 + param2


def smap(f):
    return f()


func1 = functools.partial(a, 1, 2, 3)
func2 = functools.partial(b, 1, 2)

pool = Pool(processes=2)
res = pool.map(smap, [func1, func2])
pool.close()
pool.join()
print(res)
like image 25
ARA1307 Avatar answered Oct 18 '22 12:10

ARA1307