Counting the number of messages per second in rolling windows?

Posted 2024-08-21 20:24:50

I have messages coming into my program with millisecond resolution (anywhere from zero to a couple hundred messages a millisecond).

I'd like to do some analysis. Specifically, I want to maintain multiple rolling windows of the message counts, updated as messages come in. For example,

  • # of messages in last second
  • # of messages in last minute
  • # of messages in last half-hour divided by # of messages in last hour

I can't just maintain a simple count like "1,017 messages in last second", since I won't know when a message is older than 1 second and therefore should no longer be in the count...

I thought of maintaining a queue of all the messages, searching for the youngest message that's older than one second, and inferring the count from the index. However, this seems like it would be too slow, and would eat up a lot of memory.

What can I do to keep track of these counts in my program so that I can efficiently get these values in real-time?

Comments (5)

北斗星光 2024-08-28 20:24:50

This is most easily handled by a cyclic buffer.

A cyclic buffer has a fixed number of elements and a pointer into it. You can add an element to the buffer, and when you do, you advance the pointer to the next element. If you get past the end of the fixed-length buffer you start again from the beginning. It's a space- and time-efficient way to store the "last N" items.

Now in your case you could have one cyclic buffer of 1,000 counters, each one counting the number of messages during one millisecond. Adding up all 1,000 counters gives you the total count for the last second. Of course you can optimize the reporting part by updating the count incrementally, i.e. deduct from the count the number you overwrite when you insert, then add the new number.

You can then have another cyclic buffer with 60 slots that counts the aggregate number of messages in whole seconds; once a second, take the total count of the millisecond buffer and write it into the buffer with one-second resolution, and so on.

Here's C-like pseudocode:

int msecbuf[1000]; // per-millisecond counts for the last second (zero-initialized)
int secbuf[60];    // per-second counts for the last minute (ditto)
int msecptr = 0, secptr = 0;
int count = 0;            // messages seen in the current millisecond
int msec_total_ctr = 0;   // running sum of msecbuf == messages in the last second
void msg_received() { count++; }
void every_msec() {                    // call once per millisecond
  msec_total_ctr -= msecbuf[msecptr];  // retire the slot about to be overwritten
  msecbuf[msecptr] = count;
  msec_total_ctr += count;             // incremental update; no need to re-sum
  count = 0;
  msecptr = (msecptr + 1) % 1000;
}
void every_sec() {                     // call once per second
  secbuf[secptr] = msec_total_ctr;     // snapshot: messages in the last full second
  secptr = (secptr + 1) % 60;
}
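
To read the windows back out, a pair of hypothetical query helpers (my addition, not part of the answer above) could look like this, assuming every_msec() and every_sec() fire on schedule:

int msgs_last_second() {
  return msec_total_ctr;        // maintained incrementally in every_msec()
}
int msgs_last_minute() {
  int total = 0;
  for (int i = 0; i < 60; i++)  // summing 60 slots on demand is cheap
    total += secbuf[i];
  return total;
}
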
铜锣湾横着走 2024-08-28 20:24:50

You want exponential smoothing, otherwise known as an exponentially weighted moving average. Take an EWMA of the time since the last message arrived, and then divide that time into one second to get a rate. You can run several of these with different weights to cover effectively longer time intervals. In effect you're using an infinitely long window, so you don't have to worry about expiring data; the decaying weights do it for you.
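
A minimal sketch of that idea (the struct and names below are my own; alpha is a hypothetical smoothing factor, where smaller values behave like longer windows):

typedef struct {
  double alpha;       // smoothing factor in (0, 1]
  double avg_gap_ms;  // smoothed inter-arrival gap, in milliseconds
  double last_ts_ms;  // timestamp of the previous message
  int primed;         // set once the first message has arrived
} ewma_rate;

void ewma_on_message(ewma_rate *e, double now_ms) {
  if (e->primed) {
    double gap = now_ms - e->last_ts_ms;
    e->avg_gap_ms = e->alpha * gap + (1.0 - e->alpha) * e->avg_gap_ms;
  } else {
    e->primed = 1;    // first message: nothing to smooth yet
  }
  e->last_ts_ms = now_ms;
}

double ewma_msgs_per_sec(const ewma_rate *e) {
  // divide the smoothed gap into one second to get a rate
  return e->avg_gap_ms > 0.0 ? 1000.0 / e->avg_gap_ms : 0.0;
}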

灯角 2024-08-28 20:24:50

For the last millisecond, keep the count. When the millisecond slice rolls over to the next one, reset the count and add it to a millisecond rolling buffer array. If you keep this cumulative, you can extract the number of messages per second with a fixed amount of memory.

When a 0.1-second slice (or some other small value next to 1 minute) is done, sum up the last 0.1*1000 items from the rolling buffer array and place that in the next rolling buffer. This way you can keep the millisecond rolling buffer small (1,000 items for a 1 s maximum lookup), and likewise the buffer for minute lookups (600 items).

You can do the same trick for whole minutes out of 0.1-minute intervals. All the questions asked can then be answered by summing a few integers (or, when keeping cumulative values, subtracting two of them).

The only disadvantage is that the last-second value will change every millisecond, the minute value only every 0.1 seconds, and the hour value (and derivatives such as the percentage in the last half hour) only every 0.1 minutes. But at least you keep your memory usage at bay.
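
A sketch of the first two tiers of that cascade (variable names are my own; it assumes a reliable once-per-millisecond tick):

int ms_ring[1000];    // one slot per millisecond: covers the last second
int tenth_ring[600];  // one slot per 0.1 s: covers the last minute
int ms_idx = 0, tenth_idx = 0;
int cur_ms_count = 0; // messages seen in the current millisecond

void on_message() { cur_ms_count++; }

void on_ms_tick() {   // call once per millisecond
  ms_ring[ms_idx] = cur_ms_count;
  cur_ms_count = 0;
  ms_idx = (ms_idx + 1) % 1000;
  if (ms_idx % 100 == 0) {  // a full 0.1 s slice just completed
    int sum = 0, start = (ms_idx + 1000 - 100) % 1000;
    for (int i = 0; i < 100; i++)
      sum += ms_ring[(start + i) % 1000];
    tenth_ring[tenth_idx] = sum;  // roll the slice up into the minute tier
    tenth_idx = (tenth_idx + 1) % 600;
  }
}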

野生奥特曼 2024-08-28 20:24:50

Your rolling display window can only update so fast; let's say you want to update it 10 times a second, so for 1 second's worth of data you would need 10 values. Each value would contain the number of messages that showed up in that 1/10 of a second. Let's call these values bins; each bin holds 1/10 of a second's worth of data. Every 100 milliseconds, one of the bins gets discarded and a new bin is set to the number of messages that have shown up in those 100 milliseconds.

You would need an array of 36K bins to hold an hour's worth of information about your message rate if you wanted to preserve a precision of 1/10 of a second for the whole hour. But that seems like overkill.

I think it would be more reasonable to have the precision drop off as the time interval gets larger.

Maybe you keep 1 second's worth of data accurate to 100 milliseconds, 1 minute's worth of data accurate to the second, 1 hour's worth of data accurate to the minute, and so on.
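
One possible layout for that tiered precision (struct and names are mine, purely illustrative):

typedef struct {
  int bins_100ms[10];   // last second at 100 ms precision
  int bins_1s[60];      // last minute at 1 s precision
  int bins_1min[60];    // last hour at 1 min precision
  int i100, i1s, i1min; // ring index for each tier
} tiered_bins;

// Every 100 ms: overwrite the oldest 100 ms bin with the fresh slice count.
void tick_100ms(tiered_bins *t, int msgs_in_slice) {
  t->bins_100ms[t->i100] = msgs_in_slice;
  t->i100 = (t->i100 + 1) % 10;
}

// Every second: roll the ten 100 ms bins up into one 1 s bin.
// tick_1min would roll bins_1s up into bins_1min the same way.
void tick_1s(tiered_bins *t) {
  int sum = 0;
  for (int i = 0; i < 10; i++) sum += t->bins_100ms[i];
  t->bins_1s[t->i1s] = sum;
  t->i1s = (t->i1s + 1) % 60;
}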

沧桑㈠ 2024-08-28 20:24:50

I thought of maintaining a queue of all the messages, searching for the youngest message that's older than one second, and inferring the count from the index. However, this seems like it would be too slow, and would eat up a lot of memory.

A better idea would be maintaining a linked list of the messages, adding new messages to the head (with a timestamp), and popping them from the tail as they expire. Or even not popping them - just keep a pointer to the oldest message that came in within the desired timeframe, and advance it towards the head when that message expires (this lets you track multiple timeframes with one list).

You could compute the count when needed by walking from the tail to the head, or just store the count separately, incrementing it whenever you add a value to the head, and decrementing it whenever you advance the tail.
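
A rough sketch of the one-list, many-tails variant (types and names are mine; freeing of fully expired nodes is omitted):

#include <stdlib.h>

typedef struct msg {
  long long ts_ms;   // arrival time in milliseconds
  struct msg *next;  // points toward newer messages (the head)
} msg;

typedef struct {
  msg *oldest;       // oldest message still inside this window
  long long span_ms; // window length, e.g. 1000 for one second
  int count;         // running count of messages in the window
} window;

static msg *head = NULL; // newest message

void add_message(long long now_ms, window *w, int nwindows) {
  msg *m = malloc(sizeof *m); // error handling omitted in this sketch
  m->ts_ms = now_ms;
  m->next = NULL;
  if (head) head->next = m;   // link the old head toward the new message
  head = m;
  for (int i = 0; i < nwindows; i++) {
    if (!w[i].oldest) w[i].oldest = m;
    w[i].count++;
    // advance this window's tail pointer past anything that has expired
    while (w[i].oldest && now_ms - w[i].oldest->ts_ms > w[i].span_ms) {
      w[i].oldest = w[i].oldest->next;
      w[i].count--;
    }
  }
}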
