Altering different python objects in parallel processes, respectively

In a Nutshell

I want to modify complex python objects in parallel, with each object processed by exactly one process. How can I do this (most efficiently)? Would implementing some kind of pickling support help, and would that be efficient?

Full Problem

I have a python data structure ArrayDict that basically consists of a numpy array and a dictionary and maps arbitrary indices to rows in the array. In my case, all keys are integers.

a = ArrayDict()

a[1234] = 12.5
a[10] = 3

print(a[1234])                               # 12.5
print(a[10])                                 # 3.0

print(a[1234] == a.array[a.indexDict[1234]]) # True

Now I have multiple such ArrayDicts and want to fill them in myMethod(arrayDict, params). Since myMethod is expensive, I want to run it in parallel. Note that myMethod may add many rows to arrayDict. Every process alters its own ArrayDict. I do not need concurrent access to the ArrayDicts.

In myMethod, I change entries in the arrayDict (that is, I change the internal numpy array) and I add entries to the arrayDict (that is, I add another index to the dictionary and write a new value into the internal array). Eventually, I would like to be able to exchange arrayDict's internal numpy array when it becomes too small. This does not happen often, and I could perform this action in the non-parallel part of my program if no better solution exists. My own attempts were not successful even without the array exchange.
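
For concreteness, here is a stripped-down sketch of the class (the real implementation is more elaborate; the initial capacity and the doubling strategy below are only illustrative):

import numpy as np

class ArrayDict:
    """Maps arbitrary keys to rows of an internal numpy array."""

    def __init__(self, capacity=8, dtype=float):
        self.array = np.zeros(capacity, dtype=dtype)   # data rows
        self.indexDict = {}                            # key -> row index
        self._size = 0                                 # number of rows in use

    def __setitem__(self, key, value):
        if key not in self.indexDict:
            if self._size == len(self.array):
                # the internal array is full: exchange it for a bigger one
                bigger = np.zeros(2 * len(self.array), dtype=self.array.dtype)
                bigger[:self._size] = self.array
                self.array = bigger
            self.indexDict[key] = self._size
            self._size += 1
        self.array[self.indexDict[key]] = value

    def __getitem__(self, key):
        return self.array[self.indexDict[key]]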

I have spent days researching shared memory and python's multiprocessing module. Since I will finally be working on linux, the task seemed rather simple: the system call fork() lets child processes work with copies of the arguments efficiently. My plan was to change each ArrayDict in its own process, return the changed version of the object, and overwrite the original object. To save memory and avoid the cost of copying, I additionally used sharedmem arrays to store the data in ArrayDict. I am aware that the dictionary must still be copied.

from sharedmem import sharedmem
import numpy as np

n = ...                   # length of the data array
myData = np.empty(n, dtype=object)
myData[:] = [ArrayDict() for _ in range(n)]
done = False

while not done:
    considered = ...      # numpy boolean array of length
                          # n with True at the indices of
                          # the considered data
    args = ...            # numpy array containing arguments
                          # for myMethod

    with sharedmem.MapReduce() as pool:
        results = pool.map(myMethod, 
                           list(zip(myData[considered], 
                                    args[considered])),
                           star=True)
        myData[considered] = results

    done = ...             # depends on what happens in
                           # myMethod

What I get is a segmentation fault. I was able to circumvent this error by passing deep copies of the ArrayDicts to myMethod and saving those copies into myData. I do not really understand why this is necessary, and frequently copying my (potentially very large) arrays in the long-running while loop does not seem efficient to me. However, at least it worked to a certain extent. Nevertheless, my program shows buggy behaviour at the 3rd iteration due to the shared memory. Therefore, I think that my approach is not optimal.

I read here and here that it is possible to store arbitrary numpy arrays in shared memory using multiprocessing.Array. However, I would still need to share the whole ArrayDict, which includes in particular a dictionary, and a dictionary cannot be placed in shared memory that way.

How could I achieve my goals in an efficient way? Would it be possible (and efficient) to make my objects picklable somehow?

All solutions must run with python 3 and full numpy/scipy support on 64bit Linux.
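
As far as I understand, an object that holds only a numpy array and a plain dict should pickle out of the box; custom __getstate__ / __setstate__ methods would mainly matter if the class also holds things that cannot be pickled (such as sharedmem handles). A sketch of what I have in mind, using the attribute names from the simplified class above:

import numpy as np

class ArrayDict:
    # ... __init__, __getitem__, __setitem__ as in the sketch above ...

    def __getstate__(self):
        # Transfer plain data only; np.asarray turns a sharedmem-backed
        # array into an ordinary numpy array that pickles cleanly.
        return {"array": np.asarray(self.array),
                "indexDict": self.indexDict,
                "_size": self._size}

    def __setstate__(self, state):
        self.__dict__.update(state)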

Edit

I found here that it is somehow possible to share arbitrary objects using multiprocessing "Manager" classes and user-defined proxy classes. Will this be efficient? I would like to exploit the fact that I do not need concurrent access to the objects, even though they are not handled in the main process. Would it be an option to create a manager for each object that I want to process? (I might still have some misconceptions about how managers work.)

Asked by Samufi on Dec 15 '16


1 Answer

This seems like a fairly complex class, and I cannot fully anticipate whether this solution will work in your case. A simple compromise for such a complex class is to use ProcessPoolExecutor.

If this does not answer your question, it would be good to have a minimal, working example.

from concurrent.futures import ProcessPoolExecutor

import numpy as np

class ArrayDict():
  # class variables: shared by all instances living in the same process
  keys = None
  vals = None

  def __init__(self):
    # instance attributes: private to each object
    self.keys = dict()
    self.vals = np.random.rand(1000)

  def __str__(self):
    return "keys: " + str(self.keys) + ", vals: " + str(self.vals.mean())

def myMethod(ad, args):
  print("starting:", ad)
  # ... do the expensive work, modifying ad in place ...
  return ad          # changes must be returned explicitly (see below)


if __name__ == '__main__':
  l    = [ArrayDict() for _ in range(5)]
  args = [2, 3, 4, 1, 3]

  with ProcessPoolExecutor(max_workers=2) as ex:
    results = list(ex.map(myMethod, l, args))

  for r in results:
    print("returned:", r)

The objects are pickled and copied when they are sent to a child process, so you need to return the result (changes to the object will not propagate back to the main process) and decide how you want to store the returned objects.
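
Applied to the setup in your question, that means returning arrayDict from myMethod and writing the results back, roughly like this (a sketch that reuses myData, considered and args from your code; the worker count is arbitrary):

with ProcessPoolExecutor(max_workers=4) as ex:
    # map yields results in input order, so they line up with considered
    results = list(ex.map(myMethod, myData[considered], args[considered]))
myData[considered] = results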

Note that class variables (as opposed to instance attributes set in __init__) are shared by all instances handled by the same worker process: if you have more tasks than processes, a change that one task makes to a class variable will be visible to later tasks running in the same process. This is usually undesired behavior.
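
A small, self-contained demonstration of that effect (the Counter class and its attributes are made up purely for illustration):

from concurrent.futures import ProcessPoolExecutor

class Counter:
    calls = 0                # class variable: shared within a worker process

    def __init__(self):
        self.own_calls = 0   # instance attribute: private to this object

def work(c):
    Counter.calls += 1
    c.own_calls += 1
    return Counter.calls, c.own_calls

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=1) as ex:
        # 5 tasks, 1 worker process: the class variable accumulates across
        # tasks, the instance attribute does not.
        print(list(ex.map(work, [Counter() for _ in range(5)])))
        # e.g. [(1, 1), (2, 1), (3, 1), (4, 1), (5, 1)]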

This is a high-level interface to parallelization. ProcessPoolExecutor uses the multiprocessing module and can only be used with picklable objects. I suspect that ProcessPoolExecutor has performance similar to the "sharing state between processes" approaches. Under the hood, ProcessPoolExecutor uses multiprocessing.Process and should perform similarly to Pool (except when using very long iterables with map). ProcessPoolExecutor seems to be the intended future API for concurrent tasks in python.

If your workload allows it (in particular, if myMethod spends most of its time in numpy code that releases the GIL), it is usually faster to use the ThreadPoolExecutor, which can simply be swapped in for the ProcessPoolExecutor. In that case the objects are shared between the threads, and an update made in a worker will propagate back to the main thread without any copying.
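
The swap itself is a one-line change (sketch, reusing l, args and myMethod from the example above):

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=2) as ex:
    results = list(ex.map(myMethod, l, args))
    # The ArrayDict instances in l are the very objects the workers
    # modified; nothing is pickled or copied, so returning them is optional.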

As mentioned, the fastest option is probably to restructure ArrayDict so that its state can be represented by multiprocessing.Value or multiprocessing.Array, i.e. by flat, ctypes-backed buffers that can live in shared memory.
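
For the array part, the usual pattern (a sketch with made-up sizes, not your ArrayDict) is to allocate a multiprocessing.RawArray and view it as a numpy array with np.frombuffer, so that the workers write directly into shared memory and nothing has to be returned:

import multiprocessing as mp
import numpy as np

def init_worker(raw):
    # Make the shared buffer available as a global in each worker process.
    global shared_raw
    shared_raw = raw

def worker(i):
    # View the shared buffer as a numpy array; no data is copied.
    arr = np.frombuffer(shared_raw, dtype=np.float64)
    arr[i] = i * 0.5        # the parent process sees this write

if __name__ == '__main__':
    n = 10
    # 'd' = C double; RawArray has no lock, which is fine here because
    # every task writes to a distinct slot.
    raw = mp.RawArray('d', n)
    with mp.Pool(processes=2, initializer=init_worker, initargs=(raw,)) as pool:
        pool.map(worker, range(n))
    print(np.frombuffer(raw, dtype=np.float64))

The dictionary part of ArrayDict would still have to be handled separately, for example by returning it from the worker.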

If ProcessPoolExecutor does not work, and you cannot optimize ArrayDict, you may be stuck with using a Manager. There are good examples on how to do that here.
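
For completeness, a hedged sketch of the Manager route, registering a simplified ArrayDict (like the one sketched in the question) with a custom BaseManager. Each object then lives in the manager's server process, and the workers talk to it through a picklable proxy; every exposed call is a round trip over a pipe, so this is convenient rather than fast:

from concurrent.futures import ProcessPoolExecutor
from multiprocessing.managers import BaseManager

# assumes the simplified ArrayDict class from the question sketch is defined above

class ArrayDictManager(BaseManager):
    pass

# Expose the item-access methods so that proxy[key] works on the proxy.
ArrayDictManager.register('ArrayDict', ArrayDict,
                          exposed=['__getitem__', '__setitem__'])

def fill(proxy, key):
    # Runs in a worker; the actual write happens in the manager process.
    proxy[key] = key * 0.5
    return key

if __name__ == '__main__':
    with ArrayDictManager() as manager:
        proxies = [manager.ArrayDict() for _ in range(5)]
        with ProcessPoolExecutor(max_workers=2) as ex:
            list(ex.map(fill, proxies, range(5)))
        print(proxies[0][0])   # 0.0, written by a worker through the proxy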

The greatest performance gain is most likely to be found in myMethod itself. And, as mentioned, the overhead of using threads is lower than that of processes.

Answered by gauteh