I would like to find out the most efficient way to achieve the following in Python.
Suppose we have two lists a and b which are of equal length and contain up to 1e7 elements.
However, for ease of illustration we may consider the following:
a = [2, 1, 2, 3, 4, 5, 4, 6, 5, 7, 8, 9, 8,10,11]
b = [1, 2, 3, 4, 5, 6, 7, 8, 9,10,11,12,13,14,15]
The goal is to create a strictly monotonic (increasing) list a_new from a, where only the first of any sample points with identical values is kept.
The indices that are deleted from a should also be deleted from b, so that the final result is:
a_new = [2, 3, 4, 5, 6, 7, 8, 9,10,11]
b_new = [1, 4, 5, 6, 8,10,11,12,14,15]
Of course this can be done using computationally expensive for loops, but that is not suitable given the huge amount of data.
Any suggestions are much appreciated.
You can calculate the cumulative max of a and then drop duplicates with np.unique, which can also return the index of the first occurrence of each unique value so that you can subset b correspondingly:
import numpy as np
a = np.array([2,1,2,3,4,5,4,6,5,7,8,9,8,10,11])
b = np.array([1,2,3,4,5,6,7,8,9,10,11,12,13,14,15])
a_cummax = np.maximum.accumulate(a)
a_new, idx = np.unique(a_cummax, return_index=True)
a_new
# array([ 2, 3, 4, 5, 6, 7, 8, 9, 10, 11])
b[idx]
# array([ 1, 4, 5, 6, 8, 10, 11, 12, 14, 15])
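To see why this works, it may help to look at the intermediate running maximum. The following is a minimal sketch on the example data from the question; the `is_strictly_increasing` check is an illustrative addition, not part of the answer above.

```python
import numpy as np

a = np.array([2, 1, 2, 3, 4, 5, 4, 6, 5, 7, 8, 9, 8, 10, 11])
b = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15])

# Running maximum: each position holds the largest value seen so far.
a_cummax = np.maximum.accumulate(a)
print(a_cummax)

# The running maximum never decreases, so equal values sit next to each
# other, and return_index reports where each new maximum first appears.
a_new, idx = np.unique(a_cummax, return_index=True)
b_new = b[idx]

# Illustrative sanity check: the result is strictly increasing.
is_strictly_increasing = bool(np.all(np.diff(a_new) > 0))
print(a_new, b_new, is_strictly_increasing)
```

An element of a survives only if it exceeds every element before it, which is exactly the position where the running maximum changes.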
Running a version of @juanpa.arrivillaga's function with numba:
import numba
import numpy as np

def psi(A):
    a_cummax = np.maximum.accumulate(A)
    a_new, idx = np.unique(a_cummax, return_index=True)
    return idx

def foo(arr):
    aux = np.maximum.accumulate(arr)
    flag = np.concatenate(([True], aux[1:] != aux[:-1]))
    return np.nonzero(flag)[0]

@numba.jit
def f(A):
    m = A[0]
    a_new, idx = [m], [0]
    for i, a in enumerate(A[1:], 1):
        if a > m:
            m = a
            a_new.append(a)
            idx.append(i)
    return idx
Timing:
%timeit f(a)
The slowest run took 5.37 times longer than the fastest. This could mean that an intermediate result is being cached.
1000000 loops, best of 3: 1.83 µs per loop
%timeit foo(a)
The slowest run took 9.41 times longer than the fastest. This could mean that an intermediate result is being cached.
100000 loops, best of 3: 6.35 µs per loop
%timeit psi(a)
The slowest run took 9.66 times longer than the fastest. This could mean that an intermediate result is being cached.
100000 loops, best of 3: 9.95 µs per loop
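Before comparing speed, it may be worth confirming that the three approaches return the same indices. The sketch below assumes a plain-Python stand-in, `f_py` (a hypothetical name), for the numba-compiled loop so that it runs without numba installed; the random test data is likewise an assumption.

```python
import numpy as np

def psi(A):
    # np.unique on the running maximum; return_index gives first occurrences.
    return np.unique(np.maximum.accumulate(A), return_index=True)[1]

def foo(arr):
    # Sort-free variant: flag positions where the running maximum changes.
    aux = np.maximum.accumulate(arr)
    flag = np.concatenate(([True], aux[1:] != aux[:-1]))
    return np.nonzero(flag)[0]

def f_py(A):
    # Plain-Python stand-in for the numba loop (hypothetical helper).
    m = A[0]
    idx = [0]
    for i in range(1, len(A)):
        if A[i] > m:
            m = A[i]
            idx.append(i)
    return idx

rng = np.random.default_rng(0)
data = rng.integers(1, 10_000, size=10_000)

idx_psi = psi(data)
idx_foo = foo(data)
idx_loop = np.array(f_py(data))

all_agree = np.array_equal(idx_psi, idx_foo) and np.array_equal(idx_foo, idx_loop)
print(all_agree)
```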
np.unique with return_index uses argsort. The output of maximum.accumulate is already sorted, so that sort isn't needed. So we can cannibalize unique and do:
In [313]: a = [2,1,2,3,4,5,4,6,5,7,8,9,8,10,11]
In [314]: arr = np.array(a)
In [315]: aux = np.maximum.accumulate(arr)
In [316]: flag = np.concatenate(([True], aux[1:] != aux[:-1])) # key unique step
In [317]: idx = np.nonzero(flag)
In [318]: idx
Out[318]: (array([ 0, 3, 4, 5, 7, 9, 10, 11, 13, 14], dtype=int32),)
In [319]: arr[idx]
Out[319]: array([ 2, 3, 4, 5, 6, 7, 8, 9, 10, 11])
In [320]: np.array(b)[idx]
Out[320]: array([ 1, 4, 5, 6, 8, 10, 11, 12, 14, 15])
In [323]: np.unique(aux, return_index=True)[1]
Out[323]: array([ 0, 3, 4, 5, 7, 9, 10, 11, 13, 14], dtype=int32)
def foo(arr):
    aux = np.maximum.accumulate(arr)
    flag = np.concatenate(([True], aux[1:] != aux[:-1]))
    return np.nonzero(flag)[0]
In [330]: timeit foo(arr)
....
100000 loops, best of 3: 12.5 µs per loop
In [331]: timeit np.unique(np.maximum.accumulate(arr), return_index=True)[1]
....
10000 loops, best of 3: 21.5 µs per loop
With a (10000,) shape medium array, this sort-less unique has a substantial speed advantage:
In [334]: timeit np.unique(np.maximum.accumulate(medium[0]), return_index=True)[1]
1000 loops, best of 3: 351 µs per loop
In [335]: timeit foo(medium[0])
The slowest run took 4.14 times longer ....
10000 loops, best of 3: 48.9 µs per loop
[1]: Use np.source(np.unique) to see the code, or ?? in IPython.
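The sort-free idea can be wrapped so that it filters both arrays in one call. This is a minimal sketch, not the answer's own code: the name `monotonic_pair` and the input checks are assumptions, and it expects 1-D input without NaN values (np.maximum propagates NaN, which would flag every later position).

```python
import numpy as np

def monotonic_pair(a, b):
    """Keep the entries of a that exceed all earlier entries, and the
    matching entries of b (hypothetical helper, 1-D input assumed)."""
    a = np.asarray(a)
    b = np.asarray(b)
    if a.shape != b.shape:
        raise ValueError("a and b must have the same shape")
    if a.size == 0:
        return a.copy(), b.copy()
    aux = np.maximum.accumulate(a)
    # True wherever the running maximum changes; the first element is
    # always kept.
    keep = np.empty(a.shape, dtype=bool)
    keep[0] = True
    keep[1:] = aux[1:] != aux[:-1]
    return a[keep], b[keep]

a_new, b_new = monotonic_pair(
    [2, 1, 2, 3, 4, 5, 4, 6, 5, 7, 8, 9, 8, 10, 11],
    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15],
)
print(a_new, b_new)
```

Indexing with the boolean mask directly avoids the extra np.nonzero call; wherever the mask is True the running maximum equals the original value, so a[keep] and aux[keep] are the same.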
Here is a vanilla Python solution that does one pass:
>>> a = [2,1,2,3,4,5,4,6,5,7,8,9,8,10,11]
>>> b = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15]
>>> a_new, b_new = [], []
>>> last = float('-inf')
>>> for x, y in zip(a, b):
...     if x > last:
...         last = x
...         a_new.append(x)
...         b_new.append(y)
...
>>> a_new
[2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
>>> b_new
[1, 4, 5, 6, 8, 10, 11, 12, 14, 15]
I'm curious to see how it compares to the numpy solution, which will have similar time complexity but does a couple of passes on the data.
Here are some timings. First, setup:
>>> import timeit
>>> import numpy as np
>>> small = ([2,1,2,3,4,5,4,6,5,7,8,9,8,10,11], [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15])
>>> medium = (np.random.randint(1, 10000, (10000,)), np.random.randint(1, 10000, (10000,)))
>>> large = (np.random.randint(1, 10000000, (10000000,)), np.random.randint(1, 10000000, (10000000,)))
And now the two approaches:
>>> def monotonic(a, b):
...     a_new, b_new = [], []
...     last = float('-inf')
...     for x, y in zip(a, b):
...         if x > last:
...             last = x
...             a_new.append(x)
...             b_new.append(y)
...     return a_new, b_new
...
>>> def np_monotonic(a, b):
...     a_new, idx = np.unique(np.maximum.accumulate(a), return_index=True)
...     return a_new, b[idx]
...
Note that the approaches are not strictly equivalent: one stays in vanilla Python land, the other stays in numpy array land. We will compare performance assuming you are starting with the corresponding data structure (either numpy.array or list).
First, a small list, the same as in the OP's example. Here numpy is not actually faster, which isn't surprising for small data structures:
>>> timeit.timeit("monotonic(a,b)", "from __main__ import monotonic, small; a, b = small", number=10000)
0.039130652003223076
>>> timeit.timeit("np_monotonic(a,b)", "from __main__ import np_monotonic, small, np; a, b = np.array(small[0]), np.array(small[1])", number=10000)
0.10779813499539159
Now a "medium" list/array of 10,000 elements, where we start to see numpy's advantages:
>>> timeit.timeit("monotonic(a,b)", "from __main__ import monotonic, medium; a, b = medium[0].tolist(), medium[1].tolist()", number=10000)
4.642718859016895
>>> timeit.timeit("np_monotonic(a,b)", "from __main__ import np_monotonic, medium; a, b = medium", number=10000)
1.3776302759943064
Now, interestingly, the advantage seems to narrow with "large" arrays, on the order of 1e7 elements:
>>> timeit.timeit("monotonic(a,b)", "from __main__ import monotonic, large; a, b = large[0].tolist(), large[1].tolist()", number=10)
4.400254560023313
>>> timeit.timeit("np_monotonic(a,b)", "from __main__ import np_monotonic, large; a, b = large", number=10)
3.593393853981979
Note that in the last pair of timings I ran each statement only 10 times, but if someone has a better machine or more patience, please feel free to increase number.
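For anyone who wants to rerun the comparison at other sizes, here is a rough sketch of a small harness. The helper `best_time`, the array size, and the repeat counts are all assumptions chosen so that it finishes quickly; absolute numbers will differ by machine.

```python
import timeit
import numpy as np

def monotonic(a, b):
    # One-pass pure-Python version from above.
    a_new, b_new = [], []
    last = float('-inf')
    for x, y in zip(a, b):
        if x > last:
            last = x
            a_new.append(x)
            b_new.append(y)
    return a_new, b_new

def np_monotonic(a, b):
    # numpy version from above.
    a_new, idx = np.unique(np.maximum.accumulate(a), return_index=True)
    return a_new, b[idx]

def best_time(func, a, b, number=5, repeat=3):
    # Hypothetical helper: best per-call time in seconds over several repeats.
    timings = timeit.repeat(lambda: func(a, b), number=number, repeat=repeat)
    return min(timings) / number

rng = np.random.default_rng(0)
n = 100_000  # assumed size; raise towards 1e7 to mirror the "large" case
a_arr = rng.integers(1, n, size=n)
b_arr = rng.integers(1, n, size=n)
a_list, b_list = a_arr.tolist(), b_arr.tolist()

t_list = best_time(monotonic, a_list, b_list)
t_numpy = best_time(np_monotonic, a_arr, b_arr)
print(f"list: {t_list:.6f} s per call, numpy: {t_numpy:.6f} s per call")
```

Taking the minimum over repeats is the usual way to reduce noise from other processes; each approach is timed on its own native data structure, as in the timings above.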