I have a time series A. I want to generate another time series B, such that
B[i] = j, where j is the first index greater than i such that A[j] > A[i].
is there a fast way of doing this in numpy?
Thanks.
[EDITED]: preferably use only O(n) of space.
Insufficiently tested, use at own risk.
import numpy
a = numpy.random.random(100)
# a_by_a[i,j] = a[i] > a[j]
a_by_a = a[numpy.newaxis,:] > a[:,numpy.newaxis]
# by taking the upper triangular, we ignore all cases where i < j
a_by_a = numpy.triu(a_by_a)
# argmax will give the first index with the highest value (1 in this case)
print numpy.argmax(a_by_a, axis = 1)
Lower memory version:
a = numpy.random.random(100)
@numpy.vectorize
def values(i):
try:
return (a[i:] > a[i]).nonzero()[0][0] + i
except IndexError:
return -1 # no valid values found
b = values(numpy.arange(100))
Faster version:
@np.vectorize
def values(i):
try:
return next(idx for idx, value in enumerate(A[i+1:]) if value > A[i]) + i + 1
except StopIteration:
return -1 # no valid values found
return values(np.arange(len(A)))
Even faster version:
def future6(A):
# list of tuples (index into A, value in A)
# invariant: indexes and values in sorted order
known = []
result = []
for idx in xrange(len(A) - 1, -1, -1):
value = A[idx]
# since known is sorted a binary search could be applied here
# I haven't bothered
# anything lower then the current value
# cannot possibly be used again, since this value will be first index instead
# of those
known = [(x,y) for x,y in known if y > value]
if known:
# all values in known are > current value
# they reverse sorted by index
# the value with the lowest index is first
result.append(known[-1][0])
else:
# no values exist this high, report -1
result.append(-1)
# add to end of the list to maintain invariant
known.append( (idx, value) )
# let numpy worry about reversing the array
return np.array(result)[::-1]
Credit to cyborg for some of the ideas used here.
Algorithm Difference
cyborg showed significant differences between the various algorithms depending on the data fed into them. I gathered some stats from the running algorithms to see if I could figure out what is going on.
Random data:
Average distance between value and its target: 9
Average length of ascends list: 24
Average length of segment in ascends list: 1.33
Average length of known list: 9.1
Since the lists are really short, the ascents algorithm mostly decays to a linear search. It does clear out ascents which cannot be used in the future, so its still noticably better then a linear search.
Oscillating:
Average distance between value and its target: 31.46
Average length of ascends list: 84
Average length of segment in ascends list: 1.70
Average length of known list: 57.98
The oscillations tend to put the different pieces further apart. This naturally hampers the linear search algorithm. Both of the "smarter" algorithms have to keep track of additional data. My algorithm decays considerably since it scans over the data each time. The ascends algorithm touch less of its data and does better.
Ascending:
Average distance between value and its target: 2.57
Average length of ascends list: 40
Average length of segment in ascends list: 3.27
Average length of known list: 3037.97
Its clear why my algorithm has issues, it has to track a huge number of ascending values. The short distance between the value and target explains the good performance of a linear search. Ascends is still not working with very long segments.
Better algorithms
There is no reason for my algorithm to have to a linear search over the data. It is sorted and we just need remove to small values from the end of the list.
def future6(A):
# list of tuples (index into A, value in A)
# invariant: indexes and values in sorted order
known = []
result = []
for idx in xrange(len(A) - 1, -1, -1):
value = A[idx]
# since known is sorted a binary search could be applied here
# I haven't bothered
# anything lower then the current value
# cannot possibly be used again, since this value will be first index instead
# of those
while known and known[-1][1] < value:
known.pop()
if known:
# all values in known are > current value
# they reverse sorted by index
# the value with the lowest index is first
result.append(known[-1][0])
else:
# no values exist this high, report -1
result.append(-1)
# add to end of the list to maintain invariant
known.append( (idx, value) )
# let numpy worry about reversing the array
return np.array(result)[::-1]
But it occurs to me that we can reuse previous calculated values of B instead of constructing new data structures. If j > i, A[i] > A[j] then B[i] > B[j].
def future8(A):
B = [-1] * len(A)
for index in xrange(len(A)-2, -1, -1):
target = index + 1
value = A[index]
while target != -1 and A[target] < value:
target = B[target]
B[index] = target
return np.array(B)
My benchmark results:
Random series:
future2 ascends : 0.242569923401
future6 full list: 0.0363488197327
future7 vectorize: 0.129994153976
future8 reuse: 0.0299410820007
Oscillating series:
future2 ascends : 0.233623981476
future6 full list: 0.0360488891602
future7 vectorize: 1.19140791893
future8 reuse: 0.0297570228577
Ascending trend series:
future2 ascends : 0.120707035065
future6 full list: 0.0314049720764
future7 vectorize: 0.0640320777893
future8 reuse: 0.0246520042419
Ascending Sections
Cyborg had the very interesting idea to take advantage of ascending segments. I don't think any of his test cases really exhibit the behaviour he was after there. I don't think the sections were long enough to take advantage of. But I figure real data may well have such sections so taking advantage of it would be really helpful.
But I don't think it'll work. It takes O(n) time to prepare the necessary data to do the binary search. That would be fine if we do the binary search many times, but once we find a value in the middle of the ascending section, we never revisit anything to the left. As a result, even with a binary search we spend at most O(n) time processing the data.
It could work if its less expensive to build the neccesary data then to do scan over the ascending section later. But the scan is pretty cheap and you'll be hard pressed to come up with a way of handling ascending sections that's less expensive.
@Winston Ewert 's future8
method is O(n) (!), and is better than all of our previous proposals here. To prove it's O(n), observe that the inner while
loop is executed at most once for any value of B[target]
.
My old answer:
Here is a benchmark of three approaches (after a ping pong between @Winston Ewert and me):
Each of these is significantly faster than the others under different conditions. If the series is random, then "full list" (future6) is fastest. If the series oscillates, then "ascending list" (future2) is fastest. If the series tends to ascend, then vectorize (future7) is fastest.
If the data is stock quotes, I would go with "vectorize" (future7), because stocks have an ascending trend, and because it's simple and performs reasonably under all conditions.
Output:
Random series:
future2 ascends : 0.210215095646
future6 full list: 0.0920153693145
future7 vectorize: 0.138747922771
Oscillating series:
future2 ascends : 0.208349650159
future6 full list: 0.940276050999
future7 vectorize: 0.597290143496
Ascending trend series:
future2 ascends : 0.131106233627
future6 full list: 20.7201531342
future7 vectorize: 0.0540951244451
Code:
import numpy as np
import time
import timeit
def future2(A):
def reverse_enum(L):
for index in reversed(xrange(len(L))):
yield len(L)-index-1, L[index]
def findnext(x, A, ascends): # find index of first future number greater than x
for idx, segment in reverse_enum(ascends):
joff=A[segment[0]:segment[1]+1].searchsorted(x,side='right') # binary search
if joff < (segment[1]-segment[0]+1):
j=segment[0]+joff
[ascends.pop() for _ in range(idx)] # delete previous segments
segment[0]=j # cut beginning of segment
return j
return -1
B = np.arange(len(A))+1
# Note: B[i]=-1 where there is no greater value in the future.
B[-1] = -1 # put -1 at the end
ascends = [] # a list of pairs of indexes, ascending segments of A
maximum = True
for i in xrange(len(A)-2,-1,-1): # scan backwards
#print(ascends)
if A[i] < A[i+1]:
if maximum:
ascends.append([i+1,i+1])
maximum = False
else:
ascends[-1][0] = i+1
else:# A[i] >= A[i+1]
B[i] = findnext(A[i], A, ascends)
maximum = True
return B
def future6(A):
# list of tuples (index into A, value in A)
# invariant: indexes and values in sorted order
known = []
result = []
for idx in xrange(len(A) - 1, -1, -1):
value = A[idx]
# since known is sorted a binary search could be applied here
# I haven't bothered
# anything lower then the current value
# cannot possibly be used again, since this value will be first index instead
# of those
known = [(x,y) for x,y in known if y > value]
if known:
# all values in known are > current value
# they reverse sorted by index
# the value with the lowest index is first
result.append(known[-1][0])
else:
# no values exist this high, report -1
result.append(-1)
# add to end of the list to maintain invariant
known.append( (idx, value) )
# let numpy worry about reversing the array
return np.array(result)[::-1]
def future7(A):
@np.vectorize
def values(i):
for idx, v in enumerate(A[i+1:]): # loop is faster than genexp with exception
if A[i]<v:
return idx+i+1
return -1
return values(np.arange(len(A)))
if __name__ == '__main__':
print('Random series:')
tsetup = """import future; import numpy; A = numpy.random.random(1e4)"""
t = timeit.timeit('future.future2(A)', tsetup, number=3)
print('future2 ascends : '+str(t))
t = timeit.timeit('future.future6(A)', tsetup, number=3)
print('future6 full list: '+str(t))
t = timeit.timeit('future.future7(A)', tsetup, number=3)
print('future7 vectorize: '+str(t))
print('Oscillating series:')
tsetup = """import future; import numpy; A = numpy.random.randint(1e5,size=1e4)-5e4; A = A.cumsum()"""
t = timeit.timeit('future.future2(A)', tsetup, number=3)
print('future2 ascends : '+str(t))
t = timeit.timeit('future.future6(A)', tsetup, number=3)
print('future6 full list: '+str(t))
t = timeit.timeit('future.future7(A)', tsetup, number=3)
print('future7 vectorize: '+str(t))
print('Ascending trend series:')
tsetup = """import future; import numpy; A = numpy.random.randint(1e5,size=1e4)-3e4; A = A.cumsum()"""
t = timeit.timeit('future.future2(A)', tsetup, number=3)
print('future2 ascends : '+str(t))
t = timeit.timeit('future.future6(A)', tsetup, number=3)
print('future6 full list: '+str(t))
t = timeit.timeit('future.future7(A)', tsetup, number=3)
print('future7 vectorize: '+str(t))
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With