I am attempting to parallelize an algorithm that I have been working on using the Multiprocessing and Pool.map() commands. I ran into a problem and was hoping someone could point me in the right direction.
Let x denote an array of N rows and 1 column, which is initialized to be a vector of zeros. Let C denote an array of length N by 2. The vector x is constructed iteratively by using information from some subsets of C (doing some math operations). The code (not parallelized) as a large for loop looks roughly as follows:
for j in range(0,N)
#indx_j will have n_j <<N entries
indx_j = build_indices(C,j)
#x_j will be entries to be added to vector x at indices indx_j
#This part is time consuming
x_j = build_x_j(indx_j,C)
#Add x_j into entries of x
x[indx_j] = x[indx_j] + x_j
I was able to parallelize this using the multiprocessing module and using the pool.map to eliminate the large for loop. I wrote a function that did the above computations, except the step of adding x_j to x[indx_j]. The parallelized function instead returns two data sets back: x_j and indx_j. After those are computed, I run a for loop (not parallel) to build up x by doing the x[indx_j] = x[indx_j] +x_j computation for j=0,N.
The downside to my method is that pool.map operation returns a gigantic list of N pairs of arrays x_j and indx_j. where both x_j and indx_j were n_j by 1 vectors (n_j << N). For large N (N >20,000) this was taking up way too much memory. Here is my question: Can I somehow, in parallel, do the construction operation x[indx_j] = x[indx_j] + x_j. It seems to me each process in pool.map() would have to be able to interact with the vector x. Do I place x in some sort of shared memory? How would I do such a thing? I suspect that this has to be possible somehow, as I assume people assemble matrices in parallel for finite element methods all the time. How can I have multiple processes interact with a vector without having some sort of problem? I'm worried that perhaps for j= 20 and j = 23, if they happen simultaneously, they might try to add to x[indx_20] = x[indx_20] + x_20 and simultaneously x[indx_30] = x[indx_30] + x_30 and maybe some error will happen. I also don't know how to even have this computation done via the pool.map() (I don't think I can feed x in as an input, as it would be changing after each process).
I'm not sure if it matters or not, but the sets indx_j will have non-trivial intersection (e.g., indx_1 and indx_2 may have indices [1,2,3] and [3,4,5] for example).
If this is unclear, please let me know and I will attempt to clarify. This is my first time trying to work in parallel, so I am very unsure of how to proceed. Any information would be greatly appreciated. Thanks!
I dont know If I am qualified to give proper advice on the topic of shared memory arrays, but I had a similar need to share arrays across processes in python recently and came across a small custom numpy.ndarray
implementation for a shared memory array in numpy using the shared ctypes within multiprocessing
. Here is a link to the code: shmarray.py. It acts just like a normal array,except the underlying data is stored in shared memory, meaning separate processes can both read and write to the same array.
Using Shared Memory Array
In threading, all information available to the thread (global and local namespace) can be handled as shared between all other threads that have access to it, but in multiprocessing that data is not so easily accessible. On linux data is available for reading, but cannot be written to. Instead when a write is done, the data is copied and then written to, meaning no other process can see those changes. However, if the memory being written to is shared memory, it is not copied. This means with shmarray we can do things similar to the way we would do threading, with the true parallelism of multiprocessing. One way to have access to the shared memory array is with a subclass. I know you are currently using Pool.map(), but I had felt limited by the way map worked, especially when dealing with n-dimensional arrays. Pool.map() is not really designed to work with numpy styled interfaces, at least I don't think it can easily. Here is a simple idea where you would spawn a process for each j
in N
:
import numpy as np
import shmarray
import multiprocessing
class Worker(multiprocessing.Process):
def __init__(self, j, C, x):
multiprocessing.Process.__init__()
self.shared_x = x
self.C = C
self.j = j
def run(self):
#Your Stuff
#indx_j will have n_j <<N entries
indx_j = build_indices(self.C,self.j)
#x_j will be entries to be added to vector x at indices indx_j
x_j = build_x_j(indx_j,self.C)
#Add x_j into entries of x
self.shared_x[indx_j] = self.shared_x[indx_j] + x_j
#And then actually do the work
N = #What ever N should be
x = shmarray.zeros(shape=(N,1))
C = #What ever C is, doesn't need to be shared mem, since no writing is happening
procs = []
for j in range(N):
proc = Worker(j, C, x)
procs.append(proc)
proc.start()
#And then join() the processes with the main process
for proc in procs:
proc.join()
Custom Process Pool and Queues
So this might work, but spawning several thousand processes is not really going to be of any use if you only have a few cores. The way I handled this was to implement a Queue
system between my process. That is to say, we have a Queue
that the main process fills with j
's and then a couple worker processes get numbers from the Queue
and do work with it, note that by implementing this, you are essentially doing exactly what Pool
does. Also note we are actually going to use multiprocessing.JoinableQueue
for this since it lets use join() to wait till a queue is emptied.
Its not hard to implement this at all really, simply we must modify our Subclass a bit and how we use it. import numpy as np import shmarray import multiprocessing
class Worker(multiprocessing.Process):
def __init__(self, C, x, job_queue):
multiprocessing.Process.__init__()
self.shared_x = x
self.C = C
self.job_queue = job_queue
def run(self):
#New Queue Stuff
j = None
while j!='kill': #this is how I kill processes with queues, there might be a cleaner way.
j = self.job_queue.get() #gets a job from the queue if there is one, otherwise blocks.
if j!='kill':
#Your Stuff
indx_j = build_indices(self.C,j)
x_j = build_x_j(indx_j,self.C)
self.shared_x[indx_j] = self.shared_x[indx_j] + x_j
#This tells the queue that the job that was pulled from it
#Has been completed (we need this for queue.join())
self.job_queue.task_done()
#The way we interact has changed, now we need to define a job queue
job_queue = multiprocessing.JoinableQueue()
N = #What ever N should be
x = shmarray.zeros(shape=(N,1))
C = #What ever C is, doesn't need to be shared mem, since no writing is happening
procs = []
proc_count = multiprocessing.cpu_count() # create as many procs as cores
for _ in range(proc_count):
proc = Worker(C, x, job_queue) #now we pass the job queue instead
procs.append(proc)
proc.start()
#all the workers are just waiting for jobs now.
for j in range(N):
job_queue.put(j)
job_queue.join() #this blocks the main process until the queue has been emptied
#Now if you want to kill all the processes, just send a 'kill'
#job for each process.
for proc in procs:
job_queue.put('kill')
job_queue.join()
Finally, I really cannot say anything about how this will handle writing to overlapping indices at the same time. Worst case is that you could have a serious problem if two things attempt to write at the same time and things get corrupted/crash(I am no expert here so I really have no idea if that would happen). Best case since you are just doing addition, and order of operations doesn't matter, everything runs smoothly. If it doesn't run smoothly, my suggestion is to create a second custom Process
subclass that specifically does the array assignment. To implement this you would need to pass both a job queue, and an 'output' queue to the Worker subclass. Within the while loop, you should have a `output_queue.put((indx_j, x_j)). NOTE: If you are putting these into a Queue they are being pickled, which can be slow. I recommend making them shared memory arrays if they can be before using put. It may be faster to just pickle them in some cases, but I have not tested this. To assign these as they are generated, you then need to have your Assigner process read these values from a queue as jobs and apply them, such that the work loop would essentially be:
def run(self):
job = None
while job!='kill':
job = self.job_queue.get()
if job!='kill':
indx_j, x_j = job
#Note this is the process which really needs access to the X array.
self.x[indx_j] += x_j
self.job_queue.task_done()
This last solution will likely be slower than doing the assignment within the worker threads, but if you are doing it this way, you have no worries about race conditions, and memory is still lighter since you can use up the indx_j
and x_j
values as you generate them, instead of waiting till all of them are done.
Note for Windows
I didn't do any of this work on windows, so I am not 100% certain, but I believe the code above will be very memory intensive since windows does not implement a copy-on-write system for spawning independent processes. Essentially windows will copy ALL information that a process needs when spawning a new one from the main process. To fix this, I think replacing all your x_j
and C
with shared memory arrays (anything you will be handing around to other processes) instead of normal arrays should cause windows to not copy the data, but I am not certain. You did not specify which platform you were on so I figured better safe than sorry, since multiprocessing is a different beast on windows than linux.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With