I'd like to share several NumPy arrays between different child processes with Python's multiprocessing module. I'd like the arrays to be separately lockable, and I'd like the number of arrays to be determined dynamically at runtime. Is this possible?
In this answer, J.F. Sebastian lays out a nice way to use NumPy arrays in shared memory while multiprocessing. The array is lockable, which is what I want. I would like to do something very similar, except with a variable number of shared arrays. The number of arrays would be determined at runtime. His example code is very clear and does almost exactly what I want, but I'm unclear how to declare a variable number of such arrays without giving each one of them a hard-coded name like shared_arr_1, shared_arr_2, et cetera. What's the right way to do this?
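A minimal sketch of the general idea, assuming Python 3 and a hypothetical helper name `make_shared_buffers`: keep the arrays in a list whose length is an ordinary runtime value and address each one by index instead of by name. Each `mp.Array` created with the default `lock=True` gets its own recursive lock, so locking one buffer does not block access to the others.

```python
import ctypes
import multiprocessing as mp


def make_shared_buffers(num_buffers, num_items):
    """Hypothetical helper: return `num_buffers` separately lockable shared arrays.

    The count is just a runtime integer, so no hard-coded names are needed;
    each buffer is addressed by its index in the returned list.
    """
    # mp.Array(..., lock=True) is the default: every array gets its own RLock.
    return [mp.Array(ctypes.c_uint16, num_items) for _ in range(num_buffers)]


if __name__ == "__main__":
    buffers = make_shared_buffers(num_buffers=3, num_items=4)
    with buffers[1].get_lock():  # locks buffer 1 only
        buffers[1][0] = 7
    print([b[0] for b in buffers])  # [0, 7, 0] (new arrays are zero-filled)
```

The whole list can then be handed to each child through `mp.Process(args=(...))`, and a child locks only the buffer it is working on.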
Turns out this was easier than I thought! Following J.F. Sebastian's encouragement, here's my crack at an answer:
```python
from __future__ import print_function
import time
import ctypes
import logging
from timeit import default_timer as timer  # portable wall-clock timer
try:
    import Queue as queue  # Python 2
except ImportError:
    import queue  # Python 3
import multiprocessing as mp
import numpy as np

info = mp.get_logger().info


def main():
    logger = mp.log_to_stderr()
    logger.setLevel(logging.INFO)
    data_pipeline = Image_Data_Pipeline(
        num_data_buffers=5,
        buffer_shape=(60, 256, 512))
    start = timer()
    data_pipeline.load_buffers(data_pipeline.num_data_buffers)
    end = timer()
    data_pipeline.close()
    print("Elapsed time:", end - start)


class Image_Data_Pipeline:
    def __init__(self, num_data_buffers, buffer_shape):
        """
        Allocate a bunch of 16-bit buffers for image data
        """
        self.num_data_buffers = num_data_buffers
        self.buffer_shape = buffer_shape
        # int(): mp.Array treats a non-int size (e.g. a NumPy integer on
        # Python 3) as an initializer sequence and fails.
        pix_per_buf = int(np.prod(buffer_shape))
        # One lock-protected shared array per buffer; the list length is
        # decided at runtime, so no hard-coded names are needed.
        self.data_buffers = [mp.Array(ctypes.c_uint16, pix_per_buf)
                             for b in range(num_data_buffers)]
        # Indices of buffers that are not currently in the pipeline
        self.idle_data_buffers = list(range(num_data_buffers))

        # Launch the child processes that make up the pipeline
        self.camera = Data_Pipeline_Process(
            target=child_process, name='Camera',
            data_buffers=self.data_buffers, buffer_shape=buffer_shape)
        self.display_prep = Data_Pipeline_Process(
            target=child_process, name='Display Prep',
            data_buffers=self.data_buffers, buffer_shape=buffer_shape,
            input_queue=self.camera.output_queue)
        self.file_saving = Data_Pipeline_Process(
            target=child_process, name='File Saving',
            data_buffers=self.data_buffers, buffer_shape=buffer_shape,
            input_queue=self.display_prep.output_queue)
        return None

    def load_buffers(self, N, timeout=0):
        """
        Feed the pipe!
        """
        for i in range(N):
            self.camera.input_queue.put(self.idle_data_buffers.pop())
        # Wait for the buffers to idle. Here would be a fine place to
        # feed them back to the pipeline, too.
        while True:
            try:
                self.idle_data_buffers.append(
                    self.file_saving.output_queue.get_nowait())
                info("Buffer %i idle" % (self.idle_data_buffers[-1]))
            except queue.Empty:
                time.sleep(0.01)
            if len(self.idle_data_buffers) >= self.num_data_buffers:
                break
        return None

    def close(self):
        # A None on each input queue tells that stage to exit its loop
        self.camera.input_queue.put(None)
        self.display_prep.input_queue.put(None)
        self.file_saving.input_queue.put(None)
        self.camera.child.join()
        self.display_prep.child.join()
        self.file_saving.child.join()


class Data_Pipeline_Process:
    def __init__(
            self,
            target,
            name,
            data_buffers,
            buffer_shape,
            input_queue=None,
            output_queue=None,
            ):
        if input_queue is None:
            self.input_queue = mp.Queue()
        else:
            self.input_queue = input_queue
        if output_queue is None:
            self.output_queue = mp.Queue()
        else:
            self.output_queue = output_queue
        self.command_pipe = mp.Pipe()  # For later, we'll send instrument commands
        self.child = mp.Process(
            target=target,
            args=(name, data_buffers, buffer_shape,
                  self.input_queue, self.output_queue, self.command_pipe),
            name=name)
        self.child.start()
        return None


def child_process(
        name,
        data_buffers,
        buffer_shape,
        input_queue,
        output_queue,
        command_pipe):
    if name == 'Display Prep':
        display_buffer = np.empty(buffer_shape, dtype=np.uint16)
    while True:
        try:
            process_me = input_queue.get_nowait()
        except queue.Empty:
            time.sleep(0.01)
            continue
        if process_me is None:
            break  # We're done
        else:
            info("start buffer %i" % (process_me))
            with data_buffers[process_me].get_lock():
                # Zero-copy NumPy view of the shared buffer
                a = np.frombuffer(data_buffers[process_me].get_obj(),
                                  dtype=np.uint16)
                if name == 'Camera':
                    # Fill the buffer with data (eventually, from the
                    # camera, dummy data for now)
                    a.fill(1)
                elif name == 'Display Prep':
                    # Process the 16-bit image into a display-ready
                    # 8-bit image. For now, just copy the data to a
                    # similar buffer.
                    display_buffer[:] = a.reshape(buffer_shape)
                elif name == 'File Saving':
                    # Save the data to disk.
                    a.tofile('out.raw')
            info("end buffer %i" % (process_me))
            output_queue.put(process_me)
    return None


if __name__ == '__main__':
    main()
```
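The lock-then-view pattern inside child_process can be factored into a small context manager. This is a sketch with a hypothetical helper name, `locked_view`; the `dtype` argument must match the ctypes type the array was allocated with (`c_uint16` and `np.uint16` here), and once the `with` block exits the view still points at shared memory but is no longer protected by the lock.

```python
import contextlib
import ctypes
import multiprocessing as mp

import numpy as np


@contextlib.contextmanager
def locked_view(shared_array, shape, dtype=np.uint16):
    """Hypothetical helper: hold shared_array's lock and yield a reshaped,
    zero-copy NumPy view of its memory."""
    with shared_array.get_lock():
        yield np.frombuffer(shared_array.get_obj(), dtype=dtype).reshape(shape)


if __name__ == "__main__":
    buf = mp.Array(ctypes.c_uint16, 6)
    with locked_view(buf, (2, 3)) as a:
        a.fill(1)    # writes straight into the shared memory
        a[1, 2] = 5
    print(buf[:])    # [1, 1, 1, 1, 1, 5]
```

With it, each branch in child_process could read like `with locked_view(data_buffers[process_me], buffer_shape) as a: a.fill(1)`.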
Background: This is the skeleton of a data-acquisition pipeline. I want to acquire data at a very high rate, process it for on-screen display, and save it to disk. I don't ever want display rate or disk rate to limit acquisition, which is why I think using separate child processes in individual processing loops is appropriate.
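The same list-instead-of-names idea can apply to the stages, which are hard-coded to three above. A minimal sketch, assuming a hypothetical helper `chain_queues`: each stage's output queue is the next stage's input queue, mirroring how the code passes `camera.output_queue` to `display_prep`, and only small integer buffer indices travel through the queues while the pixel data stays in the shared arrays.

```python
import multiprocessing as mp


def chain_queues(stage_names):
    """Hypothetical helper: wire a linear pipeline whose length is known only
    at runtime. Returns {name: (input_queue, output_queue)}."""
    queues = [mp.Queue() for _ in range(len(stage_names) + 1)]
    return {name: (queues[i], queues[i + 1])
            for i, name in enumerate(stage_names)}


if __name__ == "__main__":
    stages = chain_queues(["Camera", "Display Prep", "File Saving"])
    # Camera hands buffer index 4 downstream; Display Prep receives it.
    stages["Camera"][1].put(4)
    print(stages["Display Prep"][0].get(timeout=1))  # 4
```

Each `(input_queue, output_queue)` pair could then be passed to `Data_Pipeline_Process` through its `input_queue=` and `output_queue=` parameters.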
Here's typical output of the dummy program:
```
C:\code\instrument_control>c:\Python27\python.exe test.py
[INFO/MainProcess] allocating a new mmap of length 15728640
[INFO/MainProcess] allocating a new mmap of length 15728640
[INFO/MainProcess] allocating a new mmap of length 15728640
[INFO/MainProcess] allocating a new mmap of length 15728640
[INFO/MainProcess] allocating a new mmap of length 15728640
[[INFO/Camera] child process calling self.run()
INFO/Display Prep] child process calling self.run()
[INFO/Camera] start buffer 4
[INFO/File Saving] child process calling self.run()
[INFO/Camera] end buffer 4
[INFO/Camera] start buffer 3
[INFO/Camera] end buffer 3
[INFO/Camera] start buffer 2
[INFO/Display Prep] start buffer 4
[INFO/Camera] end buffer 2
[INFO/Camera] start buffer 1
[INFO/Camera] end buffer 1
[INFO/Camera] start buffer 0
[INFO/Camera] end buffer 0
[INFO/Display Prep] end buffer 4
[INFO/Display Prep] start buffer 3
[INFO/File Saving] start buffer 4
[INFO/Display Prep] end buffer 3
[INFO/Display Prep] start buffer 2
[INFO/File Saving] end buffer 4
[INFO/File Saving] start buffer 3
[INFO/MainProcess] Buffer 4 idle
[INFO/Display Prep] end buffer 2
[INFO/Display Prep] start buffer 1
[INFO/File Saving] end buffer 3
[INFO/File Saving] start buffer 2
[INFO/MainProcess] Buffer 3 idle
[INFO/Display Prep] end buffer 1
[INFO/Display Prep] start buffer 0
[INFO/File Saving] end buffer 2
[INFO/File Saving] start buffer 1
[[INFO/MainProcess] Buffer 2 idle
INFO/Display Prep] end buffer 0
[INFO/File Saving] end buffer 1
[INFO/File Saving] start buffer 0
[INFO/MainProcess] Buffer 1 idle
[INFO/File Saving] end buffer 0
[INFO/MainProcess] Buffer 0 idle
[INFO/Camera] process shutting down
[INFO/Camera] process exiting with exitcode 0
[INFO/Display Prep] process shutting down
[INFO/File Saving] process shutting down
[INFO/Display Prep] process exiting with exitcode 0
[INFO/File Saving] process exiting with exitcode 0
Elapsed time: 0.263240348548
[INFO/MainProcess] process shutting down
C:\code\instrument_control>
```
It seems to do what I want: the data gets processed for display and saved to disk without interfering with the acquisition rate.
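As a quick sanity check on the log: each mmap is one buffer of 60 × 256 × 512 uint16 pixels at 2 bytes per pixel, and dividing the total bytes by the reported elapsed time gives a rough figure for this single run. The timed region covers the whole `load_buffers` call, including writing `out.raw`, so this is end-to-end throughput for the dummy workload rather than a measured camera rate.

```python
import numpy as np

buffer_shape = (60, 256, 512)   # from main()
num_data_buffers = 5            # from main()
elapsed = 0.263240348548        # "Elapsed time" reported in the log above

bytes_per_buffer = int(np.prod(buffer_shape)) * np.dtype(np.uint16).itemsize
throughput = num_data_buffers * bytes_per_buffer / elapsed  # bytes per second

print(bytes_per_buffer)             # 15728640, matching the mmap length
print(round(throughput / 1e6))      # 299 (MB/s, decimal megabytes)
```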