Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Fast Queue of read only numpy arrays

I have a multiprocessing job where I'm queuing read only numpy arrays, as part of a producer consumer pipeline.

Currently they're being pickled, because this is the default behaviour of multiprocessing.Queue which slows down performance.

Is there any pythonic way to pass references to shared memory instead of pickling the arrays?

Unfortunately the arrays are being generated after the consumer is started, and there is no easy way around that. (So the global variable approach would be ugly...).

[Note that in the following code we are not expecting h(x0) and h(x1) to be computed in parallel. Instead we see h(x0) and g(h(x1)) computed in parallel (like a pipelining in a CPU).]

from multiprocessing import Process, Queue
import numpy as np

class __EndToken(object):
    pass

def parrallel_pipeline(buffer_size=50):
    def parrallel_pipeline_with_args(f):
        def consumer(xs, q):
            for x in xs:
                q.put(x)
            q.put(__EndToken())

        def parallel_generator(f_xs):
            q = Queue(buffer_size)
            consumer_process = Process(target=consumer,args=(f_xs,q,))
            consumer_process.start()
            while True:
                x = q.get()
                if isinstance(x, __EndToken):
                    break
                yield x

        def f_wrapper(xs):
            return parallel_generator(f(xs))

        return f_wrapper
    return parrallel_pipeline_with_args


@parrallel_pipeline(3)
def f(xs):
    for x in xs:
        yield x + 1.0

@parrallel_pipeline(3)
def g(xs):
    for x in xs:
        yield x * 3

@parrallel_pipeline(3)
def h(xs):
    for x in xs:
        yield x * x

def xs():
    for i in range(1000):
        yield np.random.uniform(0,1,(500,2000))

if __name__ == "__main__":
    rs = f(g(h(xs())))
    for r in rs:
        print r
like image 590
N. McA. Avatar asked Jul 29 '16 19:07

N. McA.


1 Answers

Sharing memory between threads or processes

Use threading instead of multiprocessing

Since you're using numpy, you can take advantage of the fact that the global interpreter lock is released during numpy computations. This means you can do parallel processing with standard threads and shared memory, instead of multiprocessing and inter-process communication. Here's a version of your code, tweaked to use threading.Thread and Queue.Queue instead of multiprocessing.Process and multiprocessing.Queue. This passes a numpy ndarray via a queue without pickling it. On my computer, this runs about 3 times faster than your code. (However, it's only about 20% faster than the serial version of your code. I have suggested some other approaches further down.)

from threading import Thread
from Queue import Queue
import numpy as np

class __EndToken(object):
    pass

def parallel_pipeline(buffer_size=50):
    def parallel_pipeline_with_args(f):
        def consumer(xs, q):
            for x in xs:
                q.put(x)
            q.put(__EndToken())

        def parallel_generator(f_xs):
            q = Queue(buffer_size)
            consumer_process = Thread(target=consumer,args=(f_xs,q,))
            consumer_process.start()
            while True:
                x = q.get()
                if isinstance(x, __EndToken):
                    break
                yield x

        def f_wrapper(xs):
            return parallel_generator(f(xs))

        return f_wrapper
    return parallel_pipeline_with_args

@parallel_pipeline(3)
def f(xs):
    for x in xs:
        yield x + 1.0

@parallel_pipeline(3)
def g(xs):
    for x in xs:
        yield x * 3

@parallel_pipeline(3)
def h(xs):
    for x in xs:
        yield x * x

def xs():
    for i in range(1000):
        yield np.random.uniform(0,1,(500,2000))

rs = f(g(h(xs())))
%time print sum(r.sum() for r in rs)  # 12.2s

Store numpy arrays in shared memory

Another option, close to what you requested, would be to continue using the multiprocessing package, but pass data between processes using arrays stored in shared memory. The code below creates a new ArrayQueue class to do that. The ArrayQueue object should be created before spawning subprocesses. It creates and manages a pool of numpy arrays backed by shared memory. When a result array is pushed onto the queue, ArrayQueue copies the data from that array into an existing shared-memory array, then passes the id of the shared-memory array through the queue. This is much faster than sending the whole array through the queue, since it avoids pickling the arrays. This has similar performance to the threaded version above (about 10% slower), and may scale better if the global interpreter lock is an issue (i.e., you run a lot of python code in the functions).

from multiprocessing import Process, Queue, Array
import numpy as np

class ArrayQueue(object):
    def __init__(self, template, maxsize=0):
        if type(template) is not np.ndarray:
            raise ValueError('ArrayQueue(template, maxsize) must use a numpy.ndarray as the template.')
        if maxsize == 0:
            # this queue cannot be infinite, because it will be backed by real objects
            raise ValueError('ArrayQueue(template, maxsize) must use a finite value for maxsize.')

        # find the size and data type for the arrays
        # note: every ndarray put on the queue must be this size
        self.dtype = template.dtype
        self.shape = template.shape
        self.byte_count = len(template.data)

        # make a pool of numpy arrays, each backed by shared memory, 
        # and create a queue to keep track of which ones are free
        self.array_pool = [None] * maxsize
        self.free_arrays = Queue(maxsize)
        for i in range(maxsize):
            buf = Array('c', self.byte_count, lock=False)
            self.array_pool[i] = np.frombuffer(buf, dtype=self.dtype).reshape(self.shape)
            self.free_arrays.put(i)

        self.q = Queue(maxsize)

    def put(self, item, *args, **kwargs):
        if type(item) is np.ndarray:
            if item.dtype == self.dtype and item.shape == self.shape and len(item.data)==self.byte_count:
                # get the ID of an available shared-memory array
                id = self.free_arrays.get()
                # copy item to the shared-memory array
                self.array_pool[id][:] = item
                # put the array's id (not the whole array) onto the queue
                new_item = id
            else:
                raise ValueError(
                    'ndarray does not match type or shape of template used to initialize ArrayQueue'
                )
        else:
            # not an ndarray
            # put the original item on the queue (as a tuple, so we know it's not an ID)
            new_item = (item,)
        self.q.put(new_item, *args, **kwargs)

    def get(self, *args, **kwargs):
        item = self.q.get(*args, **kwargs)
        if type(item) is tuple:
            # unpack the original item
            return item[0]
        else:
            # item is the id of a shared-memory array
            # copy the array
            arr = self.array_pool[item].copy()
            # put the shared-memory array back into the pool
            self.free_arrays.put(item)
            return arr

class __EndToken(object):
    pass

def parallel_pipeline(buffer_size=50):
    def parallel_pipeline_with_args(f):
        def consumer(xs, q):
            for x in xs:
                q.put(x)
            q.put(__EndToken())

        def parallel_generator(f_xs):
            q = ArrayQueue(template=np.zeros(0,1,(500,2000)), maxsize=buffer_size)
            consumer_process = Process(target=consumer,args=(f_xs,q,))
            consumer_process.start()
            while True:
                x = q.get()
                if isinstance(x, __EndToken):
                    break
                yield x

        def f_wrapper(xs):
            return parallel_generator(f(xs))

        return f_wrapper
    return parallel_pipeline_with_args


@parallel_pipeline(3)
def f(xs):
    for x in xs:
        yield x + 1.0

@parallel_pipeline(3)
def g(xs):
    for x in xs:
        yield x * 3

@parallel_pipeline(3)
def h(xs):
    for x in xs:
        yield x * x

def xs():
    for i in range(1000):
        yield np.random.uniform(0,1,(500,2000))

print "multiprocessing with shared-memory arrays:"
%time print sum(r.sum() for r in f(g(h(xs()))))   # 13.5s

Parallel processing of samples instead of functions

The code above is only about 20% faster than a single-threaded version (12.2s vs. 14.8s for the serial version shown below). That is because each function is run in a single thread or process, and most of the work is done by xs(). The execution time for the example above is nearly the same as if you just ran %time print sum(1 for x in xs()).

If your real project has many more intermediate functions and/or they are more complex than the ones you showed, then the workload may be distributed better among processors, and this may not be a problem. However, if your workload really does resemble the code you provided, then you may want to refactor your code to allocate one sample to each thread instead of one function to each thread. That would look like the code below (both threading and multiprocessing versions are shown):

import multiprocessing
import threading, Queue
import numpy as np

def f(x):
    return x + 1.0

def g(x):
    return x * 3

def h(x):
    return x * x

def final(i):
    return f(g(h(x(i))))

def final_sum(i):
    return f(g(h(x(i)))).sum()

def x(i):
    # produce sample number i
    return np.random.uniform(0, 1, (500, 2000))

def rs_serial(func, n):
    for i in range(n):
        yield func(i)

def rs_parallel_threaded(func, n):
    todo = range(n)
    q = Queue.Queue(2*n_workers)

    def worker():
        while True:
            try:
                # the global interpreter lock ensures only one thread does this at a time
                i = todo.pop()
                q.put(func(i))
            except IndexError:
                # none left to do
                q.put(None)
                break

    threads = []
    for j in range(n_workers):
        t = threading.Thread(target=worker)
        t.daemon=False
        threads.append(t)   # in case it's needed later
        t.start()

    while True:
        x = q.get()
        if x is None:
            break
        else:
            yield x

def rs_parallel_mp(func, n):
    pool = multiprocessing.Pool(n_workers)
    return pool.imap_unordered(func, range(n))

n_workers = 4
n_samples = 1000

print "serial:"  # 14.8s
%time print sum(r.sum() for r in rs_serial(final, n_samples))
print "threaded:"  # 10.1s
%time print sum(r.sum() for r in rs_parallel_threaded(final, n_samples))

print "mp return arrays:"  # 19.6s
%time print sum(r.sum() for r in rs_parallel_mp(final, n_samples))
print "mp return results:"  # 8.4s
%time print sum(r_sum for r_sum in rs_parallel_mp(final_sum, n_samples))

The threaded version of this code is only slightly faster than the first example I gave, and only about 30% faster than the serial version. That's not as much of a speedup as I would have expected; maybe Python is still getting partly bogged down by the GIL?

The multiprocessing version performs significantly faster than your original multiprocessing code, primarily because all the functions get chained together in a single process, rather than queueing (and pickling) intermediate results. However, it is still slower than the serial version because all the result arrays have to get pickled (in the worker process) and unpickled (in the main process) before being returned by imap_unordered. However, if you can arrange it so that your pipeline returns aggregate results instead of the complete arrays, then you can avoid the pickling overhead, and the multiprocessing version is fastest: about 43% faster than the serial version.

OK, now for the sake of completeness, here's a version of the second example that uses multiprocessing with your original generator functions instead of the finer-scale functions shown above. This uses some tricks to spread the samples among multiple processes, which may make it unsuitable for many workflows. But using generators does seem to be slightly faster than using the finer-scale functions, and this method can get you up to a 54% speedup vs. the serial version shown above. However, that is only available if you don't need to return the full arrays from the worker functions.

import multiprocessing, itertools, math
import numpy as np

def f(xs):
    for x in xs:
        yield x + 1.0

def g(xs):
    for x in xs:
        yield x * 3

def h(xs):
    for x in xs:
        yield x * x

def xs():
    for i in range(1000):
        yield np.random.uniform(0,1,(500,2000))

def final():
    return f(g(h(xs())))

def final_sum():
    for x in f(g(h(xs()))):
        yield x.sum()

def get_chunk(args):
    """Retrieve n values (n=args[1]) from a generator function (f=args[0]) and return them as a list. 
    This runs in a worker process and does all the computation."""
    return list(itertools.islice(args[0](), args[1]))

def parallelize(gen_func, max_items, n_workers=4, chunk_size=50):
    """Pull up to max_items items from several copies of gen_func, in small groups in parallel processes.
    chunk_size should be big enough to improve efficiency (one copy of gen_func will be run for each chunk)
    but small enough to avoid exhausting memory (each worker will keep chunk_size items in memory)."""

    pool = multiprocessing.Pool(n_workers)

    # how many chunks will be needed to yield at least max_items items?
    n_chunks = int(math.ceil(float(max_items)/float(chunk_size)))

    # generate a suitable series of arguments for get_chunk()
    args_list = itertools.repeat((gen_func, chunk_size), n_chunks)

    # chunk_gen will yield a series of chunks (lists of results) from the generator function, 
    # totaling n_chunks * chunk_size items (which is >= max_items)
    chunk_gen = pool.imap_unordered(get_chunk, args_list)

    # parallel_gen flattens the chunks, and yields individual items
    parallel_gen = itertools.chain.from_iterable(chunk_gen)

    # limit the output to max_items items 
    return itertools.islice(parallel_gen, max_items)


# in this case, the parallel version is slower than a single process, probably
# due to overhead of gathering numpy arrays in imap_unordered (via pickle?)
print "serial, return arrays:"  # 15.3s
%time print sum(r.sum() for r in final())
print "parallel, return arrays:"  # 24.2s
%time print sum(r.sum() for r in parallelize(final, max_items=1000))


# in this case, the parallel version is more than twice as fast as the single-thread version
print "serial, return result:"  # 15.1s
%time print sum(r for r in final_sum())
print "parallel, return result:"  # 6.8s
%time print sum(r for r in parallelize(final_sum, max_items=1000))
like image 157
Matthias Fripp Avatar answered Sep 27 '22 22:09

Matthias Fripp