Combine Pool.map with shared memory Array in Python multiprocessing

I have a very large (read only) array of data that I want to be processed by multiple processes in parallel.

I like the Pool.map function and would like to use it to calculate functions on that data in parallel.

I saw that one can use the Value or Array class to share data between processes via shared memory. But when I try to use this with Pool.map I get RuntimeError: 'SynchronizedString objects should only be shared between processes through inheritance'.

Here is a simplified example of what I am trying to do:

from sys import stdin
from multiprocessing import Pool, Array

def count_it( arr, key ):
  count = 0
  for c in arr:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  # want to share it using shared memory
  toShare = Array('c', testData)

  # this works
  print count_it( toShare, "a" )

  pool = Pool()

  # RuntimeError here
  print pool.map( count_it, [(toShare,key) for key in ["a", "b", "s", "d"]] )

Can anyone tell me what I am doing wrong here?

What I would like to do is pass information about a newly created shared-memory array to the processes after they have been created in the process pool.

asked Nov 04 '09 by Jeroen Dirks

People also ask

Does Python multiprocessing use shared memory?

Python 3.8 introduced a new module, multiprocessing.shared_memory, that provides shared memory for direct access across processes. My test shows that it significantly reduces memory usage, which also speeds up the program by reducing the cost of copying and moving data around.
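As a minimal sketch of that Python 3.8+ module (the names shm and attached are just illustrative):

from multiprocessing import shared_memory

# create a 10-byte shared block and write into it
shm = shared_memory.SharedMemory(create=True, size=10)
shm.buf[:5] = b"hello"

# another process (or this one) can attach to the same block by name
attached = shared_memory.SharedMemory(name=shm.name)
print(bytes(attached.buf[:5]))  # b'hello' - read directly from shared memory

attached.close()
shm.close()
shm.unlink()  # free the block once all processes are done with it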

How do process pools work in multiprocessing?

Pool allows multiple jobs per process, which may make it easier to parallelize your program. If you have a number of jobs to run in parallel, you can create a Pool with as many processes as there are CPU cores and then pass the list of jobs to pool.map.
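A minimal sketch of that pattern (square() and the jobs list are just placeholders):

from multiprocessing import Pool, cpu_count

def square(n):
    return n * n

if __name__ == '__main__':
    jobs = [1, 2, 3, 4, 5, 6, 7, 8]
    with Pool(processes=cpu_count()) as pool:   # one worker per CPU core
        results = pool.map(square, jobs)        # jobs are divided among the workers
    print(results)  # [1, 4, 9, 16, 25, 36, 49, 64]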

What is multiprocessing in Python?

multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads.
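To illustrate how close that API is to threading, here is a tiny sketch (greet() is just a placeholder):

from multiprocessing import Process

def greet(name):
    print('hello from', name)

if __name__ == '__main__':
    p = Process(target=greet, args=('worker',))  # same constructor shape as threading.Thread
    p.start()
    p.join()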


1 Answer

Trying again as I just saw the bounty ;)

Basically I think the error message means what it says - multiprocessing shared memory Arrays can't be passed as arguments (by pickling). It doesn't make sense to serialise the data - the point is that the data is shared memory. So you have to make the shared array global. I think it's neater to put it as an attribute of a module, as in my first answer, but just leaving it as a global variable in your example also works well. Taking on board your point about not wanting to set the data before the fork, here is a modified example. If you wanted to have more than one possible shared array (and that's why you wanted to pass toShare as an argument) you could similarly make a global list of shared arrays, and just pass the index to count_it (the loop would become for c in toShare[i]:) - a rough sketch of that variant follows the example below.

from sys import stdin
from multiprocessing import Pool, Array, Process

def count_it( key ):
  count = 0
  for c in toShare:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  # allocate shared array - want lock=False in this case since we
  # aren't writing to it and want to allow multiple processes to access
  # at the same time - I think with lock=True there would be little or
  # no speedup
  maxLength = 50
  toShare = Array('c', maxLength, lock=False)

  # fork
  pool = Pool()

  # can set data after fork
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  if len(testData) > maxLength:
      raise ValueError, "Shared array too small to hold data"
  toShare[:len(testData)] = testData

  print pool.map( count_it, ["a", "b", "s", "d"] )
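And here is a rough sketch (not from the original answer) of the multiple-array variant mentioned above, in the same Python 2 style as the code above; like that example it relies on fork, so it works on Unix but not on Windows, and the two test strings are just placeholders:

from multiprocessing import Pool, Array

def count_it( args ):
  i, key = args             # index into the global list of shared arrays, plus the character to count
  count = 0
  for c in toShare[i]:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  # two read-only shared arrays collected in an ordinary global list,
  # created before the fork so the workers inherit it
  toShare = [ Array('c', "abcabc ababab", lock=False),
              Array('c', "sdfsdf sss dd", lock=False) ]

  pool = Pool()

  # pass only the index and the key - the shared data itself is never pickled
  print pool.map( count_it, [(0, "a"), (0, "b"), (1, "s"), (1, "d")] )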

[EDIT: The above doesn't work on Windows because it relies on fork. However, the version below does work on Windows, still using Pool, so I think this is the closest to what you want:

from sys import stdin
from multiprocessing import Pool, Array, Process
import mymodule

def count_it( key ):
  count = 0
  for c in mymodule.toShare:
    if c == key:
      count += 1
  return count

def initProcess(share):
  mymodule.toShare = share

if __name__ == '__main__':
  # allocate shared array - want lock=False in this case since we
  # aren't writing to it and want to allow multiple processes to access
  # at the same time - I think with lock=True there would be little or
  # no speedup
  maxLength = 50
  toShare = Array('c', maxLength, lock=False)

  # fork
  pool = Pool(initializer=initProcess, initargs=(toShare,))

  # can set data after fork
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  if len(testData) > maxLength:
      raise ValueError, "Shared array too small to hold data"
  toShare[:len(testData)] = testData

  print pool.map( count_it, ["a", "b", "s", "d"] )

Not sure why map won't pickle the array but Process and Pool will - I think perhaps it has to be transferred at the point of subprocess initialization on Windows. Note that the data is still set after the fork though.]

answered Oct 02 '22 by robince