Efficient circular buffer?

Posted on 2024-10-01 08:00:52, 269 words, 13 views, 0 comments

I want to create an efficient circular buffer in Python (with the goal of taking averages of the integer values in the buffer).

Is this an efficient way to use a list to collect values?

def add_to_buffer( self, num ):
    self.mylist.pop( 0 )
    self.mylist.append( num )

What would be more efficient (and why)?

Comments (15)

抱猫软卧 2024-10-08 08:00:53

I would use collections.deque with a maxlen argument:

>>> import collections
>>> d = collections.deque(maxlen=10)
>>> d
deque([], maxlen=10)
>>> for i in range(20):
...     d.append(i)
... 
>>> d
deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10)

There is a recipe in the docs for deque that is similar to what you want. My assertion that it's the most efficient rests entirely on the fact that it's implemented in C by an incredibly skilled crew that is in the habit of cranking out top notch code.
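Since the stated goal is averaging, here is a minimal sketch of a moving average built on a bounded deque, in the spirit of the recipe mentioned above rather than a copy of it. The function name `moving_average` and the `window` parameter are illustrative choices, not part of the original answer. It keeps a running total, so each step costs O(1) instead of re-summing the buffer.

```python
from collections import deque


def moving_average(values, window=3):
    """Yield the mean of the last `window` values once the window is full."""
    buf = deque(maxlen=window)
    total = 0
    for v in values:
        if len(buf) == window:
            # buf[0] is the oldest value; append() below will evict it.
            total -= buf[0]
        buf.append(v)
        total += v
        if len(buf) == window:
            yield total / window


print(list(moving_average([40, 30, 50, 46, 39, 44], window=3)))
# [40.0, 42.0, 45.0, 43.0]
```

With float inputs, a long-running total can accumulate rounding error; re-summing the deque occasionally is one way to limit that.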

魂ガ小子 2024-10-08 08:00:53

Although there are already a great number of great answers here, I could not find any direct comparison of timings for the options mentioned. Therefore, please find my humble attempt at a comparison below.

For testing purposes only, the class can switch between a list-based buffer, a collections.deque-based buffer, and a numpy.roll-based buffer.

Note that the update method adds only one value at a time, to keep it simple.

import numpy
import timeit
import collections


class CircularBuffer(object):
    buffer_methods = ('list', 'deque', 'roll')

    def __init__(self, buffer_size, buffer_method):
        self.content = None
        self.size = buffer_size
        self.method = buffer_method
        self.update = getattr(self, '_update_' + buffer_method)

    def _update_list(self, scalar):
        try:
            # shift
            self.content.append(scalar)
            self.content.pop(0)
        except AttributeError:
            # init
            self.content = [0.] * self.size

    def _update_deque(self, scalar):
        try:
            # shift
            self.content.append(scalar)
        except AttributeError:
            # init
            self.content = collections.deque([0.] * self.size, maxlen=self.size)

    def _update_roll(self, scalar):
        try:
            # shift
            self.content = numpy.roll(self.content, -1)
            self.content[-1] = scalar
        except IndexError:
            # init
            self.content = numpy.zeros(self.size, dtype=float)


# Testing and Timing
circular_buffer_size = 100
circular_buffers = [
    CircularBuffer(buffer_size=circular_buffer_size, buffer_method=method)
    for method in CircularBuffer.buffer_methods
]
timeit_iterations = 1e4
timeit_setup = 'from __main__ import circular_buffers'
timeit_results = []
for i, cb in enumerate(circular_buffers):
    # We add a convenient number of convenient values (see equality test below)
    code = '[circular_buffers[{}].update(float(j)) for j in range({})]'.format(
        i, circular_buffer_size
    )
    # Testing
    eval(code)
    buffer_content = [item for item in cb.content]
    assert buffer_content == list(range(circular_buffer_size))
    # Timing
    timeit_results.append(
        timeit.timeit(code, setup=timeit_setup, number=int(timeit_iterations))
    )
    print(
        '{}: total {:.2f}s ({:.2f}ms per iteration)'.format(
            cb.method,
            timeit_results[-1],
            timeit_results[-1] / timeit_iterations * 1e3,
        )
    )

On my system this yields:

deque: total 0.87s (0.09ms per iteration)
list:  total 1.06s (0.11ms per iteration)
roll:  total 6.27s (0.63ms per iteration)

情栀口红 2024-10-08 08:00:53

Popping from the head of a list forces every remaining element to be shifted down by one position, which takes O(n) time, so it is inefficient.

You should instead use a list/array of fixed size and an index which moves through the buffer as you add/remove items
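A minimal sketch of that idea, assuming the buffer only needs to hold numbers and report their average. The class name `RingAverage` and its method names are hypothetical, chosen for illustration. The list is allocated once, an index marks the slot to overwrite next, and a running total makes both `add` and `average` O(1).

```python
class RingAverage:
    """Fixed-size buffer of numbers with an O(1) running average."""

    def __init__(self, size):
        if size <= 0:
            raise ValueError("size must be positive")
        self._data = [0] * size   # preallocated storage, never resized
        self._index = 0           # slot that the next value overwrites
        self._count = 0           # number of slots filled so far
        self._total = 0           # sum of the values currently stored

    def add(self, num):
        if self._count == len(self._data):
            # Buffer is full: the slot at _index holds the oldest value.
            self._total -= self._data[self._index]
        else:
            self._count += 1
        self._data[self._index] = num
        self._total += num
        self._index = (self._index + 1) % len(self._data)

    def average(self):
        if self._count == 0:
            raise ValueError("buffer is empty")
        return self._total / self._count


buf = RingAverage(3)
for n in [1, 2, 3, 4]:
    buf.add(n)
print(buf.average())  # 3.0, the mean of 2, 3 and 4
```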

空心↖ 2024-10-08 08:00:53

Based on MoonCactus's answer, here is a circularlist class. The difference with his version is that here c[0] will always give the oldest-appended element, c[-1] the latest-appended element, c[-2] the penultimate... This is more natural for applications.

c = circularlist(4)
c.append(1); print(c, c[0], c[-1])    #[1] (1/4 items)              1  1
c.append(2); print(c, c[0], c[-1])    #[1, 2] (2/4 items)           1  2
c.append(3); print(c, c[0], c[-1])    #[1, 2, 3] (3/4 items)        1  3
c.append(8); print(c, c[0], c[-1])    #[1, 2, 3, 8] (4/4 items)     1  8
c.append(10); print(c, c[0], c[-1])   #[2, 3, 8, 10] (4/4 items)    2  10
c.append(11); print(c, c[0], c[-1])   #[3, 8, 10, 11] (4/4 items)   3  11
d = circularlist(4, [1, 2, 3, 4, 5])  #[2, 3, 4, 5]

Class:

class circularlist(object):
    def __init__(self, size, data = []):
        """Initialization"""
        self.index = 0
        self.size = size
        self._data = list(data)[-size:]

    def append(self, value):
        """Append an element"""
        if len(self._data) == self.size:
            self._data[self.index] = value
        else:
            self._data.append(value)
        self.index = (self.index + 1) % self.size

    def __getitem__(self, key):
        """Get element by index, relative to the current index"""
        if len(self._data) == self.size:
            return(self._data[(key + self.index) % self.size])
        else:
            return(self._data[key])

    def __repr__(self):
        """Return string representation"""
        return (self._data[self.index:] + self._data[:self.index]).__repr__() + ' (' + str(len(self._data))+'/{} items)'.format(self.size)

同展鸳鸯锦 2024-10-08 08:00:53

I agree with using the deque class, but for the requirements of the question (averaging), this is my solution:

>>> from collections import deque
>>> class CircularBuffer(deque):
...     def __init__(self, size=0):
...             super(CircularBuffer, self).__init__(maxlen=size)
...     @property
...     def average(self):  # TODO: Make type check for integer or floats
...             return sum(self)/len(self)
...
>>>
>>> cb = CircularBuffer(size=10)
>>> for i in range(20):
...     cb.append(i)
...     print "@%s, Average: %s" % (cb, cb.average)
...
@deque([0], maxlen=10), Average: 0
@deque([0, 1], maxlen=10), Average: 0
@deque([0, 1, 2], maxlen=10), Average: 1
@deque([0, 1, 2, 3], maxlen=10), Average: 1
@deque([0, 1, 2, 3, 4], maxlen=10), Average: 2
@deque([0, 1, 2, 3, 4, 5], maxlen=10), Average: 2
@deque([0, 1, 2, 3, 4, 5, 6], maxlen=10), Average: 3
@deque([0, 1, 2, 3, 4, 5, 6, 7], maxlen=10), Average: 3
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8], maxlen=10), Average: 4
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], maxlen=10), Average: 4
@deque([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], maxlen=10), Average: 5
@deque([2, 3, 4, 5, 6, 7, 8, 9, 10, 11], maxlen=10), Average: 6
@deque([3, 4, 5, 6, 7, 8, 9, 10, 11, 12], maxlen=10), Average: 7
@deque([4, 5, 6, 7, 8, 9, 10, 11, 12, 13], maxlen=10), Average: 8
@deque([5, 6, 7, 8, 9, 10, 11, 12, 13, 14], maxlen=10), Average: 9
@deque([6, 7, 8, 9, 10, 11, 12, 13, 14, 15], maxlen=10), Average: 10
@deque([7, 8, 9, 10, 11, 12, 13, 14, 15, 16], maxlen=10), Average: 11
@deque([8, 9, 10, 11, 12, 13, 14, 15, 16, 17], maxlen=10), Average: 12
@deque([9, 10, 11, 12, 13, 14, 15, 16, 17, 18], maxlen=10), Average: 13
@deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10), Average: 14
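The session above is Python 2: it uses the print statement, and `/` between integers floors the result, which is why every average is a whole number. A hedged Python 3 sketch of the same idea follows; the class name `AveragingBuffer` is an illustrative rename, and the only behavioural change is that `/` now performs true division.

```python
from collections import deque


class AveragingBuffer(deque):
    """Bounded deque with an `average` property (Python 3 sketch)."""

    def __init__(self, size):
        super().__init__(maxlen=size)

    @property
    def average(self):
        # True division; raises ZeroDivisionError on an empty buffer.
        return sum(self) / len(self)


cb = AveragingBuffer(size=10)
for i in range(20):
    cb.append(i)
print(cb.average)  # 14.5, the mean of 10..19
```

Note that `sum(self)` walks the whole buffer on every access; for large buffers a running total avoids that cost.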
梦在深巷 2024-10-08 08:00:53

Python's deque is slow. You can use numpy.roll instead; see the question "How do you rotate the numbers in a numpy array of shape (n,) or (n,1)?"

In this benchmark, deque takes 448 ms and numpy.roll takes 29 ms:
http://scimusing.wordpress.com/2013/10/25/ring-buffers-in-pythonnumpy/

盛夏已如深秋| 2024-10-08 08:00:53

You can also see this quite old Python recipe.

Here is my own version with NumPy array:

#!/usr/bin/env python

import numpy as np

class RingBuffer(object):
    def __init__(self, size_max, default_value=0.0, dtype=float):
        """initialization"""
        self.size_max = size_max

        self._data = np.empty(size_max, dtype=dtype)
        self._data.fill(default_value)

        self.size = 0

    def append(self, value):
        """append an element"""
        self._data = np.roll(self._data, 1)
        self._data[0] = value 

        self.size += 1

        if self.size == self.size_max:
            self.__class__  = RingBufferFull

    def get_all(self):
        """return the array of elements, from the newest (index 0) to the oldest"""
        return(self._data)

    def get_partial(self):
        return(self.get_all()[0:self.size])

    def __getitem__(self, key):
        """get element"""
        return(self._data[key])

    def __repr__(self):
        """return string representation"""
        s = self._data.__repr__()
        s = s + '\t' + str(self.size)
        s = s + '\t' + self.get_all()[::-1].__repr__()
        s = s + '\t' + self.get_partial()[::-1].__repr__()
        return(s)

class RingBufferFull(RingBuffer):
    def append(self, value):
        """append an element when buffer is full"""
        self._data = np.roll(self._data, 1)
        self._data[0] = value

新雨望断虹 2024-10-08 08:00:53

How about the solution from the Python Cookbook, including a reclassification of the ring buffer instance when it becomes full?

class RingBuffer:
    """ class that implements a not-yet-full buffer """
    def __init__(self,size_max):
        self.max = size_max
        self.data = []

    class __Full:
        """ class that implements a full buffer """
        def append(self, x):
            """ Append an element overwriting the oldest one. """
            self.data[self.cur] = x
            self.cur = (self.cur+1) % self.max
        def get(self):
            """ return list of elements in correct order """
            return self.data[self.cur:]+self.data[:self.cur]

    def append(self,x):
        """append an element at the end of the buffer"""
        self.data.append(x)
        if len(self.data) == self.max:
            self.cur = 0
            # Permanently change self's class from non-full to full
            self.__class__ = self.__Full

    def get(self):
        """ Return a list of elements from the oldest to the newest. """
        return self.data

# sample usage
if __name__=='__main__':
    x=RingBuffer(5)
    x.append(1); x.append(2); x.append(3); x.append(4)
    print(x.__class__, x.get())
    x.append(5)
    print(x.__class__, x.get())
    x.append(6)
    print(x.data, x.get())
    x.append(7); x.append(8); x.append(9); x.append(10)
    print(x.data, x.get())

The notable design choice in the implementation is that, since these
objects undergo a nonreversible state transition at some point in
their lifetimes—from non-full buffer to full-buffer (and behavior
changes at that point)—I modeled that by changing self.__class__.
This works even in Python 2.2, as long as both classes have the same
slots (for example, it works fine for two classic classes, such as
RingBuffer and __Full in this recipe).

Changing the class of an instance may be strange in many languages,
but it is a Pythonic alternative to other ways of representing
occasional, massive, irreversible, and discrete changes of state that
vastly affect behavior, as in this recipe. Good thing that Python
supports it for all kinds of classes.

Credit: Sébastien Keim

回心转意 2024-10-08 08:00:53

I've had this problem before doing serial programming. At the time just over a year ago, I couldn't find any efficient implementations either, so I ended up writing one as a C extension and it's also available on pypi under an MIT license. It's super basic, only handles buffers of 8-bit signed chars, but is of flexible length, so you can use Struct or something on top of it if you need something other than chars. I see now with a google search that there are several options these days though, so you might want to look at those too.

那伤。 2024-10-08 08:00:53

From Github:

class CircularBuffer:

    def __init__(self, size):
        """Store buffer in given storage."""
        self.buffer = [None]*size
        self.low = 0
        self.high = 0
        self.size = size
        self.count = 0

    def isEmpty(self):
        """Determines if buffer is empty."""
        return self.count == 0

    def isFull(self):
        """Determines if buffer is full."""
        return self.count == self.size

    def __len__(self):
        """Returns number of elements in buffer."""
        return self.count

    def add(self, value):
        """Adds value to buffer, overwrite as needed."""
        if self.isFull():
            self.low = (self.low+1) % self.size
        else:
            self.count += 1
        self.buffer[self.high] = value
        self.high = (self.high + 1) % self.size

    def remove(self):
        """Removes oldest value from non-empty buffer."""
        if self.count == 0:
            raise Exception ("Circular Buffer is empty");
        value = self.buffer[self.low]
        self.low = (self.low + 1) % self.size
        self.count -= 1
        return value

    def __iter__(self):
        """Return elements in the circular buffer in order using iterator."""
        idx = self.low
        num = self.count
        while num > 0:
            yield self.buffer[idx]
            idx = (idx + 1) % self.size
            num -= 1

    def __repr__(self):
        """String representation of circular buffer."""
        if self.isEmpty():
            return 'cb:[]'

        return 'cb:[' + ','.join(map(str,self)) + ']'

https://github.com/heineman/python-data-structures/blob/master/2.%20Ubiquitous%20Lists/circBuffer.py

镜花水月 2024-10-08 08:00:53

Lots of answers here but none subclass the Numpy ndarray as suggested by D Left Adjoint to U. This avoids using np.roll which does not scale efficiently, and passes on all the advantages of Numpy arrays like array slicing. Using Numpy arrays will allow for most analyses you need to run, including averaging.

RingArray class

My solution subclasses np.ndarray using the guidelines written in the Numpy documentation.

The RingArray is initialised with a specified shape, and filled with np.nan values.

Itertools cycle is used to create a one dimensional cycle that gives the next row position to edit in the array. This is based on the height of the array during initialisation.

An append method is added to the ndarray methods to write data over the next position in the cycle.

import numpy as np
from itertools import cycle


class RingArray(np.ndarray):
    """A modified numpy array type that functions like a stack. 
    RingArray has a set size specified during initialisation. 
    Add new data using the append() method, which will replace the 
    next value in a cyclical fashion. The array itself has all the 
    properties of a numpy array e.g. it can be sliced and accessed as 
    normal. Initially fills the array with np.nan values.
    
    Options
    --------
    shape : tuple
        A tuple of (height, width) for the maximum size of the array.

    Attributes
    ----------
    Inherited from nd.array. Initially fills array with np.nan values.
    
    Methods
    --------
    append(data)
        Add/replace data in the next element of the cycle.
        Data should be the length of the RingArray width.
    
    """    
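    def __new__(subtype, shape):
        # Allocate a float array so that it can hold np.nan.
        obj = super().__new__(subtype, shape, dtype=float)

        # Fill in place; this keeps obj a RingArray instance.
        obj.fill(np.nan)

        # Endless iterator over the row indices 0 .. shape[0] - 1.
        obj._pointer = cycle(range(shape[0]))

        return obj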
    
    # needed by numpy
    def __array_finalize__(self, obj):
         if obj is None: return
        
    # add data to the next element (looped)
    def append(self, data):
        """Adds or replaces data in the RingArray.
        The function writes to the next row in the Array.
        Once the last row is reached, the assignment row 
        loops back to the start.

        Parameters
        ----------
        data : array_like
            Data should be the length of the RingArray width.
        """        
        self[next(self._pointer)] = data

Performance

I believe each append runs in O(1) time with respect to the number of rows, since it only overwrites a single row; however, I am not a computer scientist, so please correct me if I'm wrong!

Possible issues

As this is a subclass of ndarray, all the methods from that class can be used on the RingArray. Removing or adding values with array functions like np.delete produces an array with a different shape. This will cause errors with the cycle, as it is set at initialisation. For this reason, be cautious when editing the array by any method other than append().

This is my first stack overflow post, if there's anything I can improve upon please let me know :).

清音悠歌 2024-10-08 08:00:53

This one does not require any library. It grows a list and then cycles through it by index.

The footprint is very small (no library), and it runs at least twice as fast as deque. This is indeed good for computing moving averages, but be aware that the items are not kept sorted by age as above.

class CircularBuffer(object):
    def __init__(self, size):
        """initialization"""
        self.index = 0          # slot that the next overwrite will use
        self.size = size        # maximum number of stored items
        self._data = []

    def record(self, value):
        """append an element, overwriting the oldest one once full"""
        if len(self._data) == self.size:
            self._data[self.index] = value
        else:
            self._data.append(value)
        self.index = (self.index + 1) % self.size

    def __getitem__(self, key):
        """get element by index like a regular array"""
        return self._data[key]

    def __repr__(self):
        """return string representation"""
        return repr(self._data) + ' (' + str(len(self._data)) + ' items)'

    def get_all(self):
        """return a list of all the elements (storage order, not age order)"""
        return self._data

To get the average value, e.g.:

q = CircularBuffer(1000000)
for i in range(40000):
    q.record(i)
print("capacity=", q.size)
print("stored=", len(q.get_all()))
# Floor division reproduces the integer average from the original Python 2 run.
print("average=", sum(q.get_all()) // len(q.get_all()))

Results in:

capacity= 1000000
stored= 40000
average= 19999

real 0m0.024s
user 0m0.020s
sys  0m0.000s

This is about 1/3 of the time taken by the equivalent code using deque.
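Because the class above overwrites slots in place, `get_all()` returns items in storage order, not age order. Below is a minimal sketch of two possible additions, neither of which is part of the original answer: an `ordered()` method that returns the items oldest first, and a running total so that the mean does not have to re-sum the whole buffer. The name `AveragingBuffer` and its methods are made up for illustration, and the running total assumes integer values (with floats, rounding error could accumulate).

```python
class AveragingBuffer(object):
    """Index-based ring buffer that also tracks a running total (illustrative sketch)."""

    def __init__(self, size):
        self.size = size
        self.index = 0      # slot that the next record() call will write
        self._data = []
        self._total = 0     # sum of the values currently stored

    def record(self, value):
        if len(self._data) == self.size:
            # Buffer is full: drop the oldest value from the total, then overwrite it.
            self._total -= self._data[self.index]
            self._data[self.index] = value
        else:
            self._data.append(value)
        self._total += value
        self.index = (self.index + 1) % self.size

    def ordered(self):
        """Return the stored values from oldest to newest."""
        if len(self._data) < self.size:
            return list(self._data)
        return self._data[self.index:] + self._data[:self.index]

    def mean(self):
        """Mean of the stored values; raises ZeroDivisionError when empty."""
        return self._total / len(self._data)


buf = AveragingBuffer(3)
for n in (1, 2, 3, 4):
    buf.record(n)
print(buf.ordered())   # the oldest value (1) has been overwritten
print(buf.mean())
```

With this layout each `record()` call stays O(1), and `mean()` is O(1) as well, at the cost of one extra subtraction per overwrite.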

↙温凉少女 2024-10-08 08:00:53

I don't get the answers here. Obviously, if you're working within NumPy, you'd usually want to subclass either array or ndarray. That way (at least once your cyclic array is full) you can still use NumPy array arithmetic on the cyclic array. The only thing to be careful of is that, for operations that span multiple elements (such as a moving average), the window must not be larger than what has accumulated in the buffer.

Also, as the commenters mentioned, don't use rolling (numpy.roll): it copies the whole array on every call, which defeats the purpose of efficiency. If you need a growing array, simply double its size each time a resize is required (this is different from a cyclic array implementation).
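As a rough illustration of the points above, here is a sketch of a fixed-size buffer on a preallocated NumPy array that overwrites one slot per append instead of rolling, and that clamps the averaging window to what has accumulated. It wraps an array instead of subclassing ndarray, which is a simplification of what the answer suggests. The class name `NumpyRing` and its methods are hypothetical.

```python
import numpy as np


class NumpyRing(object):
    """Fixed-size ring buffer on a preallocated array (illustrative sketch)."""

    def __init__(self, size, dtype=np.int64):
        self._data = np.zeros(size, dtype=dtype)
        self._next = 0      # slot that the next append() call will write
        self._count = 0     # number of valid entries, at most size

    def append(self, value):
        # Overwrite one slot in place; nothing is shifted or copied.
        self._data[self._next] = value
        self._next = (self._next + 1) % self._data.size
        self._count = min(self._count + 1, self._data.size)

    def mean(self, window=None):
        """Mean of the newest `window` values, clamped to what has accumulated."""
        if self._count == 0:
            raise ValueError("buffer is empty")
        n = self._count if window is None else min(window, self._count)
        # Indices of the newest n values, wrapping around the end of the array.
        idx = (self._next - 1 - np.arange(n)) % self._data.size
        return self._data[idx].mean()


ring = NumpyRing(4)
for n in range(1, 7):       # 1..6; the buffer keeps 3, 4, 5, 6
    ring.append(n)
print(ring.mean())          # mean of all stored values
print(ring.mean(window=2))  # mean of the two newest values
```

Clamping the window inside `mean()` is one way to honour the caveat about partially filled buffers; raising an error for an oversized window would be an equally reasonable choice.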

温柔少女心 2024-10-08 08:00:53

The original question asked for an "efficient" circular buffer.
By that criterion, the answer from aaronasterling seems to be definitely correct.
Timing a dedicated class written in Python against collections.deque shows that deque is about 5.2 times faster.
Here is some very simple code to test this:

class cb:
    def __init__(self, size):
        self.b = [0]*size
        self.i = 0
        self.sz = size
    def append(self, v):
        self.b[self.i] = v
        self.i = (self.i + 1) % self.sz

b = cb(1000)
for i in range(10000):
    b.append(i)
# run 200 times, this takes 1.097 seconds on my laptop

from collections import deque
b = deque( [], 1000 )
for i in range(10000):
    b.append(i)
# run 200 times, this takes 0.211 seconds on my laptop

To convert a deque into a list, just use:

my_list = list(my_deque)

The list then gives you O(1) random access to the items. Of course, this is only worthwhile if you need to do many random accesses after having filled the deque once.
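The timing comparison above can be reproduced with the standard `timeit` module. The sketch below is one possible harness, not the one the author used: the helper `fill` and the repeat count of 20 are assumptions chosen to keep the run short, and the absolute numbers will differ from machine to machine. It also shows the list snapshot described above.

```python
import timeit
from collections import deque


class cb:
    """Pure-Python ring buffer, same logic as the class above."""

    def __init__(self, size):
        self.b = [0] * size
        self.i = 0
        self.sz = size

    def append(self, v):
        self.b[self.i] = v
        self.i = (self.i + 1) % self.sz


def fill(buf, n=10000):
    """Append n integers to any object that has an append() method."""
    for i in range(n):
        buf.append(i)
    return buf


# Absolute timings depend on the machine and the Python version.
t_class = timeit.timeit(lambda: fill(cb(1000)), number=20)
t_deque = timeit.timeit(lambda: fill(deque([], 1000)), number=20)
print("pure-Python class: %.4f s" % t_class)
print("collections.deque: %.4f s" % t_deque)

# Snapshot a deque as a list when many random accesses are needed.
snapshot = list(fill(deque([], 5), n=8))
print(snapshot)
```

Both timings include the cost of constructing the buffer, which is small next to 10000 appends.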

黎夕旧梦 2024-10-08 08:00:53

This applies the same principle to some buffers intended to hold the most recent text messages.

import time
import datetime
import sys

class textbffr(object):
    def __init__(self, size_max):
        #initialization
        self.posn_max = size_max-1
        self._data = [""]*(size_max)
        self.posn = self.posn_max

    def append(self, value):
        #append an element
        if self.posn == self.posn_max:
            self.posn = 0
            self._data[self.posn] = value   
        else:
            self.posn += 1
            self._data[self.posn] = value

    def __getitem__(self, key):
        #return stored element
        if (key + self.posn+1) > self.posn_max:
            return(self._data[key - (self.posn_max-self.posn)])
        else:
            return(self._data[key + self.posn+1])


def print_bffr(bffr,bffer_max): 
    for ind in range(0,bffer_max):
        stored = bffr[ind]
        if stored != "":
            print(stored)
    print ( '\n' )

def make_time_text(time_value):
    return(str(time_value.month).zfill(2) + str(time_value.day).zfill(2)
      + str(time_value.hour).zfill(2) +  str(time_value.minute).zfill(2)
      + str(time_value.second).zfill(2))


def main(argv):
    #Set things up 
    starttime = datetime.datetime.now()
    log_max = 5
    status_max = 7
    log_bffr = textbffr(log_max)
    status_bffr = textbffr(status_max)
    scan_count = 1

    #Main Loop
    # Every 2 seconds, write a line with the time and the scan count.
    while True: 

        time_text = make_time_text(datetime.datetime.now())
        #create next messages and store in buffers
        status_bffr.append(str(scan_count).zfill(6) + " :  Status is just fine at : " + time_text)
        log_bffr.append(str(scan_count).zfill(6) + " : " + time_text + " : Logging Text ")

        #print whole buffers so far
        print_bffr(log_bffr,log_max)
        print_bffr(status_bffr,status_max)

        time.sleep(2)
        scan_count += 1 

if __name__ == '__main__':
    main(sys.argv[1:])  
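The two branches in `append` and `__getitem__` above can each be folded into a single modulo expression. The following is a sketch of that variant, assuming the same layout (key 0 is the oldest slot, and empty strings mark unused slots); the names `TextRing` and `messages` are hypothetical and do not appear in the original answer.

```python
class TextRing(object):
    """Keeps the newest size_max messages; index 0 is the oldest slot."""

    def __init__(self, size_max):
        self._size = size_max
        self._data = [""] * size_max
        self._posn = size_max - 1   # slot written by the latest append()

    def append(self, value):
        self._posn = (self._posn + 1) % self._size
        self._data[self._posn] = value

    def __getitem__(self, key):
        # The slot after the newest one holds the oldest message.
        return self._data[(self._posn + 1 + key) % self._size]

    def messages(self):
        """Non-empty messages, oldest first."""
        return [self[k] for k in range(self._size) if self[k] != ""]


ring = TextRing(3)
for text in ("a", "b", "c", "d"):
    ring.append(text)
print(ring.messages())   # "a" has been overwritten by "d"
```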