I have a matrix, X, for which I am computing a weighted sum of intermediate matrix products. Here's a minimal reproducible example:
import numpy as np

random_state = np.random.RandomState(1)
n = 5
p = 10
X = random_state.rand(p, n)  # 10x5
X_sum = np.zeros((n, n))  # 5x5
# The length of weights is not related to X's dims,
# but will always be smaller
y = 3
weights = random_state.rand(y)
for k in range(y):
    X_sum += np.dot(X.T[:, k + 1:],
                    X[:p - (k + 1), :]) * weights[k]
This works fine and produces the results I expect. However, as n and y grow (into the hundreds), this becomes extremely expensive, as repetitively computing matrix products is not exactly efficient: the loop performs y separate products, each costing roughly O(n²p)...
There is an obvious pattern in how the products are computed, however: as the iterations progress, the starting column slice in X.T moves to the right, while the ending row in X moves up. Here's what the kth iteration looks like:
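# iteration k (0-indexed): the column slice of X.T starts one column
# further right each time, and the row slice of X ends one row higher
A = X.T[:, k + 1:]      # shape (n, p - (k + 1))
B = X[:p - (k + 1), :]  # shape (p - (k + 1), n)
X_sum += np.dot(A, B) * weights[k]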
This effectively means a subset of the same values is being repeatedly multiplied (see edit 2), which suggests to me there may be an opportunity to exploit... (i.e., if I were to manually compute the product in one pass).
But I'm hoping not to have to do anything manually, and that there may be a nice way to achieve this whole loop more elegantly with Numpy.
A realistic set of numbers:
n = 400
p = 2000
y = 750
To address the comment: "Could you explain what values are repeatedly multiplied?"
Consider the following array:
n = p = 5
X = np.arange(25).reshape(p, n)
For k=0, the first product will be between A and B:
k = 0
A = X.T[:, k + 1:]
B = X[:p - (k + 1), :]
>>> A
array([[ 5, 10, 15, 20],
[ 6, 11, 16, 21],
[ 7, 12, 17, 22],
[ 8, 13, 18, 23],
[ 9, 14, 19, 24]])
>>> B
array([[ 0, 1, 2, 3, 4],
[ 5, 6, 7, 8, 9],
[10, 11, 12, 13, 14],
[15, 16, 17, 18, 19]])
And when k=1:
k = 1
>>> A
array([[10, 15, 20],
[11, 16, 21],
[12, 17, 22],
[13, 18, 23],
[14, 19, 24]])
>>> B
array([[ 0, 1, 2, 3, 4],
[ 5, 6, 7, 8, 9],
[10, 11, 12, 13, 14]])
So each subsequent matrix product is something of a subset of the previous product, if that makes sense.
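To make that overlap concrete, here's a sketch (assuming scipy is available, and only to illustrate the structure, not to propose it as the answer): the entire loop is equivalent to X.T @ W @ X, where W is a p x p banded matrix carrying weights[k] on subdiagonal k + 1:

from scipy import sparse

# each term X.T[:, k + 1:] @ X[:p - (k + 1), :] equals X.T @ M @ X,
# where M has ones on subdiagonal k + 1; summing over k folds all the
# weights into a single banded matrix W
offsets = [-(k + 1) for k in range(y)]
W = sparse.diags(list(weights), offsets, shape=(p, p))
X_sum_banded = X.T @ (W @ X)

# matches the loop's result up to floating-point ordering
assert np.allclose(X_sum, X_sum_banded)

This collapses the y dense products into one sparse product followed by a single dense matmul.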
TL;DR: I would opt for @Parfait's use of test_gen_sum based on benchmarking across various values of n, p, and y. Keeping my old answer here for continuity's sake.

How n, p, and y affect the choice of algorithm

This analysis is done using @Parfait's functions as a means of determining if there is really one best solution, or if there is a family of solutions depending on the values of n, p, and y.
import numpy as np
import pytest  # this code also requires the pytest-benchmark plugin
from functools import reduce  # used by test_reduce_sum below
def test_for_sum(n, p, y):
    random_state = np.random.RandomState(1)
    X = random_state.rand(p, n)
    X_sum = np.zeros((n, n))
    # The length of weights is not related to X's dims,
    # but will always be smaller
    weights = random_state.rand(y)
    for k in range(y):
        X_sum += np.dot(X.T[:, k + 1:],
                        X[:p - (k + 1), :]) * weights[k]
    return X_sum
def test_list_sum(n, p, y):
    random_state = np.random.RandomState(1)
    X = random_state.rand(p, n)
    X_sum = np.zeros((n, n))
    # The length of weights is not related to X's dims,
    # but will always be smaller
    weights = random_state.rand(y)
    matrix_list = [np.dot(X.T[:, k + 1:],
                          X[:p - (k + 1), :]) * weights[k] for k in range(y)]
    X_sum = np.sum(matrix_list, axis=0)
    return X_sum
def test_reduce_sum(n, p, y):
    random_state = np.random.RandomState(1)
    X = random_state.rand(p, n)
    X_sum = np.zeros((n, n))
    # The length of weights is not related to X's dims,
    # but will always be smaller
    weights = random_state.rand(y)
    matrix_list = [(X.T[:, k + 1:] @
                    X[:p - (k + 1), :]) * weights[k] for k in range(y)]
    X_sum = reduce(lambda x, y: x + y, matrix_list)
    return X_sum
def test_concat_sum(n, p, y):
    random_state = np.random.RandomState(1)
    X = random_state.rand(p, n)
    X_sum = np.zeros((n, n))
    # The length of weights is not related to X's dims,
    # but will always be smaller
    weights = random_state.rand(y)
    # stack the y (n x n) products vertically into a (y*n, n) matrix
    x_mat = np.concatenate([np.matmul(X.T[:, k + 1:],
                                      X[:p - (k + 1), :]) for k in range(y)])
    # one weight per row: weights[k] repeated n times, shape (y*n, 1)
    wgt_mat = np.concatenate([np.full((n, 1), weights[k]) for k in range(y)])
    mul_res = x_mat * wgt_mat
    # split back into y blocks of (n, n) and sum them
    X_sum = mul_res.reshape(-1, n, n).sum(axis=0)
    return X_sum
def test_matmul_sum(n, p, y):
    random_state = np.random.RandomState(1)
    X = random_state.rand(p, n)
    X_sum = np.zeros((n, n))
    # The length of weights is not related to X's dims,
    # but will always be smaller
    weights = random_state.rand(y)
    # Use list comprehension and np.matmul
    matrices_list = [np.matmul(X.T[:, k + 1:],
                               X[:p - (k + 1), :]) * weights[k] for k in range(y)]
    # Sum matrices in list of matrices to get the final result
    X_sum = np.sum(matrices_list, axis=0)
    return X_sum
def test_gen_sum(n, p, y):
    random_state = np.random.RandomState(1)
    X = random_state.rand(p, n)
    X_sum = np.zeros((n, n))
    # The length of weights is not related to X's dims,
    # but will always be smaller
    weights = random_state.rand(y)
    matrix_gen = (np.dot(X.T[:, k + 1:],
                         X[:p - (k + 1), :]) * weights[k] for k in range(y))
    X_sum = sum(matrix_gen)
    return X_sum
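Before benchmarking, it's worth a quick sanity check that all six functions agree (small sizes chosen here only to keep it fast; every function seeds the same RandomState, so the results are comparable):

expected = test_for_sum(50, 100, 10)
for fn in (test_list_sum, test_reduce_sum, test_concat_sum,
           test_matmul_sum, test_gen_sum):
    assert np.allclose(expected, fn(50, 100, 10))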
parameters = [
    pytest.param(400, 800, 3),
    pytest.param(400, 2000, 3),
    pytest.param(400, 800, 750),
    pytest.param(400, 2000, 750),
]
@pytest.mark.parametrize('n,p,y', parameters)
def test_test_for_sum(benchmark, n, p, y):
    benchmark(test_for_sum, n=n, p=p, y=y)

@pytest.mark.parametrize('n,p,y', parameters)
def test_test_list_sum(benchmark, n, p, y):
    benchmark(test_list_sum, n=n, p=p, y=y)

@pytest.mark.parametrize('n,p,y', parameters)
def test_test_reduce_sum(benchmark, n, p, y):
    benchmark(test_reduce_sum, n=n, p=p, y=y)

@pytest.mark.parametrize('n,p,y', parameters)
def test_test_concat_sum(benchmark, n, p, y):
    benchmark(test_concat_sum, n=n, p=p, y=y)

@pytest.mark.parametrize('n,p,y', parameters)
def test_test_matmul_sum(benchmark, n, p, y):
    benchmark(test_matmul_sum, n=n, p=p, y=y)

@pytest.mark.parametrize('n,p,y', parameters)
def test_test_gen_sum(benchmark, n, p, y):
    benchmark(test_gen_sum, n=n, p=p, y=y)
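For reference, pytest-benchmark hooks into the benchmark fixture automatically, so (assuming the module above is saved as bench.py, a hypothetical filename) the whole suite can be run sorted by mean time with:

pytest bench.py --benchmark-sort=mean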
Benchmark results (result plots omitted); test_gen_sum came out on top in each case:

n=400, p=800, y=3 (100 iterations): test_gen_sum
n=400, p=2000, y=3 (100 iterations): test_gen_sum
n=400, p=800, y=750 (10 iterations): test_gen_sum
n=400, p=2000, y=750 (10 iterations): test_gen_sum
Smaller y values

I would definitely use np.matmul instead of np.dot; this will get you the biggest boost in performance, and in fact the documentation for np.dot will direct you to np.matmul for 2D array multiplication in lieu of np.dot.
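As a quick illustration (a toy example, not part of the benchmarks), for 2D arrays the @ operator, np.matmul, and np.dot all compute the same product:

import numpy as np

a = np.random.rand(3, 4)
b = np.random.rand(4, 2)
# all three spellings agree for 2D inputs
assert np.allclose(np.matmul(a, b), a @ b)
assert np.allclose(np.matmul(a, b), np.dot(a, b))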
I tested both np.dot and np.matmul, each with and without list comprehension, using pytest-benchmark (result tables omitted here). Btw, pytest-benchmark is pretty slick, and I highly recommend it at times like this to validate whether an approach is truly performant.
Just using a list comprehension has an almost negligible effect on the np.matmul results and a negative effect on np.dot (albeit better form) in the scheme of things, but the combination of both changes yielded the best results in terms of runtime. I would warn that using list comprehensions tends to drive up the std. dev. of the runtime, so you may see larger ranges in runtime performance than if you were to just use np.matmul.
Here is the code:

import numpy as np

def test_np_matmul_list_comprehension():
    n = p = 1000
    X = np.arange(n * n).reshape(p, n)
    # The length of weights is not related to X's dims,
    # but will always be smaller
    y = 3
    weights = [1, 1, 1]
    # Use list comprehension and np.matmul
    matrices_list = [np.matmul(X.T[:, k + 1:],
                               X[:p - (k + 1), :]) * weights[k] for k in range(y)]
    # Sum matrices in list of matrices to get the final result
    X_sum = np.sum(matrices_list, axis=0)
    return X_sum
Larger y values

For larger values of y you are better off not using a list comprehension. The mean/median runtime tends to be larger for both np.dot and np.matmul in both of those cases, per the pytest-benchmark results for (n=500, p=5000, y=750) (result tables omitted).
This is probably overkill, but I would rather err on the side of being too helpful :).
Consider the following re-factored versions compared to the iterative sum calls in a for loop. The new versions using reduce, a generator, and np.concatenate run marginally faster, but remain comparable to the for loop. Each runs with n = 400, p = 800, y = 750.
OP Original version
import numpy as np

def test_for_sum():
    random_state = np.random.RandomState(1)
    n = 400
    p = 800
    X = random_state.rand(p, n)
    X_sum = np.zeros((n, n))
    # The length of weights is not related to X's dims,
    # but will always be smaller
    y = 750
    weights = random_state.rand(y)
    for k in range(y):
        X_sum += np.dot(X.T[:, k + 1:],
                        X[:p - (k + 1), :]) * weights[k]
    return X_sum
List Comprehension with np.dot
def test_list_sum():
    random_state = np.random.RandomState(1)
    n = 400
    p = 800
    X = random_state.rand(p, n)
    X_sum = np.zeros((n, n))
    # The length of weights is not related to X's dims,
    # but will always be smaller
    y = 750
    weights = random_state.rand(y)
    matrix_list = [np.dot(X.T[:, k + 1:],
                          X[:p - (k + 1), :]) * weights[k] for k in range(y)]
    X_sum = sum(matrix_list)
    return X_sum
Generator Version
def test_gen_sum():
    random_state = np.random.RandomState(1)
    n = 400
    p = 800
    X = random_state.rand(p, n)
    X_sum = np.zeros((n, n))
    # The length of weights is not related to X's dims,
    # but will always be smaller
    y = 750
    weights = random_state.rand(y)
    matrix_gen = (np.dot(X.T[:, k + 1:],
                         X[:p - (k + 1), :]) * weights[k] for k in range(y))
    X_sum = sum(matrix_gen)
    return X_sum
Reduce Version (using the @ operator, which is syntactic sugar for np.matmul)
from functools import reduce

def test_reduce_sum():
    random_state = np.random.RandomState(1)
    n = 400
    p = 800
    X = random_state.rand(p, n)
    X_sum = np.zeros((n, n))
    # The length of weights is not related to X's dims,
    # but will always be smaller
    y = 750
    weights = random_state.rand(y)
    matrix_list = [(X.T[:, k + 1:] @
                    X[:p - (k + 1), :]) * weights[k] for k in range(y)]
    X_sum = reduce(lambda x, y: x + y, matrix_list)
    return X_sum
Concatenate Version
def test_concat_sum():
    random_state = np.random.RandomState(1)
    n = 400
    p = 800
    X = random_state.rand(p, n)
    X_sum = np.zeros((n, n))
    # The length of weights is not related to X's dims,
    # but will always be smaller
    y = 750
    weights = random_state.rand(y)
    # stack the y (n x n) products vertically into a (y*n, n) matrix
    x_mat = np.concatenate([np.matmul(X.T[:, k + 1:],
                                      X[:p - (k + 1), :]) for k in range(y)])
    # one weight per row: weights[k] repeated n times, shape (y*n, 1)
    wgt_mat = np.concatenate([np.full((n, 1), weights[k]) for k in range(y)])
    mul_res = x_mat * wgt_mat
    # split back into y blocks of (n, n) and sum them
    X_sum = mul_res.reshape(-1, n, n).sum(axis=0)
    return X_sum
List Comprehension with np.matmul
def test_matmul_sum():
    random_state = np.random.RandomState(1)
    n = 400
    p = 800
    X = random_state.rand(p, n)
    X_sum = np.zeros((n, n))
    # The length of weights is not related to X's dims,
    # but will always be smaller
    y = 750
    weights = random_state.rand(y)
    # Use list comprehension and np.matmul
    matrices_list = [np.matmul(X.T[:, k + 1:],
                               X[:p - (k + 1), :]) * weights[k] for k in range(y)]
    # Sum matrices in list of matrices to get the final result
    X_sum = np.sum(matrices_list, axis=0)
    return X_sum
import time
start_time = time.time()
res_for = test_for_sum()
print("SUM: {} seconds ---".format(time.time() - start_time))
start_time = time.time()
res_list = test_list_sum()
print("LIST: {} seconds ---".format(time.time() - start_time))
start_time = time.time()
res_gen = test_gen_sum()
print("GEN: {} seconds ---".format(time.time() - start_time))
start_time = time.time()
res_reduce = test_reduce_sum()
print("REDUCE: {} seconds ---".format(time.time() - start_time))
start_time = time.time()
res_concat = test_concat_sum()
print("CONCAT: {} seconds ---".format(time.time() - start_time))
start_time = time.time()
res_matmul = test_matmul_sum()
print("MATMUL: {} seconds ---".format(time.time() - start_time))
Equality Tests
print(np.array_equal(res_for, res_list))
# True
print(np.array_equal(res_for, res_gen))
# True
print(np.array_equal(res_for, res_reduce))
# True
print(np.array_equal(res_for, res_concat))
# True
print(np.array_equal(res_for, res_matmul))
# True
First Run
# SUM: 21.569773197174072 seconds ---
# LIST: 23.576102018356323 seconds ---
# GEN: 21.385253429412842 seconds ---
# REDUCE: 21.426464080810547 seconds ---
# CONCAT: 21.059731483459473 seconds ---
# MATMUL: 23.57494807243347 seconds ---
Second Run
# SUM: 21.6339168548584 seconds ---
# LIST: 19.767740488052368 seconds ---
# GEN: 23.86947798728943 seconds ---
# REDUCE: 19.880712032318115 seconds ---
# CONCAT: 20.761067152023315 seconds ---
# MATMUL: 23.55513620376587 seconds ---
Third Run
# SUM: 22.764745473861694 seconds ---
# LIST: 19.953850984573364 seconds ---
# GEN: 24.37714171409607 seconds ---
# REDUCE: 22.54508638381958 seconds ---
# CONCAT: 21.20585823059082 seconds ---
# MATMUL: 22.303589820861816 seconds ---