I am using a sparse tensor array manipulation I built using dictionaries and Counters in Python. I would like to make it possible to use this array manipulation in parallel. The bottom line is that I have ended up having Counters on each node which I would like to add together using MPI.Allreduce (or another nice solution). For instance with Counters one can do this
A = Counter({a:1, b:2, c:3})
B = Counter({b:1, c:2, d:3})
such that
C = A+B = Counter({a:1, b:3, c:5, d:3}).
I would like to do this same operation but with all the relevant nodes,
MPI.Allreduce(send_counter, recv_counter, MPI.SUM)
however, MPI doesn't seem to recognize this operation on dictionaries/Counters, throwing an error expecting a buffer or a list/tuple
. Is my best option a `User-Defined Operation,' or is there a way to get Allreduce to add Counters? Thanks,
EDIT (7/14/15): I have attempted to create a user operation for dictionaries but there have been some discrepancies. I wrote the following
def dict_sum(dict1, dict2, datatype):
for key in dict2:
try:
dict1[key] += dict2[key]
except KeyError:
dict1[key] = dict2[key]
and when I told MPI about the function I did this:
dictSumOp = MPI.Op.Create(dict_sum, commute=True)
and in the code I used it as
the_result = comm.allreduce(mydict, dictSumOp)
However, it threw unsupported operand '+' for type dict
. so I wrote
the_result = comm.allreduce(mydict, op=dictSumOp)
and now it throws dict1[key] += dict2[key]
TypeError: 'NoneType' object has no attribute '__getitem__'
so apparently
it wants to know those things are dictionaries? How do I tell it they do have type dictionary?
Neither MPI nor MPI4py knows anything about Counters in particular, so you need to create your own reduction operation for this to work; this would be the same for any other sort of python object:
#!/usr/bin/env python
from mpi4py import MPI
import collections
def addCounter(counter1, counter2, datatype):
for item in counter2:
counter1[item] += counter2[item]
return counter1
if __name__=="__main__":
comm = MPI.COMM_WORLD
if comm.rank == 0:
myCounter = collections.Counter({'a':1, 'b':2, 'c':3})
else:
myCounter = collections.Counter({'b':1, 'c':2, 'd':3})
counterSumOp = MPI.Op.Create(addCounter, commute=True)
totcounter = comm.allreduce(myCounter, op=counterSumOp)
print comm.rank, totcounter
Here we've taken a function which sums two counter objects and created an MPI operator out of them with MPI.Op.Create; mpi4py will unpickle the objects, run this function to combine these items pairwise, then pickle the partial result and send it off to the next task.
Note too that we're using (lowercase) allreduce, which works on arbitrary python objects, rather than (uppercase) Allreduce, which works on numpy arrays or their moral equivalents (buffers, which map onto the Fortran/C arrays that the MPI API is designed on).
Running gives:
$ mpirun -np 2 python ./counter_reduce.py
0 Counter({'c': 5, 'b': 3, 'd': 3, 'a': 1})
1 Counter({'c': 5, 'b': 3, 'd': 3, 'a': 1})
$ mpirun -np 4 python ./counter_reduce.py
0 Counter({'c': 9, 'd': 9, 'b': 5, 'a': 1})
2 Counter({'c': 9, 'd': 9, 'b': 5, 'a': 1})
1 Counter({'c': 9, 'd': 9, 'b': 5, 'a': 1})
3 Counter({'c': 9, 'd': 9, 'b': 5, 'a': 1})
And with only modest changes works with a generic dictionary:
#!/usr/bin/env python
from mpi4py import MPI
def addCounter(counter1, counter2, datatype):
for item in counter2:
if item in counter1:
counter1[item] += counter2[item]
else:
counter1[item] = counter2[item]
return counter1
if __name__=="__main__":
comm = MPI.COMM_WORLD
if comm.rank == 0:
myDict = {'a':1, 'c':"Hello "}
else:
myDict = {'c':"World!", 'd':3}
counterSumOp = MPI.Op.Create(addCounter, commute=True)
totDict = comm.allreduce(myDict, op=counterSumOp)
print comm.rank, totDict
Running giving
$ mpirun -np 2 python dict_reduce.py
0 {'a': 1, 'c': 'Hello World!', 'd': 3}
1 {'a': 1, 'c': 'Hello World!', 'd': 3}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With