Sharing contiguous numpy arrays between processes in python

While I have found numerous answers to questions similar to mine, I don't believe this has been directly addressed here, and I have several additional questions. The motivation for sharing contiguous numpy arrays is as follows:

  • I'm using a convolutional neural network run on Caffe to perform a regression on images to a series of continuous-value labels.
  • The images require specific preprocessing and data augmentation.
  • The constraints of (1) the continuous nature of the labels (they're floats) and (2) the data augmentation mean that I'm preprocessing the data in python and then serving it up as contiguous numpy arrays using the in-memory data layer in Caffe.
  • Loading the training data into memory is comparatively slow. I'd like to parallelize it such that:

(1) The python I'm writing creates a "data handler" class which instantiates two contiguous numpy arrays.
(2) A worker process alternates between those numpy arrays, loading the data from the disk, performing preprocessing, and inserting the data into the numpy array.
(3) Meanwhile, the python Caffe wrappers send data from the other array to the GPU to be run through the net.

I have a few questions:

  1. Is it possible to allocate memory as a contiguous numpy array and then wrap it in a shared memory object (I'm not sure if 'object' is the correct term here) using something like the Array class from python's multiprocessing?

  2. Numpy arrays have a .ctypes attribute; I presume this is useful for instantiating shared memory arrays from Array(), but I can't seem to determine precisely how to use it.

  3. If the shared memory is instantiated without the numpy array, does it remain contiguous? If not, is there a way to ensure it does remain contiguous?

Is it possible to do something like:

import numpy as np
from multiprocessing import Array
contArr = np.ascontiguousarray(np.zeros((n_images, n_channels, img_height, img_width)), dtype=np.float32)
sm_contArr = Array(contArr.ctypes.?, contArr?)

Then instantiate the worker with

p.append(Process(target=some_worker_function, args=(data_to_load, sm_contArr)))
p[-1].start()

Thanks!

Edit: I'm aware there are a number of libraries that have similar functions in varying states of maintenance. I would prefer to restrict this to pure python and numpy, but if that's not possible I would of course be willing to use one.

asked Jul 01 '15 by eriophora


1 Answer

Wrap numpy's ndarray around multiprocessing's RawArray()

There are multiple ways to share numpy arrays in memory across processes. Let's have a look at how you can do it using the multiprocessing module.

The first important observation is that numpy provides the np.frombuffer() function to wrap an ndarray interface around a preexisting object that supports the buffer protocol (such as bytes(), bytearray(), array() and so on). This creates read-only arrays from read-only objects and writable arrays from writable objects.
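
As a quick sketch of that behavior (not from the question, just an illustration with made-up variable names), wrapping a read-only bytes object versus a writable bytearray shows how the writability carries over:

import numpy as np

ro = np.frombuffer(bytes(8), dtype=np.float32)      # bytes is read-only
rw = np.frombuffer(bytearray(8), dtype=np.float32)  # bytearray is writable

print(ro.flags.writeable)  # False
print(rw.flags.writeable)  # True
rw[:] = 1.0                # fine; ro[:] = 1.0 would raise a ValueError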

We can combine that with the shared memory RawArray() that multiprocessing provides. Note that Array() doesn't work for that purpose, as it is a proxy object with a lock and doesn't directly expose the buffer interface. Of course that means that we need to provide for proper synchronization of our numpified RawArrays ourselves.
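
A minimal sketch of that combination (variable names are chosen here purely for illustration) could look like this:

import numpy as np
from multiprocessing.sharedctypes import RawArray

raw = RawArray('f', 6)                      # 6 C floats in shared memory, no lock
arr = np.frombuffer(raw, dtype=np.float32)  # writable ndarray view, zero copy
arr[:] = np.arange(6)                       # writes land directly in the shared buffer
img = arr.reshape(2, 3)                     # reshaping returns a view of the same memory
print(arr.flags['C_CONTIGUOUS'])            # True - one contiguous block

Since RawArray() allocates one unbroken block of memory, the wrapping ndarray is C-contiguous, which also addresses the contiguity part of the question.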

There is one complicating issue regarding ndarray-wrapped RawArrays: When multiprocessing sends such an array between processes - and indeed it will need to send our arrays, once created, to both workers - it pickles and then unpickles them. Unfortunately, that results in it creating copies of the ndarrays instead of sharing them in memory.
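
You can see the copying with a plain pickle round trip, which is essentially what multiprocessing does whenever it has to transfer the array to another process (for example with the "spawn" start method or over a Queue) - again just a sketch:

import pickle
import numpy as np
from multiprocessing.sharedctypes import RawArray

raw = RawArray('f', 4)
arr = np.frombuffer(raw, dtype=np.float32)

clone = pickle.loads(pickle.dumps(arr))  # the round trip the array goes through
clone[:] = 1.0
print(arr[:])  # still all zeros - we got back a copy, not a shared view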

The solution, while a bit ugly, is to keep the RawArrays as is until they are transferred to the workers and only wrap them in ndarrays once each worker process has started.

Furthermore, it would have been preferable to communicate arrays, be it a plain RawArray or an ndarray-wrapped one, directly via a multiprocessing.Queue, but that doesn't work, either. A RawArray cannot be put inside such a Queue and an ndarray-wrapped one would have been pickled and unpickled, so in effect copied.

The workaround is to send a list of all pre-allocated arrays to the worker processes and communicate indices into that list over the Queues. It's very much like passing around tokens (the indices) and whoever holds the token is allowed to operate on the associated array.

The structure of the main program could look like this:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import numpy as np
import queue

from multiprocessing import freeze_support, set_start_method
from multiprocessing import Event, Process, Queue
from multiprocessing.sharedctypes import RawArray


def create_shared_arrays(size, dtype=np.int32, num=2):
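    # Allocate `num` flat RawArrays in shared memory. If the dtype maps onto a
    # supported ctypes typecode, use it directly; otherwise fall back to a
    # plain byte buffer of the same total size.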
    dtype = np.dtype(dtype)
    if dtype.isbuiltin and dtype.char in 'bBhHiIlLfd':
        typecode = dtype.char
    else:
        typecode, size = 'B', size * dtype.itemsize

    return [RawArray(typecode, size) for _ in range(num)]


def main():
    my_dtype = np.float32

    # 125000000 (size) * 4 (dtype) * 2 (num) ~= 1 GB memory usage
    arrays = create_shared_arrays(125000000, dtype=my_dtype)
    q_free = Queue()
    q_used = Queue()
    bail = Event()

    for arr_id in range(len(arrays)):
        q_free.put(arr_id)  # pre-fill free queue with allocated array indices

    pr1 = MyDataLoader(arrays, q_free, q_used, bail,
                       dtype=my_dtype, step=1024)
    pr2 = MyDataProcessor(arrays, q_free, q_used, bail,
                          dtype=my_dtype, step=1024)

    pr1.start()
    pr2.start()

    pr2.join()
    print("\n{} joined.".format(pr2.name))

    pr1.join()
    print("{} joined.".format(pr1.name))


if __name__ == '__main__':
    freeze_support()

    # On Windows, only "spawn" is available.
    # Also, this tests proper sharing of the arrays without "cheating".
    set_start_method('spawn')
    main()

This prepares a list of two arrays and two Queues: a "free" queue, where MyDataProcessor puts array indices it is done with and MyDataLoader fetches them from, and a "used" queue, where MyDataLoader puts indices of freshly filled arrays and MyDataProcessor fetches them from. It also creates a multiprocessing.Event used to start a concerted bail-out of all workers. We could do away with the latter for now, as we have only one producer and one consumer of arrays, but it doesn't hurt to be prepared for more workers.

Then we pre-fill the "free" Queue with all indices of our RawArrays in the list and instantiate one of each type of worker, passing them the necessary communication objects. We start both of them and just wait for them to join().

Here's how MyDataProcessor could look; it consumes array indices from the "used" Queue and sends the data off to some external black box (debugio.output in the example):

class MyDataProcessor(Process):
    def __init__(self, arrays, q_free, q_used, bail, dtype=np.int32, step=1):
        super().__init__()
        self.arrays = arrays
        self.q_free = q_free
        self.q_used = q_used
        self.bail = bail
        self.dtype = dtype
        self.step = step

    def run(self):
        # wrap RawArrays inside ndarrays
        arrays = [np.frombuffer(arr, dtype=self.dtype) for arr in self.arrays]

        from debugio import output as writer

        while True:
            arr_id = self.q_used.get()
            if arr_id is None:
                break

            arr = arrays[arr_id]

            print('(', end='', flush=True)          # just visualizing activity
            for j in range(0, len(arr), self.step):
                writer.write(str(arr[j]) + '\n')
            print(')', end='', flush=True)          # just visualizing activity

            self.q_free.put(arr_id)

            writer.flush()

        self.bail.set()                     # tell loaders to bail out ASAP
        self.q_free.put(None, timeout=1)    # wake up loader blocking on get()

        try:
            while True:
                self.q_used.get_nowait()    # wake up loader blocking on put()
        except queue.Empty:
            pass

The first thing it does is wrap the received RawArrays in ndarrays using np.frombuffer() and keep the new list, so they are usable as numpy arrays for the rest of the process' runtime and don't have to be wrapped over and over again.

Note also that MyDataProcessor only ever writes to the self.bail Event; it never checks it. Instead, if it needs to be told to quit, it will find a None mark on the queue instead of an array index. This is done so that when a MyDataLoader has no more data available and starts the tear-down procedure, MyDataProcessor can still process all valid arrays that are in the queue without exiting prematurely.

This is how MyDataLoader could look:

class MyDataLoader(Process):
    def __init__(self, arrays, q_free, q_used, bail, dtype=np.int32, step=1):
        super().__init__()
        self.arrays = arrays
        self.q_free = q_free
        self.q_used = q_used
        self.bail = bail
        self.dtype = dtype
        self.step = step

    def run(self):
        # wrap RawArrays inside ndarrays
        arrays = [np.frombuffer(arr, dtype=self.dtype) for arr in self.arrays]

        from debugio import input as reader

        for _ in range(10):  # for testing we end after a set amount of passes
            if self.bail.is_set():
                # we were asked to bail out while waiting on put()
                return

            arr_id = self.q_free.get()
            if arr_id is None:
                # we were asked to bail out while waiting on get()
                self.q_free.put(None, timeout=1)  # put it back for next loader
                return

            if self.bail.is_set():
                # we were asked to bail out while we got a normal array
                return

            arr = arrays[arr_id]

            eof = False
            print('<', end='', flush=True)          # just visualizing activity
            for j in range(0, len(arr), self.step):
                line = reader.readline()
                if not line:
                    eof = True
                    break

                arr[j] = np.fromstring(line, dtype=self.dtype, sep='\n')

            if eof:
                print('EOF>', end='', flush=True)   # just visualizing activity
                break

            print('>', end='', flush=True)          # just visualizing activity

            if self.bail.is_set():
                # we were asked to bail out while we filled the array
                return

            self.q_used.put(arr_id)     # tell processor an array is filled

        if not self.bail.is_set():
            self.bail.set()             # tell other loaders to bail out ASAP
            # mark end of data for processor as we are the first to bail out
            self.q_used.put(None)

It is very similar in structure to the other worker. The reason it is bloated up a bit is that it checks the self.bail Event at many points, so as to reduce the likelihood of getting stuck. (It's not completely foolproof, as there is a tiny chance that the Event could get set between checking and accessing the Queue. If that's a problem, one needs to use some synchronization primitive arbitrating access to both the Event and the Queue combined.)

It also wraps the received RawArrays in ndarrays at the very beginning and reads data from an external black box (debugio.input in the example).

Note that by playing around with the step= arguments to both workers in the main() function, we can change the ratio of how much reading and writing is done (strictly for testing purposes - in a production environment step= would be 1, reading and writing all numpy array members).

Increasing both values makes the workers only access a few of the values in the numpy arrays, thereby significantly speeding everything up, which goes to show that the performance is not limited by the communication between the worker processes. Had we put numpy arrays directly onto the Queues, copying them back and forth between the processes in whole, increasing the step size would not have significantly improved the performance - it would have remained slow.

For reference, here is the debugio module I used for testing:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from ast import literal_eval
from io import RawIOBase, BufferedReader, BufferedWriter, TextIOWrapper


class DebugInput(RawIOBase):
    def __init__(self, end=None):
        if end is not None and end < 0:
            raise ValueError("end must be non-negative")

        super().__init__()
        self.pos = 0
        self.end = end

    def readable(self):
        return True

    def read(self, size=-1):
        if self.end is None:
            if size < 0:
                raise NotImplementedError("size must be non-negative")
            end = self.pos + size
        elif size < 0:
            end = self.end
        else:
            end = min(self.pos + size, self.end)

        lines = []
        while self.pos < end:
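            # the synthetic stream is a repeating 400-byte cycle of the decimal
            # numbers 1..127, one per line (9 two-byte, 90 three-byte and
            # 28 four-byte lines); map self.pos onto the number i it falls in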
            offset = self.pos % 400
            pos = self.pos - offset
            if offset < 18:
                i = (offset + 2) // 2
                pos += i * 2 - 2
            elif offset < 288:
                i = (offset + 12) // 3
                pos += i * 3 - 12
            else:
                i = (offset + 112) // 4
                pos += i * 4 - 112

            line = str(i).encode('ascii') + b'\n'
            line = line[self.pos - pos:end - pos]
            self.pos += len(line)
            size -= len(line)
            lines.append(line)

        return b''.join(lines)

    def readinto(self, b):
        data = self.read(len(b))
        b[:len(data)] = data
        return len(data)

    def seekable(self):
        return True

    def seek(self, offset, whence=0):
        if whence == 0:
            pos = offset
        elif whence == 1:
            pos = self.pos + offset
        elif whence == 2:
            if self.end is None:
                raise ValueError("cannot seek to end of infinite stream")
            pos = self.end + offset
        else:
            raise NotImplementedError("unknown whence value")

        self.pos = max((pos if self.end is None else min(pos, self.end)), 0)
        return self.pos


class DebugOutput(RawIOBase):
    def __init__(self):
        super().__init__()
        self.buf = b''
        self.num = 1

    def writable(self):
        return True

    def write(self, b):
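        # reassemble complete lines from the byte stream and check that they
        # reproduce the repeating 1..127 sequence generated by DebugInput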
        *lines, self.buf = (self.buf + b).split(b'\n')

        for line in lines:
            value = literal_eval(line.decode('ascii'))
            if value != int(value) or int(value) & 255 != self.num:
                raise ValueError("expected {}, got {}".format(self.num, value))

            self.num = self.num % 127 + 1

        return len(b)


input = TextIOWrapper(BufferedReader(DebugInput()), encoding='ascii')
output = TextIOWrapper(BufferedWriter(DebugOutput()), encoding='ascii')

answered Oct 19 '22 by blubberdiblub