Multiprocessing is a wonderful tool, but it is not so straightforward to use large memory chunks with it. You can load chunks in each process and dump the results to disk, but sometimes you need to keep the results in memory, and on top of that use the fancy numpy functionality.
I have read/googled a lot and came across some answers:
Use numpy array in shared memory for multiprocessing
Share Large, Read-Only Numpy Array Between Multiprocessing Processes
Python multiprocessing global numpy arrays
How do I pass large numpy arrays between python subprocesses without saving to disk?
Etc., etc.
They all have drawbacks: not-so-mainstream libraries (sharedmem), globally stored variables, not-so-easy-to-read code, pipes, and so on.
My goal was to seamlessly use numpy in my workers without worrying about conversions and stuff.
After many trials I came up with this, and it works on my Ubuntu 16.04, Python 3.6, 16 GB, 8-core machine. I took a lot of "shortcuts" compared to previous approaches: no global shared state, no raw memory pointers that need to be converted to numpy inside the workers, large numpy arrays passed as process arguments, etc.
The full code is in the pastebin link above, but I will put a few snippets here.
Some imports:
import numpy as np
import multiprocessing as mp
import multiprocessing.sharedctypes
import ctypes
Allocate some shared memory and wrap it in a numpy array:
def create_np_shared_array(shape, dtype, ctype):
    # Total number of elements the shared buffer must hold
    size = int(np.prod(shape))
    shared_mem_chunk = mp.sharedctypes.RawArray(ctype, size)
    numpy_array_view = np.frombuffer(shared_mem_chunk, dtype).reshape(shape)
    return numpy_array_view
Create the shared arrays and put something in them:
src = np.random.rand(*SHAPE).astype(np.float32)
src_shared = create_np_shared_array(SHAPE, np.float32, ctypes.c_float)
dst_shared = create_np_shared_array(SHAPE, np.float32, ctypes.c_float)
src_shared[:] = src[:]  # Some numpy ops accept an 'out' array where to store the results
Spawn the process:
p = mp.Process(target=lengthly_operation, args=(src_shared, dst_shared, k, k + STEP))
p.start()
p.join()
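The worker itself is only in the pastebin; purely as an illustration, here is a minimal sketch of what a lengthly_operation with this signature could look like (the loop body is a placeholder, not the actual computation, and it reuses the imports above):

def lengthly_operation(src, dst, start, end):
    # src and dst are numpy views over shared memory, so anything written
    # into dst here is visible to the parent process after join()
    for i in range(start, end):
        dst[i] = np.sqrt(src[i]) + 1.0  # stand-in for the real per-row work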
Here are some results (see pastebin code for full reference):
Serial version: allocate mem 2.3741257190704346 exec: 17.092209577560425 total: 19.46633529663086 Succes: True
Parallel with trivial np: allocate mem 2.4535582065582275 spawn process: 0.00015354156494140625 exec: 3.4581971168518066 total: 5.911908864974976 Succes: False
Parallel with shared mem np: allocate mem 4.535916328430176 (pure alloc:4.014216661453247 copy: 0.5216996669769287) spawn process: 0.00015664100646972656 exec: 3.6783478260040283 total: 8.214420795440674 Succes: True
I also ran cProfile (why two extra seconds when allocating shared mem?) and realized that there are some calls to tempfile.py, e.g. {method 'write' of '_io.BufferedWriter' objects}.
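For reference, the profile can be collected with something along these lines (a generic cProfile invocation, not necessarily how the pastebin code does it):

import cProfile

# Profile just the shared allocation to see where the extra seconds go
cProfile.run('create_np_shared_array(SHAPE, np.float32, ctypes.c_float)', sort='cumtime')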
Questions
Notes
Allocation of the shared array is slow, because apparently it is written to disk first so that it can be shared through mmap. For reference see heap.py and sharedctypes.py. This is why tempfile.py shows up in the profiler. I think the advantage of this approach is that the shared memory is cleaned up in case of a crash, which cannot be guaranteed with POSIX shared memory.
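If you want to see where that file-backed memory lives, here is a quick check (a sketch that relies on CPython's Unix implementation of heap.py, which creates the arena's backing file via tempfile inside multiprocessing's temp directory; details may differ between versions):

import multiprocessing.util

# heap.py backs each shared-memory arena with a temporary file created in
# this per-process directory (CPython/Unix implementation detail)
print(multiprocessing.util.get_temp_dir())  # e.g. /tmp/pymp-<random suffix>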
There is no pickling happening with your code, thanks to fork, and, as you said, the memory is inherited. The reason the second run doesn't work is that the child processes are not allowed to write into the memory of the parent. Instead, private pages are allocated on the fly, only to be discarded when the child process ends.
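A tiny experiment (assuming the default fork start method on Linux) shows the effect, and why the trivial-numpy run reports Succes: False: the child's write lands in a private copy-on-write page that the parent never sees:

import numpy as np
import multiprocessing as mp

def worker(arr):
    arr[0] = 42.0  # under fork this modifies a private copy-on-write page

if __name__ == '__main__':
    plain = np.zeros(4, dtype=np.float32)
    p = mp.Process(target=worker, args=(plain,))
    p.start()
    p.join()
    print(plain[0])  # prints 0.0 -- the parent never sees the child's write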
I only have one suggestion: you don't have to specify a ctype yourself; the right type can be figured out from the numpy dtype through np.ctypeslib._typecodes. Or just use c_byte for everything and use the dtype's itemsize to figure out the size of the buffer; it will be cast by numpy anyway.
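For example, a minimal sketch of the second idea (allocating raw bytes and letting numpy reinterpret them, so callers only pass a dtype; it assumes the same imports as in the question):

def create_np_shared_array(shape, dtype):
    # Allocate the buffer as raw bytes; numpy reinterprets it with the
    # requested dtype, so no explicit ctype argument is needed
    dtype = np.dtype(dtype)
    nbytes = int(np.prod(shape)) * dtype.itemsize
    shared_buf = mp.sharedctypes.RawArray(ctypes.c_byte, nbytes)
    return np.frombuffer(shared_buf, dtype=dtype).reshape(shape)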