Numpy vectorized algorithm to find the first future element greater than the current element

I have a time series A. I want to generate another time series B, such that

B[i] = j, where j is the first index greater than i such that A[j] > A[i].

Is there a fast way of doing this in numpy?

Thanks.

[EDITED]: preferably using only O(n) space.
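
For concreteness, here is a small illustrative example of the desired output, using -1 when no such j exists:

A = [3, 1, 4, 1, 5, 9, 2, 6]
B = [2, 2, 4, 4, 5, -1, 7, -1]
# e.g. B[0] = 2 because A[2] = 4 is the first later value greater than A[0] = 3,
# and B[5] = -1 because nothing after A[5] = 9 is larger.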

剪不断理还乱 2024-12-19 11:02:35

Insufficiently tested, use at own risk.

import numpy

a = numpy.random.random(100)

# a_by_a[i,j] = a[j] > a[i]
a_by_a = a[numpy.newaxis,:] > a[:,numpy.newaxis]
# by taking the upper triangular part, we ignore all cases where j < i
a_by_a = numpy.triu(a_by_a)
# argmax will give the first index with the highest value (True, i.e. 1, in this case)
print(numpy.argmax(a_by_a, axis=1))
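
Note that this builds the full n-by-n comparison matrix, so it uses O(n^2) memory, and argmax returns 0 for rows with no True entry (i.e. elements with no greater future value). If that matters, a small post-processing step along these lines (my sketch, not part of the original answer) maps those rows to -1:

b = numpy.argmax(a_by_a, axis=1)
b[~a_by_a.any(axis=1)] = -1  # rows that are all False have no greater future value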

Lower memory version:

a = numpy.random.random(100)

@numpy.vectorize
def values(i):
    try:
        return (a[i:] > a[i]).nonzero()[0][0] + i
    except IndexError:
        return -1 # no valid values found

b = values(numpy.arange(100))

Faster version:

# wrapped in a function (here called future5, the name it is referred to by later)
# so that the final return is valid; assumes numpy has been imported as np
def future5(A):
    @np.vectorize
    def values(i):
        try:
            return next(idx for idx, value in enumerate(A[i+1:]) if value > A[i]) + i + 1
        except StopIteration:
            return -1 # no valid values found
    return values(np.arange(len(A)))

Even faster version:

def future6(A):
    # list of tuples (index into A, value in A)
    # invariant: indexes and values both in descending order
    known = []
    result = []
    for idx in range(len(A) - 1, -1, -1):
        value = A[idx]
        # since known is sorted a binary search could be applied here
        # I haven't bothered

        # anything lower than the current value
        # cannot possibly be used again, since this value will be the first index instead
        # of those
        known = [(x,y) for x,y in known if y > value]


        if known: 
            # all values in known are > current value
            # they are sorted by descending index,
            # so the entry with the lowest index (the nearest future position) is last
            result.append(known[-1][0])
        else:
            # no values exist this high, report -1
            result.append(-1)
        # add to end of the list to maintain invariant
        known.append( (idx, value) )

    # let numpy worry about reversing the array
    return np.array(result)[::-1]

Credit to cyborg for some of the ideas used here.

Algorithm differences

cyborg showed significant differences between the various algorithms depending on the data fed into them. I gathered some stats from the running algorithms to see if I could figure out what is going on.

Random data:

Average distance between value and its target: 9
Average length of ascends list: 24
Average length of segment in ascends list: 1.33
Average length of known list: 9.1

Since the lists are really short, the ascends algorithm mostly decays to a linear search. It does clear out ascends which cannot be used in the future, so it's still noticeably better than a linear search.

Oscillating:

Average distance between value and its target: 31.46
Average length of ascends list: 84
Average length of segment in ascends list: 1.70
Average length of known list: 57.98

The oscillations tend to put the different pieces further apart. This naturally hampers the linear search algorithm. Both of the "smarter" algorithms have to keep track of additional data. My algorithm decays considerably since it scans over the data each time. The ascends algorithm touches less of its data and does better.

Ascending:

Average distance between value and its target: 2.57
Average length of ascends list: 40
Average length of segment in ascends list: 3.27
Average length of known list: 3037.97

It's clear why my algorithm has issues: it has to track a huge number of ascending values, and the known list is rebuilt at every step, so it trends toward O(n^2) on this input. The short distance between a value and its target explains the good performance of the linear search. The ascends algorithm is still not getting very long segments to work with.

Better algorithms

There is no reason for my algorithm to have to do a linear scan over the data. The known list is sorted, so we just need to remove the smaller values from the end of the list.

def future6(A):
    # list of tuples (index into A, value in A)
    # invariant: indexes and values both in descending order
    known = []
    result = []
    for idx in range(len(A) - 1, -1, -1):
        value = A[idx]
        # anything lower than (or equal to) the current value
        # cannot possibly be used again, since this value will be the first index instead
        # of those
        # (<= rather than < so ties are dropped too; the problem asks for strictly greater)
        while known and known[-1][1] <= value:
            known.pop()


        if known: 
            # all values in known are > current value
            # they are sorted by descending index,
            # so the entry with the lowest index (the nearest future position) is last
            result.append(known[-1][0])
        else:
            # no values exist this high, report -1
            result.append(-1)
        # add to end of the list to maintain invariant
        known.append( (idx, value) )

    # let numpy worry about reversing the array
    return np.array(result)[::-1]

But it occurs to me that we can reuse previously calculated values of B instead of constructing a new data structure: if A[j] < A[i] for some j > i, then every index strictly between j and B[j] holds a value no greater than A[j], so none of them can be the answer for i and the search can jump straight from j to B[j].

def future8(A):
    B = [-1] * len(A)
    for index in range(len(A)-2, -1, -1):
        target = index + 1
        value = A[index]
        # follow the chain of already-computed answers
        # (<= rather than < so ties are skipped; the problem asks for strictly greater)
        while target != -1 and A[target] <= value:
            target = B[target]
        B[index] = target
    return np.array(B)
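
As a quick sanity check (a sketch of mine, not part of the original answer), future8 can be compared against a brute-force reference on random data:

import numpy as np

def brute_force(A):
    # reference: scan forward for the first strictly greater value
    result = []
    for i in range(len(A)):
        later = np.nonzero(A[i+1:] > A[i])[0]
        result.append(later[0] + i + 1 if len(later) else -1)
    return np.array(result)

A = np.random.random(1000)
assert (future8(A) == brute_force(A)).all()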

My benchmark results:

Random series:
future2 ascends  : 0.242569923401
future6 full list: 0.0363488197327
future7 vectorize: 0.129994153976
future8 reuse: 0.0299410820007
Oscillating series:
future2 ascends  : 0.233623981476
future6 full list: 0.0360488891602
future7 vectorize: 1.19140791893
future8 reuse: 0.0297570228577
Ascending trend series:
future2 ascends  : 0.120707035065
future6 full list: 0.0314049720764
future7 vectorize: 0.0640320777893
future8 reuse: 0.0246520042419

Ascending Sections

Cyborg had the very interesting idea of taking advantage of ascending segments. I don't think any of his test cases really exhibit the behaviour he was after there; the segments just weren't long enough to exploit. But I figure real data may well have such sections, so taking advantage of them would be really helpful.

But I don't think it'll work. It takes O(n) time to prepare the necessary data to do the binary search. That would be fine if we do the binary search many times, but once we find a value in the middle of the ascending section, we never revisit anything to the left. As a result, even with a binary search we spend at most O(n) time processing the data.

It could work if it's less expensive to build the necessary data than to scan over the ascending section later. But the scan is pretty cheap, and you'll be hard pressed to come up with a way of handling ascending sections that's less expensive.

不知所踪 2024-12-19 11:02:35

@Winston Ewert's future8 method is O(n) (!), and is better than all of our previous proposals here. To prove it's O(n), observe that over the entire run the inner while loop jumps from any given index (i.e. executes target = B[target] for that index) at most once, so the total number of inner iterations is bounded by len(A).

My old answer:

Here is a benchmark of three approaches (after a ping pong between @Winston Ewert and me):

  1. Ascending list with binary search. (future2)
  2. Full list (future6, by @Winston Ewert)
  3. numpy.vectorize (future7, an enhanced version of @Winston Ewert's future5).

Each of these is significantly faster than the others under different conditions. If the series is random, then "full list" (future6) is fastest. If the series oscillates, then "ascending list" (future2) is fastest. If the series tends to ascend, then vectorize (future7) is fastest.

If the data is stock quotes, I would go with "vectorize" (future7), because stocks have an ascending trend, and because it's simple and performs reasonably under all conditions.

Output:

Random series:
future2 ascends  : 0.210215095646
future6 full list: 0.0920153693145
future7 vectorize: 0.138747922771
Oscillating series:
future2 ascends  : 0.208349650159
future6 full list: 0.940276050999
future7 vectorize: 0.597290143496
Ascending trend series:
future2 ascends  : 0.131106233627
future6 full list: 20.7201531342
future7 vectorize: 0.0540951244451

Code:

import numpy as np
import time 
import timeit

def future2(A):    
    def reverse_enum(L):
        for index in reversed(range(len(L))):
            yield len(L)-index-1, L[index]
    def findnext(x, A, ascends): # find index of first future number greater than x
        for idx, segment in reverse_enum(ascends):
            joff=A[segment[0]:segment[1]+1].searchsorted(x,side='right') # binary search
            if joff < (segment[1]-segment[0]+1):
                j=segment[0]+joff
                [ascends.pop() for _ in range(idx)] # delete previous segments
                segment[0]=j # cut beginning of segment 
                return j
        return -1
    B = np.arange(len(A))+1
    # Note: B[i]=-1 where there is no greater value in the future.
    B[-1] = -1 # put -1 at the end
    ascends = [] # a list of pairs of indexes, ascending segments of A
    maximum = True
    for i in range(len(A)-2,-1,-1): # scan backwards
        #print(ascends)
        if A[i] < A[i+1]:
            if maximum:
                ascends.append([i+1,i+1])
                maximum = False
            else:
                ascends[-1][0] = i+1
        else:# A[i] >= A[i+1]
            B[i] = findnext(A[i], A, ascends)
            maximum = True
    return B


def future6(A):
    # list of tuples (index into A, value in A)
    # invariant: indexes and values both in descending order
    known = []
    result = []
    for idx in range(len(A) - 1, -1, -1):
        value = A[idx]
        # since known is sorted a binary search could be applied here
        # I haven't bothered

        # anything lower than the current value
        # cannot possibly be used again, since this value will be the first index instead
        # of those
        known = [(x,y) for x,y in known if y > value]


        if known: 
            # all values in known are > current value
            # they are sorted by descending index,
            # so the entry with the lowest index (the nearest future position) is last
            result.append(known[-1][0])
        else:
            # no values exist this high, report -1
            result.append(-1)
        # add to end of the list to maintain invariant
        known.append( (idx, value) )

    # let numpy worry about reversing the array
    return np.array(result)[::-1]


def future7(A):
    @np.vectorize
    def values(i):
        for idx, v in enumerate(A[i+1:]): # loop is faster than genexp with exception
            if A[i]<v:
                return idx+i+1
        return -1
    return values(np.arange(len(A)))

if __name__ == '__main__':
    print('Random series:')
    tsetup = """import future; import numpy; A = numpy.random.random(1e4)"""
    t = timeit.timeit('future.future2(A)', tsetup, number=3)
    print('future2 ascends  : '+str(t))
    t = timeit.timeit('future.future6(A)', tsetup, number=3)
    print('future6 full list: '+str(t))
    t = timeit.timeit('future.future7(A)', tsetup, number=3)
    print('future7 vectorize: '+str(t))

    print('Oscillating series:')
    tsetup = """import future; import numpy; A = numpy.random.randint(1e5,size=1e4)-5e4; A = A.cumsum()"""
    t = timeit.timeit('future.future2(A)', tsetup, number=3)
    print('future2 ascends  : '+str(t))
    t = timeit.timeit('future.future6(A)', tsetup, number=3)
    print('future6 full list: '+str(t))
    t = timeit.timeit('future.future7(A)', tsetup, number=3)
    print('future7 vectorize: '+str(t))

    print('Ascending trend series:')
    tsetup = """import future; import numpy; A = numpy.random.randint(1e5,size=1e4)-3e4; A = A.cumsum()"""
    t = timeit.timeit('future.future2(A)', tsetup, number=3)
    print('future2 ascends  : '+str(t))
    t = timeit.timeit('future.future6(A)', tsetup, number=3)
    print('future6 full list: '+str(t))
    t = timeit.timeit('future.future7(A)', tsetup, number=3)
    print('future7 vectorize: '+str(t))
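
The timeit setup strings do import future, so this benchmark assumes the functions above are saved in a module named future.py; running that file directly then prints the three groups of timings shown above.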