I have a very large (read only) array of data that I want to be processed by multiple processes in parallel.
I like the Pool.map
function and would like to use it to calculate functions on that data in parallel.
I saw that one can use the Value
or Array
class to use shared memory data between processes. But when I try to use this I get a RuntimeError: 'SynchronizedString objects should only be shared between processes through inheritance
when using the Pool.map function:
Here is a simplified example of what I am trying to do:
from sys import stdin from multiprocessing import Pool, Array def count_it( arr, key ): count = 0 for c in arr: if c == key: count += 1 return count if __name__ == '__main__': testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf" # want to share it using shared memory toShare = Array('c', testData) # this works print count_it( toShare, "a" ) pool = Pool() # RuntimeError here print pool.map( count_it, [(toShare,key) for key in ["a", "b", "s", "d"]] )
Can anyone tell me what I am doing wrong here?
So what I would like to do is pass info about a newly created shared memory allocated array to the processes after they have been created in the process pool.
Python 3.8 introduced a new module multiprocessing. shared_memory that provides shared memory for direct access across processes. My test shows that it significantly reduces the memory usage, which also speeds up the program by reducing the costs of copying and moving things around.
Pool allows multiple jobs per process, which may make it easier to parallel your program. If you have a numbers jobs to run in parallel, you can make a Pool with number of processes the same number of as CPU cores and after that pass the list of the numbers jobs to pool. map.
multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads.
Trying again as I just saw the bounty ;)
Basically I think the error message means what it said - multiprocessing shared memory Arrays can't be passed as arguments (by pickling). It doesn't make sense to serialise the data - the point is the data is shared memory. So you have to make the shared array global. I think it's neater to put it as the attribute of a module, as in my first answer, but just leaving it as a global variable in your example also works well. Taking on board your point of not wanting to set the data before the fork, here is a modified example. If you wanted to have more than one possible shared array (and that's why you wanted to pass toShare as an argument) you could similarly make a global list of shared arrays, and just pass the index to count_it (which would become for c in toShare[i]:
).
from sys import stdin from multiprocessing import Pool, Array, Process def count_it( key ): count = 0 for c in toShare: if c == key: count += 1 return count if __name__ == '__main__': # allocate shared array - want lock=False in this case since we # aren't writing to it and want to allow multiple processes to access # at the same time - I think with lock=True there would be little or # no speedup maxLength = 50 toShare = Array('c', maxLength, lock=False) # fork pool = Pool() # can set data after fork testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf" if len(testData) > maxLength: raise ValueError, "Shared array too small to hold data" toShare[:len(testData)] = testData print pool.map( count_it, ["a", "b", "s", "d"] )
[EDIT: The above doesn't work on windows because of not using fork. However, the below does work on Windows, still using Pool, so I think this is the closest to what you want:
from sys import stdin from multiprocessing import Pool, Array, Process import mymodule def count_it( key ): count = 0 for c in mymodule.toShare: if c == key: count += 1 return count def initProcess(share): mymodule.toShare = share if __name__ == '__main__': # allocate shared array - want lock=False in this case since we # aren't writing to it and want to allow multiple processes to access # at the same time - I think with lock=True there would be little or # no speedup maxLength = 50 toShare = Array('c', maxLength, lock=False) # fork pool = Pool(initializer=initProcess,initargs=(toShare,)) # can set data after fork testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf" if len(testData) > maxLength: raise ValueError, "Shared array too small to hold data" toShare[:len(testData)] = testData print pool.map( count_it, ["a", "b", "s", "d"] )
Not sure why map won't Pickle the array but Process and Pool will - I think perhaps it has be transferred at the point of the subprocess initialization on windows. Note that the data is still set after the fork though.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With