Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Cython - efficiently filtering a typed memoryview

This Cython function returns a random element among the elements of a numpy array that are within certain limits:

cdef int search(np.ndarray[int] pool):
  cdef np.ndarray[int] limited
  limited = pool[(pool >= lower_limit) & (pool <= upper_limit)]
  return np.random.choice(limited)

This works just fine. However, this function is very critical to the performance of my code. Typed memoryviews are apparently very much faster than numpy arrays, but they cannot be filtered in the same way as above.

How could I write a function that does the same thing as above using typed memoryviews? Or is there another way to improve the performance of the function?

like image 925
Waiski Avatar asked Feb 17 '26 05:02

Waiski


1 Answers

Okay, let's start with making the code more general, I'll come to the performance aspect later.

I generally don't use:

import numpy as np
cimport numpy as np

Personally I like using a different name for the cimported package because it helps to keep the C side and the NumPy-Python side apart. So for this answer I'll use

import numpy as np
cimport numpy as cnp

Also I'll make lower_limit and upper_limit arguments of the function. Maybe these are defined statically (or globally) in your case but it makes the example more standalone. So the starting point is a slightly modified version of your code:

cpdef int search_1(cnp.ndarray[int] pool, int lower_limit, int upper_limit):
    cdef cnp.ndarray[int] limited
    limited = pool[(pool >= lower_limit) & (pool <= upper_limit)]
    return np.random.choice(limited)

One very nice feature in Cython is fused types, so you can easily generalize this function for different types. Your approach will only work for 32bit integer arrays (at least if int is 32bit on your computer). It is very easy to support more array types:

ctypedef fused int_or_float:
    cnp.int32_t
    cnp.int64_t
    cnp.float32_t
    cnp.float64_t

cpdef int_or_float search_2(cnp.ndarray[int_or_float] pool, int_or_float lower_limit, int_or_float upper_limit):
    cdef cnp.ndarray[int_or_float] limited
    limited = pool[(pool >= lower_limit) & (pool <= upper_limit)]
    return np.random.choice(limited)

Of course you can add more types if you want. The advantage is that the new version works where the old version failed:

>>> search_1(np.arange(100, dtype=np.float_), 10, 20)
ValueError: Buffer dtype mismatch, expected 'int' but got 'double'
>>> search_2(np.arange(100, dtype=np.float_), 10, 20)
19.0

Now that it's more general let's take a look at what your function actually does:

  • You create a boolean array where the elements are above the lower limit
  • You create a boolean array where the elements are below the upper limit
  • You create a boolean array by bitwise and of the two boolean arrays.
  • You create a new array containing only the elements where the boolean mask is true
  • You only extract one element from the last array

Why create so many arrays? I mean you could simply count how many elements are in within the limits, take a random integer between 0 and the number of elements within the limits and then take whatever element would be at that index in the result array.

cimport cython

@cython.boundscheck(False)
@cython.wraparound(False)
cpdef int_or_float search_3(cnp.ndarray[int_or_float] arr, int_or_float lower_bound, int_or_float upper_bound):
    cdef int_or_float element

    # Count the number of elements that are within the limits
    cdef Py_ssize_t num_valid = 0
    for index in range(arr.shape[0]):
        element = arr[index]
        if lower_bound <= element <= upper_bound:
            num_valid += 1

    # Take a random index
    cdef Py_ssize_t random_index = np.random.randint(0, num_valid)

    # Go through the array again and take the element at the random index that
    # is within the bounds
    cdef Py_ssize_t clamped_index = 0
    for index in range(arr.shape[0]):
        element = arr[index]
        if lower_bound <= element <= upper_bound:
            if clamped_index == random_index:
                return element
            clamped_index += 1

It won't be much faster but it will save a lot of memory. And because you don't have intermediate arrays you don't need memoryviews at all - but if you like you can just replace the cnp.ndarray[int_or_float] arr in the argument list with int_or_float[:] or even int_or_float[::1] arr and operate on the memoryview (it probably won't be faster but it also won't be slower).

I generally prefer numba to Cython (at least if just I am using it) so let's compare it to the numba version of that code:

import numba as nb
import numpy as np

@nb.njit
def search_numba(arr, lower, upper):
    num_valids = 0
    for item in arr:
        if item >= lower and item <= upper:
            num_valids += 1

    random_index = np.random.randint(0, num_valids)

    valid_index = 0
    for item in arr:
        if item >= lower and item <= upper:
            if valid_index == random_index:
                return item
            valid_index += 1

And also to a numexpr variant:

import numexpr

np.random.choice(arr[numexpr.evaluate('(arr >= l) & (arr <= u)')])

Okay, let's do a benchmark:

from simple_benchmark import benchmark, MultiArgument

arguments = {2**i: MultiArgument([np.random.randint(0, 100, size=2**i, dtype=np.int_), 5, 50]) for i in range(2, 22)}
funcs = [search_1, search_2, search_3, search_numba, search_numexpr]

b = benchmark(funcs, arguments, argument_name='array size')

enter image description here

So by not using intermediate arrays you can be roughly 5 times faster and if you would use numba you could get another factor 5 (seems like I'm missing some possible Cython optimizations there, numba is typically ~2 times faster or as fast as Cython). So you could get it ~20 times faster with the numba solution.

numexpr isn't really comparable here, mostly because you cannot use the boolean array indexing there.

The differences will depend on the content of the array and the limits. You also have to measure the performance of your application.


As an aside: If the lower limit and the upper limit generally don't change the fastest solution would be to filter the array once and then just call np.random.choice on it several times. That could be orders of magnitude faster.

lower_limit = ...
upper_limit = ...
filtered_array = pool[(pool >= lower_limit) & (pool <= upper_limit)]

def search_cached():
    return np.random.choice(filtered_array)

%timeit search_cached()
2.05 µs ± 122 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

So almost 1000 times faster and doesn't need Cython or numba at all. But it's a special case that may not be useful for you.


The benchmark setup in case you want to do it yourself is here (based on a Jupyter Notebook/Lab, thus the %-symbols):

%load_ext cython

%%cython

cimport numpy as cnp
import numpy as np

cpdef int search_1(cnp.ndarray[int] pool, int lower_limit, int upper_limit):
    cdef cnp.ndarray[int] limited
    limited = pool[(pool >= lower_limit) & (pool <= upper_limit)]
    return np.random.choice(limited)

ctypedef fused int_or_float:
    cnp.int32_t
    cnp.int64_t
    cnp.float32_t
    cnp.float64_t

cpdef int_or_float search_2(cnp.ndarray[int_or_float] pool, int_or_float lower_limit, int_or_float upper_limit):
    cdef cnp.ndarray[int_or_float] limited
    limited = pool[(pool >= lower_limit) & (pool <= upper_limit)]
    return np.random.choice(limited)

cimport cython

@cython.boundscheck(False)
@cython.wraparound(False)
cpdef int_or_float search_3(cnp.ndarray[int_or_float] arr, int_or_float lower_bound, int_or_float upper_bound):
    cdef int_or_float element
    cdef Py_ssize_t num_valid = 0
    for index in range(arr.shape[0]):
        element = arr[index]
        if lower_bound <= element <= upper_bound:
            num_valid += 1

    cdef Py_ssize_t random_index = np.random.randint(0, num_valid)

    cdef Py_ssize_t clamped_index = 0
    for index in range(arr.shape[0]):
        element = arr[index]
        if lower_bound <= element <= upper_bound:
            if clamped_index == random_index:
                return element
            clamped_index += 1

import numexpr
import numba as nb
import numpy as np

def search_numexpr(arr, l, u):
    return np.random.choice(arr[numexpr.evaluate('(arr >= l) & (arr <= u)')])

@nb.njit
def search_numba(arr, lower, upper):
    num_valids = 0
    for item in arr:
        if item >= lower and item <= upper:
            num_valids += 1

    random_index = np.random.randint(0, num_valids)

    valid_index = 0
    for item in arr:
        if item >= lower and item <= upper:
            if valid_index == random_index:
                return item
            valid_index += 1

from simple_benchmark import benchmark, MultiArgument

arguments = {2**i: MultiArgument([np.random.randint(0, 100, size=2**i, dtype=np.int_), 5, 50]) for i in range(2, 22)}
funcs = [search_1, search_2, search_3, search_numba, search_numexpr]

b = benchmark(funcs, arguments, argument_name='array size')

%matplotlib widget

import matplotlib.pyplot as plt

plt.style.use('ggplot')
b.plot()
like image 188
MSeifert Avatar answered Feb 18 '26 17:02

MSeifert



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!