I am using Python 2.7.5 on a recent vintage Apple MacBook Pro which has four hardware and eight logical CPUs; i.e., the sysctl utility gives:
$ sysctl hw.physicalcpu
hw.physicalcpu: 4
$ sysctl hw.logicalcpu
hw.logicalcpu: 8
I need to perform some rather complicated processing on a large 1-D list or array, and then save the result as an intermediate output which will be used again at a later point in a subsequent calculation within my application. The structure of my problem lends itself rather naturally to parallelization, so I thought that I would try to use Python's multiprocessing module to subdivide the 1D array into several pieces (either 4 pieces or 8 pieces, I'm not yet sure which), perform the calculations in parallel, and then reassemble the resulting output into its final format afterwards. I am trying to decide whether to use multiprocessing.Queue()
(message queues) or multiprocessing.Array()
(shared memory) as my preferred mechanism for communicating the resulting calculations from the child processes back up to the main parent process, and I have been experimenting with a couple of "toy" models in order to make sure that I understand how the multiprocessing module actually works. I've come across a rather unexpected result, however: in creating two essentially equivalent solutions to the same problem, the version which uses shared memory for interprocess communication seems to require much more execution time (like 30X more!) than the version using message queues. Below, I've included two different versions of sample source code for a "toy" problem which generates a long sequence of random numbers using parallel processes, and communicates the agglomerated result back to a parent process in two different ways: first using message queues, and the second time using shared memory.
Here is the version that uses message queues:
import random
import multiprocessing
import datetime
def genRandom(count, id, q):
print("Now starting process {0}".format(id))
output = []
# Generate a list of random numbers, of length "count"
for i in xrange(count):
output.append(random.random())
# Write the output to a queue, to be read by the calling process
q.put(output)
if __name__ == "__main__":
# Number of random numbers to be generated by each process
size = 1000000
# Number of processes to create -- the total size of all of the random
# numbers generated will ultimately be (procs * size)
procs = 4
# Create a list of jobs and queues
jobs = []
outqs = []
for i in xrange(0, procs):
q = multiprocessing.Queue()
p = multiprocessing.Process(target=genRandom, args=(size, i, q))
jobs.append(p)
outqs.append(q)
# Start time of the parallel processing and communications section
tstart = datetime.datetime.now()
# Start the processes (i.e. calculate the random number lists)
for j in jobs:
j.start()
# Read out the data from the queues
data = []
for q in outqs:
data.extend(q.get())
# Ensure all of the processes have finished
for j in jobs:
j.join()
# End time of the parallel processing and communications section
tstop = datetime.datetime.now()
tdelta = datetime.timedelta.total_seconds(tstop - tstart)
msg = "{0} random numbers generated in {1} seconds"
print(msg.format(len(data), tdelta))
When I run it, I get a result that typically looks about like this:
$ python multiproc_queue.py
Now starting process 0
Now starting process 1
Now starting process 2
Now starting process 3
4000000 random numbers generated in 0.514805 seconds
Now, here is the equivalent code segment, but refactored just slightly so that it uses shared memory instead of queues:
import random
import multiprocessing
import datetime
def genRandom(count, id, d):
print("Now starting process {0}".format(id))
# Generate a list of random numbers, of length "count", and write them
# directly to a segment of an array in shared memory
for i in xrange(count*id, count*(id+1)):
d[i] = random.random()
if __name__ == "__main__":
# Number of random numbers to be generated by each process
size = 1000000
# Number of processes to create -- the total size of all of the random
# numbers generated will ultimately be (procs * size)
procs = 4
# Create a list of jobs and a block of shared memory
jobs = []
data = multiprocessing.Array('d', size*procs)
for i in xrange(0, procs):
p = multiprocessing.Process(target=genRandom, args=(size, i, data))
jobs.append(p)
# Start time of the parallel processing and communications section
tstart = datetime.datetime.now()
# Start the processes (i.e. calculate the random number lists)
for j in jobs:
j.start()
# Ensure all of the processes have finished
for j in jobs:
j.join()
# End time of the parallel processing and communications section
tstop = datetime.datetime.now()
tdelta = datetime.timedelta.total_seconds(tstop - tstart)
msg = "{0} random numbers generated in {1} seconds"
print(msg.format(len(data), tdelta))
When I run the shared memory version, however, the typical result looks more like this:
$ python multiproc_shmem.py
Now starting process 0
Now starting process 1
Now starting process 2
Now starting process 3
4000000 random numbers generated in 15.839607 seconds
My question: why is there such a huge difference in execution speeds (roughly 0.5 seconds vs. 15 seconds, a factor of 30X!) between the two versions of my code? And in particular, how can I modify the shared memory version in order to get it to run faster?
Shared memory can be deemed as faster (low overhead, high volume of data passing) then queues.
The disadvantage is that passing large amounts of data would be probably slow. In that case, you need to use need shared memory. For passing state (i.e. worker thread reporting progress to the GUI) the messages are the way to go.
One of the models of process communication is the shared memory model. The shared memory in the shared memory model is the memory that can be simultaneously accessed by multiple processes. This is done so that the processes can communicate with each other.
shared memory IPC implementations? Message passing: Pros: easy to manage, useful for small amounts of shared information, no conflicts have to be considered. Cons: must send and receive via system calls (interrupts costly), doesn't scale for larger shared data requirements.
This is because multiprocessing.Array
uses a lock by default to prevent multiple processes from accessing it at once:
multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)
...
If lock is True (the default) then a new lock object is created to synchronize access to the value. If lock is a Lock or RLock object then that will be used synchronize access to the value. If lock is False then access to the returned object will not be automatically protected by a lock, so it will not necessarily be “process-safe”.
This means you're not really concurrently writing to the array - only one process can access it at a time. Since your example workers are doing almost nothing but array writes, constantly waiting on this lock badly hurts performance. If you use lock=False
when you create the array, the performance is much better:
With lock=True
:
Now starting process 0
Now starting process 1
Now starting process 2
Now starting process 3
4000000 random numbers generated in 4.811205 seconds
With lock=False
:
Now starting process 0
Now starting process 3
Now starting process 1
Now starting process 2
4000000 random numbers generated in 0.192473 seconds
Note that using lock=False
means you need to manually protect access to the Array
whenever you're doing something that isn't process-safe. Your example is having processes write to unique parts, so it's ok. But if you were trying to read from it while doing that, or had different processes write to overlapping parts, you would need to manually acquire a lock.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With