
Making my NumPy array shared across processes

I have read quite a few of the questions on SO about sharing arrays, and it seems simple enough for simple arrays, but I am stuck trying to get it working for the array I have.

import numpy as np
data=np.zeros(250,dtype='float32, (250000,2)float32')
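
For reference, each record in this array holds one float32 ('f0') plus a (250000, 2) float32 block ('f1'), about 2 MB per record and roughly 500 MB in total:

data.dtype.names      # ('f0', 'f1')
data[0]['f1'].shape   # (250000, 2)
data.nbytes           # 250 * (4 + 250000 * 2 * 4) = 500001000 bytes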

I have tried converting this to a shared array by somehow making mp.Array accept the data; I have also tried creating the array using ctypes, like so:

import multiprocessing as mp
data=mp.Array('c_float, (250000)c_float',250)

The only way I have managed to get my code working is by not passing the data to the function but passing an encoded string to be uncompressed/decoded; this would however end up in n (number of strings) processes being called, which seems redundant. My desired implementation is based on slicing the list of binary strings into x (number of processes) chunks and passing a chunk, the data and an index to each process, which works except that the data is modified locally, hence the question of how to make it shared. Any example working with a custom (nested) NumPy array would already be a great help.
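
Roughly, the slicing I have in mind looks like this sketch (the names are just illustrative; the remaining problem is that data itself needs to be shared):

num_processes = mp.cpu_count()
chunk_size = len(encoded_strings) // num_processes   # encoded_strings: the n binary strings
jobs = []
for i in range(num_processes):
    start = i * chunk_size
    # each process gets a slice of the strings plus the index of the
    # first record in 'data' that the slice should fill
    jobs.append((encoded_strings[start:start + chunk_size], start))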

PS: This question is a follow up from Python multi-processing

asked Apr 12 '13 by Bas Jansen



1 Answer

Note that you can start out with an array of a compound (structured) dtype:

In [4]: data = np.zeros(250,dtype='float32, (250000,2)float32')

and view it as an array of homogeneous dtype:

In [5]: data2 = data.view('float32')

and later, convert it back to the compound dtype:

In [7]: data3 = data2.view('float32, (250000,2)float32')

Changing the dtype this way is a very quick operation; it does not copy or affect the underlying data, only the way NumPy interprets it, so it is virtually costless.

So what you've read about sharing arrays with simple (homogeneous) dtypes can be readily applied to your compound dtype with the trick above.
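
As a minimal sketch of how that applies here (sizes taken from the question; the names shared, flat and data are purely illustrative), you can allocate the shared buffer as flat float32 values and view it with the compound dtype on the NumPy side:

import ctypes
import multiprocessing as mp
import numpy as np

n_records = 250
floats_per_record = 1 + 250000 * 2   # one 'f0' float plus the (250000, 2) 'f1' block
shared = mp.Array(ctypes.c_float, n_records * floats_per_record)

flat = np.frombuffer(shared.get_obj(), dtype=np.float32)         # homogeneous view, no copy
data = flat.view([('f0', '<f4'), ('f1', '<f4', (250000, 2))])    # compound view, shape (250,)

Writes through data land directly in the shared memory; the full example below relies on exactly this.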


The code below borrows many ideas from J.F. Sebastian's answer, here.

import numpy as np
import multiprocessing as mp
import contextlib
import ctypes
import struct
import base64


def decode(arg):
    chunk, counter = arg
    print(len(chunk), counter)
    for x in chunk:
        peak_counter = 0
        data_buff = base64.b64decode(x)
        buff_size = len(data_buff) // 4       # number of 32-bit values in the decoded string
        unpack_format = ">%dL" % buff_size    # big-endian unsigned 32-bit integers
        index = 0
        for y in struct.unpack(unpack_format, data_buff):
            buff1 = struct.pack("I", y)           # raw bytes of the 32-bit value
            buff2 = struct.unpack("f", buff1)[0]  # reinterpreted as a float32
            with shared_arr.get_lock():
                data = tonumpyarray(shared_arr).view(
                    [('f0', '<f4'), ('f1', '<f4', (250000, 2))])
                if (index % 2 == 0):
                    data[counter][1][peak_counter][0] = float(buff2)  # even index: m/z
                else:
                    data[counter][1][peak_counter][1] = float(buff2)  # odd index: intensity
                    peak_counter += 1
            index += 1
        counter += 1


def pool_init(shared_arr_):
    global shared_arr
    shared_arr = shared_arr_  # must be inherited, not passed as an argument


def tonumpyarray(mp_arr):
    return np.frombuffer(mp_arr.get_obj(), dtype=np.float32)  # the shared buffer holds c_float values


def numpy_array(shared_arr, peaks):
    """Fills the NumPy array 'data' with m/z-intensity values acquired
    from b64 decoding and unpacking the binary string read from the
    mzXML file, which is stored in the list 'peaks'.

    The m/z values are assumed to be ordered without validating this
    assumption.

    Note: This function uses multi-processing
    """
    processors = mp.cpu_count()
    with contextlib.closing(mp.Pool(processes=processors,
                                    initializer=pool_init,
                                    initargs=(shared_arr, ))) as pool:
        chunk_size = len(peaks) // processors  # any leftover peaks beyond processors * chunk_size are skipped
        map_parameters = []
        for i in range(processors):
            counter = i * chunk_size
            # WARNING: I removed -1 from (i + 1)*chunk_size, since the right
            # index is non-inclusive. 
            chunk = peaks[i*chunk_size : (i + 1)*chunk_size]
            map_parameters.append((chunk, counter))
        pool.map(decode, map_parameters)

if __name__ == '__main__':
    # 250 'f0' floats plus 250 * 250000 * 2 'f1' floats, as one flat c_float buffer
    shared_arr = mp.Array(ctypes.c_float, (250000 * 2 * 250) + 250)
    peaks = ...
    numpy_array(shared_arr, peaks)
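
To exercise the pipeline you need a peaks list in the format decode() expects: base64-encoded strings of big-endian 32-bit floats, alternating m/z and intensity. A hypothetical way to build synthetic input for testing (make_peak_string is made up here, not part of the original code):

import base64
import random
import struct

def make_peak_string(num_peaks):
    # pack alternating (m/z, intensity) float32 pairs in big-endian
    # (network) byte order and base64-encode them, mirroring what
    # decode() unpacks above
    values = []
    for _ in range(num_peaks):
        values.append(random.uniform(100.0, 2000.0))  # fake m/z
        values.append(random.uniform(0.0, 1e6))       # fake intensity
    return base64.b64encode(struct.pack(">%df" % len(values), *values))

peaks = [make_peak_string(1000) for _ in range(250)]  # 250 spectra, 1000 peaks each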

If you can guarantee that the various processes which execute the assignments

if (index % 2 == 0):
    data[counter][1][peak_counter][0] = float(buff2)
else:
    data[counter][1][peak_counter][1] = float(buff2)

never compete to alter the data in the same locations, then I believe you can actually forgo using the lock

with shared_arr.get_lock():

but I don't grok your code well enough to know for sure, so to be on the safe side, I included the lock.
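
For reference, a sketch of what the lock-free variant could look like, assuming the writes really are disjoint: use mp.RawArray (shared memory without a synchronization wrapper, so there is no get_obj() or get_lock()) and drop the with-block inside decode():

import ctypes
import multiprocessing as mp
import numpy as np

# only safe if every process writes to a disjoint set of records
shared_arr = mp.RawArray(ctypes.c_float, (250000 * 2 * 250) + 250)

def tonumpyarray(mp_arr):
    # RawArray exposes its buffer directly; no .get_obj() needed
    return np.frombuffer(mp_arr, dtype=np.float32)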

answered Nov 15 '22 by unutbu