I have two sets of points in 2D, A and B, and I need to find the minimum distance for each point in A to a point in B. So far I've been using SciPy's cdist with the code below:
import numpy as np
from scipy.spatial.distance import cdist

def ABdist(A, B):
    # Distance to all points in B, for each point in A.
    dist = cdist(A, B, 'euclidean')
    # Indexes to minimum distances.
    min_dist_idx = np.argmin(dist, axis=1)
    # Store only the minimum distances for each point in A, to a point in B.
    min_dists = [dist[i][md_idx] for i, md_idx in enumerate(min_dist_idx)]
    return min_dist_idx, min_dists

N = 10000
A = np.random.uniform(0., 5000., (N, 2))
B = np.random.uniform(0., 5000., (N, 2))

min_dist_idx, min_dists = ABdist(A, B)
which works just fine for smallish values of N. But now the lengths of the sets have increased from N=10000 to N=35000 and I'm running into a

    dm = np.zeros((mA, mB), dtype=np.double)
    MemoryError

I know I can replace cdist with a for loop that keeps only the minimum distance (and its index) for each point in A to a point in B, as that is all I need; I don't need the full AxB distance matrix. But I've been using cdist precisely because it is fast.

Is there a way to replace cdist with an implementation that is (almost?) as fast, but that does not take that much memory?
The best approach is to use a data structure designed specifically for nearest-neighbor search, such as a k-d tree. For example, SciPy's cKDTree allows you to solve the problem this way:
from scipy.spatial import cKDTree
min_dists, min_dist_idx = cKDTree(B).query(A, 1)
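Note that cKDTree.query returns the distances first and the indices second, i.e. the reverse of the order returned by the ABdist function in the question.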
This is much more efficient than any approach based on broadcasting, both in terms of computation and memory use.
For example, even with 1,000,000 points, the computation does not run out of memory, and takes only a few seconds on my laptop:
N = 1000000
A = np.random.uniform(0., 5000., (N, 2))
B = np.random.uniform(0., 5000., (N, 2))
%timeit cKDTree(B).query(A, 1)
# 3.25 s ± 17.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
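As a quick sanity check, the k-d tree result can be compared against the cdist-based ABdist from the question on a problem small enough to fit in memory (a minimal sketch, assuming ABdist, cKDTree and np from above are in scope):

N_small = 2000
A_s = np.random.uniform(0., 5000., (N_small, 2))
B_s = np.random.uniform(0., 5000., (N_small, 2))

idx_cdist, d_cdist = ABdist(A_s, B_s)          # cdist-based reference
d_tree, idx_tree = cKDTree(B_s).query(A_s, 1)  # k-d tree result

print(np.array_equal(idx_cdist, idx_tree))  # True (barring exact distance ties)
print(np.allclose(d_cdist, d_tree))         # True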
The trick here is to maximize the compute-to-memory ratio. The output is of length N: one index and one distance for each point in A. We can reduce the computation to a single loop that produces one output element per iteration while processing all points of B in that iteration, which keeps the compute ratio high.
Thus, leveraging einsum and matrix multiplication (inspired by this post), and using the expansion ||pt - b||^2 = ||b||^2 + ||pt||^2 - 2*b.dot(pt), for each point pt in A we can get the squared Euclidean distances to all points in B, like so -

for pt in A:
    d = np.einsum('ij,ij->i', B, B) + pt.dot(pt) - 2*B.dot(pt)
Generalizing this to cover all points in A and pre-computing np.einsum('ij,ij->i', B, B) once, we get an implementation like so (each iteration only allocates a vector of length len(B), so peak memory scales with len(B) rather than len(A)*len(B)) -
min_idx = np.empty(N, dtype=int)
min_dist = np.empty(N)
Bsqsum = np.einsum('ij,ij->i', B, B)

for i, pt in enumerate(A):
    d = Bsqsum + pt.dot(pt) - 2*B.dot(pt)
    min_idx[i] = d.argmin()
    min_dist[i] = d[min_idx[i]]

min_dist = np.sqrt(min_dist)
Now, a fully vectorized solution would be -
np.einsum('ij,ij->i',B,B)[:,None] + np.einsum('ij,ij->i',A,A) - 2*B.dot(A.T)
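For completeness, a minimal sketch of how the nearest indices and distances could be read off that full (len(B), len(A)) matrix, assuming it actually fits in memory (the variable names here are just illustrative):

# Full squared-distance matrix; rows correspond to B, columns to A.
d_full = np.einsum('ij,ij->i', B, B)[:, None] + np.einsum('ij,ij->i', A, A) - 2*B.dot(A.T)
min_idx_full = d_full.argmin(axis=0)                              # nearest point in B per point in A
min_dist_full = np.sqrt(d_full[min_idx_full, np.arange(len(A))])  # corresponding distances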
So, to work in chunks, we slice out rows of A; the easiest way to do that is to simply reshape A to 3D (note that this assumes N is divisible by chunk_size), like so -
chunk_size = 100  # Edit this as per memory setup available;
                  # more means more memory needed.

A.shape = (A.shape[0]//chunk_size, chunk_size, -1)

min_idx = np.empty((N//chunk_size, chunk_size), dtype=int)
min_dist = np.empty((N//chunk_size, chunk_size))
Bsqsum = np.einsum('ij,ij->i', B, B)[:, None]
r = np.arange(chunk_size)

for i, chnk in enumerate(A):
    d = Bsqsum + np.einsum('ij,ij->i', chnk, chnk) - 2*B.dot(chnk.T)
    idx = d.argmin(0)
    min_idx[i] = idx
    min_dist[i] = d[idx, r]

min_dist = np.sqrt(min_dist)
min_idx.shape = (N,)
min_dist.shape = (N,)
A.shape = (N, -1)
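To get a feel for how large chunk_size can be, note that the per-iteration array d has shape (len(B), chunk_size), so with float64 data its footprint is roughly len(B) * chunk_size * 8 bytes (a rough estimate, ignoring temporaries created while evaluating the expression):

# Rough per-chunk memory estimate for the d array (float64, 8 bytes/element).
chunk_size = 100
mem_bytes = B.shape[0] * chunk_size * 8
print(mem_bytes / 1e6, "MB per chunk")  # ~28 MB for len(B)=35000, chunk_size=100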