 

Set output matrix on numpy binomial function

I want to use the following example from the NumPy documentation in my project; it fills an array with random values using multiple threads: https://numpy.org/doc/1.18/reference/random/multithreading.html

However, instead of the standard normal distribution I want to use a binomial distribution. My problem is that the method numpy.random.Generator.binomial does not have an "out" argument (unlike the standard_normal method) into which the result is placed. This means I would have to copy the returned array into my matrix, heavily decreasing performance.

Is there an alternative approach that will solve this issue?

If this helps, I actually need the Bernoulli distribution, i.e. n=1 in the binomial distribution (but an arbitrary p).
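
For reference, a minimal illustration of the asymmetry (the buffer name is mine):

import numpy as np

rg = np.random.Generator(np.random.PCG64())
buf = np.empty(8)
rg.standard_normal(out=buf)        # writes into buf in place
arr = rg.binomial(1, 0.5, size=8)  # no out= parameter; allocates a new array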

asked Jun 02 '20 by Noam Atar

1 Answer

The following code can use random generators both with and without support for the out parameter. While, in general, use of the out parameter results in faster execution, you can still get some benefit from parallel execution even in its absence.

import os
import concurrent.futures
import numpy as np


def _rg_size(bit_gen, dist, num, *args, **kwargs):
    # Draw `num` variates from the named distribution of a fresh Generator.
    return getattr(np.random.Generator(bit_gen), dist)(size=num, *args, **kwargs)


def _rg_out(bit_gen, dist, out, *args, **kwargs):
    # Same, but write the variates directly into `out` (requires out= support).
    return getattr(np.random.Generator(bit_gen), dist)(out=out, *args, **kwargs)


def splitter(num, num_chunks):
    # Yield chunk lengths that partition `num` into at most `num_chunks` pieces.
    chunk_size = num // num_chunks + (1 if num % num_chunks else 0)
    while num > chunk_size:
        num -= chunk_size
        yield chunk_size
    yield num
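    # e.g. list(splitter(10, 4)) -> [3, 3, 3, 1]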


def slicing_splitter(num, num_chunks):
    # Yield slices that partition range(num) into at most `num_chunks` pieces.
    chunk_size = num // num_chunks + (1 if num % num_chunks else 0)
    i = 0
    remaining = num
    while remaining > chunk_size:
        remaining -= chunk_size
        yield slice(i, i + chunk_size)
        i += chunk_size
    yield slice(i, num)
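    # e.g. list(slicing_splitter(10, 4))
    # -> [slice(0, 3), slice(3, 6), slice(6, 9), slice(9, 10)]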


def new_rgs(rg):
    # Yield an endless stream of bit generators derived via jumped().
    while True:
        new_rg = rg.jumped()
        yield new_rg
        rg = new_rg
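        # Each jumped() state is advanced far ahead of the previous one,
        # so the streams handed to the workers are effectively independent.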


def glue(arrs, size, num_arrs=None):
    # Copy the chunk arrays back-to-back into a single array of length `size`.
    if num_arrs is None and hasattr(arrs, '__len__'):
        num_arrs = len(arrs)
    slicings = slicing_splitter(size, num_arrs)
    arrs = iter(arrs)
    arr = next(arrs)
    slicing = next(slicings)
    out = np.empty(size, dtype=arr.dtype)
    out[slicing] = arr
    for arr, slicing in zip(arrs, slicings):
        out[slicing] = arr
    return out
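    # e.g. glue(iter([np.zeros(3), np.ones(2)]), 5, 2) -> array([0., 0., 0., 1., 1.])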


def parallel_rand_gen(
        num=None,
        dist='standard_normal',
        bit_gen=None,
        seed=None,
        out=None,
        num_workers=None,
        split_factor=1,
        *args,
        **kwargs):
    # NOTE: distribution parameters (e.g. n=, p=) must be passed as keywords,
    # or they would bind to bit_gen, seed, etc. instead.
    if num_workers is None:
        # os.cpu_count() may return None; fall back to a single core.
        num_workers = min(32, (os.cpu_count() or 1) + 1)
    if bit_gen is None:
        bit_gen = np.random.PCG64(seed)
    if out is not None:
        shape = out.shape
        out = out.ravel()
        num = out.size
    elif num is None:
        raise ValueError('Either `num` or `out` must be specified.')
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
        num_splits = split_factor * num_workers
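        # One task per chunk; each task draws from its own jumped bit generator.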
        if out is None:
            futures = [
                executor.submit(_rg_size, rg, dist, n, *args, **kwargs)
                for rg, n in zip(new_rgs(bit_gen), splitter(num, num_splits))]
            concurrent.futures.wait(futures)
            result = (future.result() for future in futures)
            # out = np.concatenate(tuple(result))  # slower alternative
            out = glue(result, num, num_splits)
        else:
            futures = [
                executor.submit(_rg_out, rg, dist, out[slicing], *args, **kwargs)
                for rg, slicing in zip(new_rgs(bit_gen), slicing_splitter(num, num_splits))]
            concurrent.futures.wait(futures)
            out = out.reshape(shape)
    return out


print(parallel_rand_gen(17))
# [ 0.96710075  2.2935126   0.35537793  0.5825714   2.14440658  0.64483092
#   0.54062617  0.44907003  0.11947266 -0.60748694 -0.52509115  0.62924905
#   0.51714721  0.29864705 -0.46105766 -0.271093    0.33055528]
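
Since the question ultimately needs the Bernoulli case, the same entry point covers it with n=1 passed through to binomial, e.g.:

flips = parallel_rand_gen(10000001, 'binomial', n=1, p=0.3)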

For standard_normal, the timings come to:

n = 10000001
%timeit parallel_rand_gen(n)
# 89.3 ms ± 1.69 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit out = np.empty(n); parallel_rand_gen(out=out)
# 74.6 ms ± 1.66 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit rg = np.random.Generator(np.random.PCG64()); rg.standard_normal(n)
# 181 ms ± 7.45 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

and, for binomial:

n = 10000001
%timeit parallel_rand_gen(n, 'binomial', n=100, p=0.5)
# 480 ms ± 15 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit rg = np.random.Generator(np.random.PCG64()); rg.binomial(100, 0.5, n)
# 1.17 s ± 35.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

(Tested on a 4-core, 6-year-old laptop.)
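
As an aside, the Bernoulli special case from the question can also be generated fully in place without binomial at all, because both Generator.random and np.less do accept out=; a minimal sketch (the buffer names are illustrative):

import numpy as np

rg = np.random.Generator(np.random.PCG64())
n, p = 10000001, 0.3
u = np.empty(n)                  # reusable float64 scratch buffer
flips = np.empty(n, dtype=bool)  # reusable Bernoulli output buffer
rg.random(out=u)                 # uniform [0, 1) samples, written in place
np.less(u, p, out=flips)         # flips[i] is True with probability p

Since random supports out=, this also composes with the threaded out= path above, e.g. parallel_rand_gen(dist='random', out=u) followed by the thresholding step.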

answered Oct 19 '22 by norok2