查找 numpy 数组中相同值序列的长度(游程长度编码)

发布于 2024-07-25 10:25:46 字数 886 浏览 4 评论 0 原文

在 pylab 程序(也可能是 matlab 程序)中,我有一个代表距离的 numpy 数字数组:d[t] 是时间 的距离 >t (我的数据的时间跨度是 len(d) 时间单位)。

我感兴趣的事件是距离低于某个阈值时的事件,并且我想计算这些事件的持续时间。 使用b = d很容易获得布尔值数组,问题归结为计算b中仅True单词的长度序列。 但我不知道如何有效地做到这一点(即使用 numpy 原语),我求助于遍历数组并进行手动更改检测(即当值从 False 变为 True 时初始化计数器,只要值为 True 就增加计数器,并在值返回到 False 时将计数器输出到序列中)。 但这非常慢。

如何有效地检测 numpy 数组中的这种序列?

下面是一些 python 代码,说明了我的问题:第四个点需要很长时间才能出现(如果没有,请增加数组的大小)

from pylab import *

threshold = 7

print '.'
d = 10*rand(10000000)

print '.'

b = d<threshold

print '.'

durations=[]
for i in xrange(len(b)):
    if b[i] and (i==0 or not b[i-1]):
        counter=1
    if  i>0 and b[i-1] and b[i]:
        counter+=1
    if (b[i-1] and not b[i]) or i==len(b)-1:
        durations.append(counter)

print '.'

In a pylab program (which could probably be a matlab program as well) I have a numpy array of numbers representing distances: d[t] is the distance at time t (and the timespan of my data is len(d) time units).

The events I'm interested in are when the distance is below a certain threshold, and I want to compute the duration of these events. It's easy to get an array of booleans with b = d<threshold, and the problem comes down to computing the sequence of the lengths of the True-only words in b. But I do not know how to do that efficiently (i.e. using numpy primitives), and I resorted to walk the array and to do manual change detection (i.e. initialize counter when value goes from False to True, increase counter as long as value is True, and output the counter to the sequence when value goes back to False). But this is tremendously slow.

How to efficienly detect that sort of sequences in numpy arrays ?

Below is some python code that illustrates my problem : the fourth dot takes a very long time to appear (if not, increase the size of the array)

from pylab import *

threshold = 7

print '.'
d = 10*rand(10000000)

print '.'

b = d<threshold

print '.'

durations=[]
for i in xrange(len(b)):
    if b[i] and (i==0 or not b[i-1]):
        counter=1
    if  i>0 and b[i-1] and b[i]:
        counter+=1
    if (b[i-1] and not b[i]) or i==len(b)-1:
        durations.append(counter)

print '.'

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(7

柠北森屋 2024-08-01 10:25:47

适用于任何数组的完全 numpy 矢量化和通用 RLE(也适用于字符串、布尔值等)。

输出游程长度、起始位置和值的元组。

import numpy as np

def rle(inarray):
        """ run length encoding. Partial credit to R rle function. 
            Multi datatype arrays catered for including non Numpy
            returns: tuple (runlengths, startpositions, values) """
        ia = np.asarray(inarray)                # force numpy
        n = len(ia)
        if n == 0: 
            return (None, None, None)
        else:
            y = ia[1:] != ia[:-1]               # pairwise unequal (string safe)
            i = np.append(np.where(y), n - 1)   # must include last element posi
            z = np.diff(np.append(-1, i))       # run lengths
            p = np.cumsum(np.append(0, z))[:-1] # positions
            return(z, p, ia[i])

相当快(i7):

xx = np.random.randint(0, 5, 1000000)
%timeit yy = rle(xx)
100 loops, best of 3: 18.6 ms per loop

多种数据类型:

rle([True, True, True, False, True, False, False])
Out[8]: 
(array([3, 1, 1, 2]),
 array([0, 3, 4, 5]),
 array([ True, False,  True, False], dtype=bool))

rle(np.array([5, 4, 4, 4, 4, 0, 0]))
Out[9]: (array([1, 4, 2]), array([0, 1, 5]), array([5, 4, 0]))

rle(["hello", "hello", "my", "friend", "okay", "okay", "bye"])
Out[10]: 
(array([2, 1, 1, 2, 1]),
 array([0, 2, 3, 4, 6]),
 array(['hello', 'my', 'friend', 'okay', 'bye'], 
       dtype='|S6'))

与上面的 Alex Martelli 结果相同:

xx = np.random.randint(0, 2, 20)

xx
Out[60]: array([1, 1, 1, 1, 0, 0, 1, 1, 1, 1, 1, 0, 1, 1, 0, 1, 1, 1, 1, 1])

am = runs_of_ones_array(xx)

tb = rle(xx)

am
Out[63]: array([4, 5, 2, 5])

tb[0][tb[2] == 1]
Out[64]: array([4, 5, 2, 5])

%timeit runs_of_ones_array(xx)
10000 loops, best of 3: 28.5 µs per loop

%timeit rle(xx)
10000 loops, best of 3: 38.2 µs per loop

比 Alex 稍慢(但仍然非常快),并且更灵活。

Fully numpy vectorized and generic RLE for any array (works with strings, booleans etc too).

Outputs tuple of run lengths, start positions, and values.

import numpy as np

def rle(inarray):
        """ run length encoding. Partial credit to R rle function. 
            Multi datatype arrays catered for including non Numpy
            returns: tuple (runlengths, startpositions, values) """
        ia = np.asarray(inarray)                # force numpy
        n = len(ia)
        if n == 0: 
            return (None, None, None)
        else:
            y = ia[1:] != ia[:-1]               # pairwise unequal (string safe)
            i = np.append(np.where(y), n - 1)   # must include last element posi
            z = np.diff(np.append(-1, i))       # run lengths
            p = np.cumsum(np.append(0, z))[:-1] # positions
            return(z, p, ia[i])

Pretty fast (i7):

xx = np.random.randint(0, 5, 1000000)
%timeit yy = rle(xx)
100 loops, best of 3: 18.6 ms per loop

Multiple data types:

rle([True, True, True, False, True, False, False])
Out[8]: 
(array([3, 1, 1, 2]),
 array([0, 3, 4, 5]),
 array([ True, False,  True, False], dtype=bool))

rle(np.array([5, 4, 4, 4, 4, 0, 0]))
Out[9]: (array([1, 4, 2]), array([0, 1, 5]), array([5, 4, 0]))

rle(["hello", "hello", "my", "friend", "okay", "okay", "bye"])
Out[10]: 
(array([2, 1, 1, 2, 1]),
 array([0, 2, 3, 4, 6]),
 array(['hello', 'my', 'friend', 'okay', 'bye'], 
       dtype='|S6'))

Same results as Alex Martelli above:

xx = np.random.randint(0, 2, 20)

xx
Out[60]: array([1, 1, 1, 1, 0, 0, 1, 1, 1, 1, 1, 0, 1, 1, 0, 1, 1, 1, 1, 1])

am = runs_of_ones_array(xx)

tb = rle(xx)

am
Out[63]: array([4, 5, 2, 5])

tb[0][tb[2] == 1]
Out[64]: array([4, 5, 2, 5])

%timeit runs_of_ones_array(xx)
10000 loops, best of 3: 28.5 µs per loop

%timeit rle(xx)
10000 loops, best of 3: 38.2 µs per loop

Slightly slower than Alex (but still very fast), and much more flexible.

木有鱼丸 2024-08-01 10:25:47

虽然不是 numpy 基元,但 itertools 函数通常非常快,因此请尝试一下(当然,并测量包括此在内的各种解决方案的时间):

def runs_of_ones(bits):
  for bit, group in itertools.groupby(bits):
    if bit: yield sum(group)

如果您确实需要列表中的值,当然可以使用 list(runs_of_ones(bits)) ; 但也许列表理解可能会稍微快一些:

def runs_of_ones_list(bits):
  return [sum(g) for b, g in itertools.groupby(bits) if b]

转向“numpy-native”可能性,怎么样:

def runs_of_ones_array(bits):
  # make sure all runs of ones are well-bounded
  bounded = numpy.hstack(([0], bits, [0]))
  # get 1 at run starts and -1 at run ends
  difs = numpy.diff(bounded)
  run_starts, = numpy.where(difs > 0)
  run_ends, = numpy.where(difs < 0)
  return run_ends - run_starts

再次强调:一定要在现实的示例中对彼此的解决方案进行基准测试!

While not numpy primitives, itertools functions are often very fast, so do give this one a try (and measure times for various solutions including this one, of course):

def runs_of_ones(bits):
  for bit, group in itertools.groupby(bits):
    if bit: yield sum(group)

If you do need the values in a list, just can use list(runs_of_ones(bits)), of course; but maybe a list comprehension might be marginally faster still:

def runs_of_ones_list(bits):
  return [sum(g) for b, g in itertools.groupby(bits) if b]

Moving to "numpy-native" possibilities, what about:

def runs_of_ones_array(bits):
  # make sure all runs of ones are well-bounded
  bounded = numpy.hstack(([0], bits, [0]))
  # get 1 at run starts and -1 at run ends
  difs = numpy.diff(bounded)
  run_starts, = numpy.where(difs > 0)
  run_ends, = numpy.where(difs < 0)
  return run_ends - run_starts

Again: be sure to benchmark solutions against each others in realistic-for-you examples!

红ご颜醉 2024-08-01 10:25:47

这是一个仅使用数组的解决方案:它采用一个包含布尔序列的数组并计算转换的长度。

>>> from numpy import array, arange
>>> b = array([0,0,0,1,1,1,0,0,0,1,1,1,1,0,0], dtype=bool)
>>> sw = (b[:-1] ^ b[1:]); print sw
[False False  True False False  True False False  True False False False
  True False]
>>> isw = arange(len(sw))[sw]; print isw
[ 2  5  8 12]
>>> lens = isw[1::2] - isw[::2]; print lens
[3 4]

sw 在有开关的地方包含 true,isw 将它们转换为索引。 然后,isw 的项在lens 中成对相减。

请注意,如果序列以 1 开头,它将计算 0 序列的长度:这可以在索引中修复以计算镜头。 另外,我还没有测试极端情况,例如长度为 1 的序列。


返回所有 True 子数组的起始位置和长度的完整函数。

import numpy as np

def count_adjacent_true(arr):
    assert len(arr.shape) == 1
    assert arr.dtype == np.bool
    if arr.size == 0:
        return np.empty(0, dtype=int), np.empty(0, dtype=int)
    sw = np.insert(arr[1:] ^ arr[:-1], [0, arr.shape[0]-1], values=True)
    swi = np.arange(sw.shape[0])[sw]
    offset = 0 if arr[0] else 1
    lengths = swi[offset+1::2] - swi[offset:-1:2]
    return swi[offset:-1:2], lengths

测试不同的布尔一维数组(空数组;单个/多个元素;偶数/奇数长度;以 True/False 开始;仅使用 True/False 元素)。

Here is a solution using only arrays: it takes an array containing a sequence of bools and counts the length of the transitions.

>>> from numpy import array, arange
>>> b = array([0,0,0,1,1,1,0,0,0,1,1,1,1,0,0], dtype=bool)
>>> sw = (b[:-1] ^ b[1:]); print sw
[False False  True False False  True False False  True False False False
  True False]
>>> isw = arange(len(sw))[sw]; print isw
[ 2  5  8 12]
>>> lens = isw[1::2] - isw[::2]; print lens
[3 4]

sw contains a true where there is a switch, isw converts them in indexes. The items of isw are then subtracted pairwise in lens.

Notice that if the sequence started with an 1 it would count the length of the 0s sequences: this can be fixed in the indexing to compute lens. Also, I have not tested corner cases such sequences of length 1.


Full function that returns start positions and lengths of all True-subarrays.

import numpy as np

def count_adjacent_true(arr):
    assert len(arr.shape) == 1
    assert arr.dtype == np.bool
    if arr.size == 0:
        return np.empty(0, dtype=int), np.empty(0, dtype=int)
    sw = np.insert(arr[1:] ^ arr[:-1], [0, arr.shape[0]-1], values=True)
    swi = np.arange(sw.shape[0])[sw]
    offset = 0 if arr[0] else 1
    lengths = swi[offset+1::2] - swi[offset:-1:2]
    return swi[offset:-1:2], lengths

Tested for different bool 1D-arrays (empty array; single/multiple elements; even/odd lengths; started with True/False; with only True/False elements).

凹づ凸ル 2024-08-01 10:25:47

以防万一有人好奇(既然您顺便提到了 MATLAB),这里有一种在 MATLAB 中解决它的方法:

threshold = 7;
d = 10*rand(1,100000);  % Sample data
b = diff([false (d < threshold) false]);
durations = find(b == -1)-find(b == 1);

我对 Python 不太熟悉,但这也许可以帮助您提供一些想法。 =)

Just in case anyone is curious (and since you mentioned MATLAB in passing), here's one way to solve it in MATLAB:

threshold = 7;
d = 10*rand(1,100000);  % Sample data
b = diff([false (d < threshold) false]);
durations = find(b == -1)-find(b == 1);

I'm not too familiar with Python, but maybe this could help give you some ideas. =)

请持续率性 2024-08-01 10:25:47

也许有点晚了,但总体来说基于 Numba 的方法将是最快的。

import numpy as np
import numba as nb


@nb.njit
def count_steps_nb(arr):
    result = 1
    last_x = arr[0]
    for x in arr[1:]:
        if last_x != x:
            result += 1
            last_x = x
    return result


@nb.njit
def rle_nb(arr):
    n = count_steps_nb(arr)
    values = np.empty(n, dtype=arr.dtype)
    lengths = np.empty(n, dtype=np.int_)
    last_x = arr[0]
    length = 1
    i = 0
    for x in arr[1:]:
        if last_x != x:
            values[i] = last_x
            lengths[i] = length
            i += 1
            length = 1
            last_x = x
        else:
            length += 1
    values[i] = last_x
    lengths[i] = length
    return values, lengths

基于 Numpy 的方法(受到 @ThomasBrowne 答案 的启发,但速度更快,因为使用了昂贵的 numpy.concatenate() 减少到最低限度)是亚军(这里提出了两种类似的方法,一种使用不等式来查找步骤的位置,另一种使用差异):

import numpy as np


def rle_np_neq(arr):
    n = len(arr)
    if n == 0:
        values = np.empty(0, dtype=arr.dtype)
        lengths = np.empty(0, dtype=np.int_)
    else:
        positions = np.concatenate(
            [[-1], np.nonzero(arr[1:] != arr[:-1])[0], [n - 1]])
        lengths = positions[1:] - positions[:-1]
        values = arr[positions[1:]]
    return values, lengths
import numpy as np


def rle_np_diff(arr):
    n = len(arr)
    if n == 0:
        values = np.empty(0, dtype=arr.dtype)
        lengths = np.empty(0, dtype=np.int_)
    else:
        positions = np.concatenate(
            [[-1], np.nonzero(arr[1:] - arr[:-1])[0], [n - 1]])
        lengths = positions[1:] - positions[:-1]
        values = arr[positions[1:]]
        return values, lengths

这些都优于朴素和简单的单循环方法:

import numpy as np


def rle_loop(arr):
    values = []
    lengths = []
    last_x = arr[0]
    length = 1
    for x in arr[1:]:
        if last_x != x:
            values.append(last_x)
            lengths.append(length)
            length = 1
            last_x = x
        else:
            length += 1
    values.append(last_x)
    lengths.append(length)
    return np.array(values), np.array(lengths)

相反,使用 itertools.groupby() 不会比简单循环更快(除非在非常特殊的情况下,例如 @AlexMartelli 回答 或者有人会在组对象上实现 __len__),因为通常没有简单的方法来提取组大小信息比循环遍历组本身,这并不快:

import itertools
import numpy as np


def rle_groupby(arr):
    values = []
    lengths = []
    for x, group in itertools.groupby(arr):
        values.append(x)
        lengths.append(sum(1 for _ in group))
    return np.array(values), np.array(lengths)

报告了对不同大小的随机整数数组的一些基准测试的结果:

bm_full
bm_zoom

(完整分析此处)。

Perhaps late to the party, but a Numba-based approach is going to be fastest by far and large.

import numpy as np
import numba as nb


@nb.njit
def count_steps_nb(arr):
    result = 1
    last_x = arr[0]
    for x in arr[1:]:
        if last_x != x:
            result += 1
            last_x = x
    return result


@nb.njit
def rle_nb(arr):
    n = count_steps_nb(arr)
    values = np.empty(n, dtype=arr.dtype)
    lengths = np.empty(n, dtype=np.int_)
    last_x = arr[0]
    length = 1
    i = 0
    for x in arr[1:]:
        if last_x != x:
            values[i] = last_x
            lengths[i] = length
            i += 1
            length = 1
            last_x = x
        else:
            length += 1
    values[i] = last_x
    lengths[i] = length
    return values, lengths

Numpy-based approaches (inspired by @ThomasBrowne answer but faster because the use of the expensive numpy.concatenate() is reduced to a minimum) are the runner-up (here two similar approaches are presented, one using not-equality to find the positions of the steps, and the other one using differences):

import numpy as np


def rle_np_neq(arr):
    n = len(arr)
    if n == 0:
        values = np.empty(0, dtype=arr.dtype)
        lengths = np.empty(0, dtype=np.int_)
    else:
        positions = np.concatenate(
            [[-1], np.nonzero(arr[1:] != arr[:-1])[0], [n - 1]])
        lengths = positions[1:] - positions[:-1]
        values = arr[positions[1:]]
    return values, lengths
import numpy as np


def rle_np_diff(arr):
    n = len(arr)
    if n == 0:
        values = np.empty(0, dtype=arr.dtype)
        lengths = np.empty(0, dtype=np.int_)
    else:
        positions = np.concatenate(
            [[-1], np.nonzero(arr[1:] - arr[:-1])[0], [n - 1]])
        lengths = positions[1:] - positions[:-1]
        values = arr[positions[1:]]
        return values, lengths

These both outperform the naïve and simple single loop approach:

import numpy as np


def rle_loop(arr):
    values = []
    lengths = []
    last_x = arr[0]
    length = 1
    for x in arr[1:]:
        if last_x != x:
            values.append(last_x)
            lengths.append(length)
            length = 1
            last_x = x
        else:
            length += 1
    values.append(last_x)
    lengths.append(length)
    return np.array(values), np.array(lengths)

On the contrary, using itertools.groupby() is not going to be any faster than the simple loop (unless on very special cases like in @AlexMartelli answer or someone will implement __len__ on the group object) because in general there is no simple way of extracting the group size information other than looping through the group itself, which is not exactly fast:

import itertools
import numpy as np


def rle_groupby(arr):
    values = []
    lengths = []
    for x, group in itertools.groupby(arr):
        values.append(x)
        lengths.append(sum(1 for _ in group))
    return np.array(values), np.array(lengths)

The results of some benchmarks on random integer arrays of varying size are reported:

bm_full
bm_zoom

(Full analysis here).

如果没结果 2024-08-01 10:25:47
durations = []
counter   = 0

for bool in b:
    if bool:
        counter += 1
    elif counter > 0:
        durations.append(counter)
        counter = 0

if counter > 0:
    durations.append(counter)
durations = []
counter   = 0

for bool in b:
    if bool:
        counter += 1
    elif counter > 0:
        durations.append(counter)
        counter = 0

if counter > 0:
    durations.append(counter)
傲鸠 2024-08-01 10:25:47
import numpy as np
a = np.array([3, 3, 3, 1, 3, 3, 3, 3, 1, 1, 3, 3, 3, 3, 3])
is_not_in_run = (a != 3)
is_not_in_run = np.concatenate(([True], is_not_in_run, [True]))
gap_indices = np.where(is_not_in_run)[0]
run_lengths = np.diff(gap_indices) - 1
run_lengths = np.delete(run_lengths, np.where(run_lengths == 0)[0])
print(run_lengths)
import numpy as np
a = np.array([3, 3, 3, 1, 3, 3, 3, 3, 1, 1, 3, 3, 3, 3, 3])
is_not_in_run = (a != 3)
is_not_in_run = np.concatenate(([True], is_not_in_run, [True]))
gap_indices = np.where(is_not_in_run)[0]
run_lengths = np.diff(gap_indices) - 1
run_lengths = np.delete(run_lengths, np.where(run_lengths == 0)[0])
print(run_lengths)
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文