I'm trying to use IPython's parallel environment and so far it's looking great, but I'm running into a problem. Let's say that I have a function, defined in a library,
def func(a,b): ...
that I use when I want to evaluate on one value of a and a bunch of values of b.
[func(myA, b) for b in myLongList]
Obviously, the real function is more complicated but the essence of the matter is that it takes multiple parameters and I'd like to map over only one of them. The problem is that map, @dview.parallel, etc. map over all the arguments.
So let's say I want to get the answer to func(myA, myLongList). The obvious way to do this is to curry, either with functools.partial or just as
dview.map_sync(lambda b: func(myA, b), myLongList)
However, this does not work correctly on remote machines. The reason is that when the lambda is pickled, the value of myA is not included. The lambda only refers to myA by name, so on the remote machine it picks up whatever myA exists in that engine's namespace, or fails if there is none. In other words, pickling the function does not pickle the variables it refers to.
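The name-based lookup can be reproduced in plain Python. This is a simplified model, not IPython's actual serializer: it assumes a shipped function is rebuilt from its code object against the engine's namespace, represented here by a hypothetical dict `engine_ns`. Because `myA` appears only as a global name in the lambda's code, the rebuilt function uses whatever `myA` the engine holds.

```python
import builtins
import types

def func(a, b):
    # Toy stand-in for the real library function
    return 10 * a + b

myA = 1
f = lambda b: func(myA, b)

# The code object records myA as a global *name*; no value is attached.
print(f.__code__.co_names)   # includes 'func' and 'myA'
print(f.__closure__)         # None: nothing is captured by value

# Hypothetical engine namespace that holds a different myA.
engine_ns = {"__builtins__": builtins, "func": func, "myA": 5}

# Rebuild the function against that namespace (simplified model of
# what happens when the function is unpacked on a remote engine).
remote_f = types.FunctionType(f.__code__, engine_ns)
print(f(2), remote_f(2))     # 12 52
```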
Two ways I can think of doing this that will actually work are to manually construct lists for every argument and have map work over all of the arguments,
dview.map_sync(func, [myA]*len(myLongList), myLongList)
or to horrifically use the data as default arguments to a function, forcing it to get pickled:
# Binding myA as a default argument forces its value to be pickled with the function.
# (A lambda can take defaults too: lambda b, myA=myA: func(myA, b).)
def parallelFunc(b, myA=myA):
    return func(myA, b)

dview.map_sync(parallelFunc, myLongList)
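Both workarounds can be checked locally. The sketch below uses the built-in `map` (not IPython), a toy `func`, and a plain dict `engine_ns` as a simplified stand-in for an engine's namespace. It shows that (1) `map` accepts several iterables and stops at the shortest, so locally `itertools.repeat(myA)` can replace the explicit list (IPython's `map_sync` partitions its inputs, so it probably still needs sized sequences); (2) a lambda's default value is stored in `__defaults__` when it is defined, so it survives being rebuilt against another namespace; and (3) `functools.partial` keeps its bound arguments on the object and survives a pickle round trip. `operator.sub` stands in for an importable library function, because `partial` pickles the wrapped function by reference.

```python
import builtins
import functools
import itertools
import operator
import pickle
import types

def func(a, b):
    # Toy stand-in for the real library function
    return 10 * a + b

myA = 1
myLongList = [2, 3, 4]

# (1) map with several iterables stops at the shortest one.
explicit = list(map(func, [myA] * len(myLongList), myLongList))
lazy = list(map(func, itertools.repeat(myA), myLongList))

# (2) The default value is captured when the lambda is defined.
g = lambda b, myA=myA: func(myA, b)
engine_ns = {"__builtins__": builtins, "func": func, "myA": 5}  # engine's own myA
remote_g = types.FunctionType(g.__code__, engine_ns, g.__name__, g.__defaults__)

# (3) partial stores its bound arguments on the object itself.
p = functools.partial(operator.sub, myA)
restored = pickle.loads(pickle.dumps(p))

print(explicit, lazy)               # [12, 13, 14] [12, 13, 14]
print(g.__defaults__, remote_g(2))  # (1,) 12
print(restored.args, restored(3))   # (1,) -2
```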
Really, this all seems horribly contorted when the real function takes a lot of parameters and is more complicated. Is there some idiomatic way of doing this? Something like
@parallel(mapOver='b')
def bigLongFn(a, b):
    ...
but as far as I know, nothing like the 'mapOver' thing exists. I have a rough idea of how to implement it... this just feels like such a basic operation that there should be support for it, so I want to check whether I'm missing something.
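Without a built-in `mapOver`, the idea can be approximated with a small helper. `map_over` and `_call_one` below are hypothetical names, not part of IPython. The fixed arguments are bound with `functools.partial` around a module-level function rather than captured by a lambda, so they are carried by value. It runs here with the built-in `map`; passing `mapper=dview.map_sync` is an untested assumption that also requires the engines to be able to import or deserialize both `fn` and `_call_one`.

```python
import functools

def _call_one(fn, fixed, param, value):
    """Call fn with the fixed keyword arguments plus param=value."""
    kwargs = dict(fixed)
    kwargs[param] = value
    return fn(**kwargs)

def map_over(fn, param, values, mapper=map, **fixed):
    """Map fn over `values` for the parameter named `param`,
    holding every other argument fixed (passed as keywords).
    Fixed arguments cannot be named fn, param, values or mapper."""
    job = functools.partial(_call_one, fn, fixed, param)
    return list(mapper(job, values))

def big_long_fn(a, b, scale=1):
    # Toy function with several parameters
    return scale * (a + b)

print(map_over(big_long_fn, "b", [1, 2, 3], a=10))           # [11, 12, 13]
print(map_over(big_long_fn, "b", [1, 2, 3], a=10, scale=2))  # [22, 24, 26]
```

A decorator in the style of `@parallel(mapOver='b')` could wrap the same helper; the plain function form is used here because a decorated function no longer matches its module-level name, which complicates pickling by reference.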
I can improve a bit on batu's answer (which I think is a good one, but perhaps doesn't document in as much detail WHY you use those options). The IPython documentation is also currently woefully inadequate on this point. So your function is of the form:
def myfxn(a, b, c, d):
    ...
    return z
and is stored in a file called mylib. Let's say b, c, and d are the same throughout your run, so you write a lambda function to reduce it to a one-parameter function:
import mylib
mylamfxn = lambda a: mylib.myfxn(a, b, c, d)
and you want to run:
z=dview.map_sync(mylamfxn, iterable_of_a)
In a dream world, everything would magically work like that. However, you'd first get an error of "mylib not found", because the ipcluster processes haven't imported mylib. Make sure the ipcluster processes have mylib on their Python path and, if myfxn needs it, are running in the correct working directory. Then you need to add to your Python code:
dview.execute('import mylib')
which runs the import mylib command on each process. If you try again, you'll get an error along the lines of "global variable b not defined", because while the variables exist in your Python session, they don't exist in the ipcluster processes. However, IPython's DirectView provides a push method for copying a group of variables to the engines. Continuing the example above:
mydict = dict(b=b, c=c, d=d)
dview.push(mydict)
Now all of the engines have access to b, c, and d. Then you can just run:
z=dview.map_sync(mylamfxn, iterable_of_a)
and it should now work as advertised. Anyway, I'm new to parallel computing with Python and found this thread useful, so I thought I'd try to help explain a few of the points that confused me a bit...
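The sequence of errors described above can be reproduced in a single process. This is a simplified model, not IPython's actual machinery: a dict `engine_ns` stands in for one engine's namespace, `exec` stands in for `dview.execute`, `dict.update` for `dview.push`, and a hypothetical in-memory `mylib` module replaces the real file. The lambda is rebuilt from its code object against `engine_ns`, on the assumption that this is roughly how a shipped function resolves its global names on an engine.

```python
import builtins
import sys
import types

# Hypothetical stand-in for the user's mylib module, registered so that
# "import mylib" succeeds in this single process.
mylib = types.ModuleType("mylib")

def _myfxn(a, b, c, d):
    return a * b + c - d

mylib.myfxn = _myfxn
sys.modules["mylib"] = mylib

b, c, d = 2, 3, 1
mylamfxn = lambda a: mylib.myfxn(a, b, c, d)

# A plain dict plays the role of one engine's namespace.
engine_ns = {"__builtins__": builtins}

def call_on_engine(a):
    """Rebuild mylamfxn against the engine namespace and call it."""
    remote = types.FunctionType(mylamfxn.__code__, engine_ns)
    return remote(a)

errors = []
try:
    call_on_engine(1)
except NameError as exc:
    errors.append(str(exc))            # mylib is unknown on the "engine"

exec("import mylib", engine_ns)        # analogous to dview.execute('import mylib')
try:
    call_on_engine(1)
except NameError as exc:
    errors.append(str(exc))            # now b is the missing name

engine_ns.update(dict(b=b, c=c, d=d))  # analogous to dview.push(mydict)
results = [call_on_engine(a) for a in [1, 2, 3]]
print(errors)
print(results)  # [4, 6, 8]
```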
The final code would be:
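import mylib

# Set up the parallel processes (start ipcluster from the command line first!)
# In IPython 4 and later this lives in the separate ipyparallel package:
# from ipyparallel import Client
from IPython.parallel import Client
rc = Client()
dview = rc[:]

# ...do stuff to get iterable_of_a and b, c, d...

mylamfxn = lambda a: mylib.myfxn(a, b, c, d)
dview.execute('import mylib')   # import mylib on every engine
mydict = dict(b=b, c=c, d=d)
dview.push(mydict)              # copy b, c, d into every engine's namespace
z = dview.map_sync(mylamfxn, iterable_of_a)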
This is probably the quickest and easiest way to make pretty much any embarrassingly parallel code run in parallel in Python...
UPDATE: You can also use dview to push all the data without loops, and then use a load-balanced view (i.e. lview = rc.load_balanced_view(); lview.map(...)) to do the actual calculation in a load-balanced fashion.
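Why load balancing can help is easiest to see with a small scheduling simulation. It assumes, as a simplification, that a direct-view map hands each engine one contiguous chunk up front, while a load-balanced view hands out tasks one at a time to whichever engine is free. The task costs are made-up numbers chosen to make the difference visible, not measurements.

```python
import heapq

def static_makespan(costs, n_workers):
    """Finish time when each worker gets one contiguous chunk up front."""
    size = -(-len(costs) // n_workers)  # ceiling division
    chunks = [costs[i:i + size] for i in range(0, len(costs), size)]
    return max(sum(chunk) for chunk in chunks)

def balanced_makespan(costs, n_workers):
    """Finish time when each task goes to whichever worker is free first."""
    finish = [0] * n_workers            # a list of zeros is already a valid heap
    for cost in costs:
        earliest = heapq.heappop(finish)
        heapq.heappush(finish, earliest + cost)
    return max(finish)

# Made-up task costs: the expensive tasks happen to sit at the front.
costs = [8, 8, 8, 8, 1, 1, 1, 1]
print(static_makespan(costs, 4))    # 16
print(balanced_makespan(costs, 4))  # 9
```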
This is my first message to StackOverflow, so please be gentle ;) I was trying to do the same thing and came up with the following. I am pretty sure this is not the most efficient way, but it seems to work somewhat. One caveat for now is that, for some reason, I only see two engines working at 100% while the others sit almost idle...
To call a multiple-argument function with map, I first wrote this routine in my personal parallel.py module:
def map(r, func, args=None, modules=None):
    """
    Before you run parallel.map, start your cluster (e.g. ipcluster start -n 4)

    map(r, func, args=None, modules=None):
        args=dict(arg0=arg0, ...)
        modules='numpy, scipy'

    examples:
        func = lambda x: numpy.random.rand()**2.
        z = parallel.map(r_[0:1000], func, modules='numpy, numpy.random')
        plot(z)

        A = ones((1000, 1000)); l = range(0, 1000)
        func = lambda x: A[x, l]**2.
        z = parallel.map(r_[0:1000], func, dict(A=A, l=l))
        z = array(z)
    """
    from IPython.parallel import Client
    mec = Client()
    mec.clear()                              # clear the engines' namespaces first
    lview = mec.load_balanced_view()
    for k in mec.ids:
        mec[k].activate()                    # make this view the target of %px magics
        if args is not None:
            mec[k].push(args)                # copy the named variables to engine k
        if modules is not None:
            mec[k].execute('import ' + modules)  # e.g. 'import numpy, scipy'
    z = lview.map(func, r)                   # load-balanced, asynchronous map
    out = z.get()                            # block until all results arrive
    return out
As you can see, the function takes an args parameter, which is a dict of variables from the head node's workspace. These are pushed to the engines, where they become local objects that can be used in the function directly. For example, in the last docstring example above, the matrix A is sliced using the engine-local variable l.
I must say that even though the above function works, I am not 100% happy with it at the moment. If I can come up with something better, I will post it here.
UPDATE 2013/04/11: I made minor changes to the code:
- The activate statement was missing brackets, which caused it not to run.
- Moved mec.clear() to the top of the function, as opposed to the end.
I also noticed that it works best if I run it within IPython. For example, I may get errors if I run a script using the above function as "python ./myparallelrun.py" but not if I run it within IPython using "%run ./myparallelrun.py". Not sure why...