反应式扩展是否支持滚动缓冲区?
我正在使用反应式扩展将数据整理到 100 毫秒的缓冲区中:
this.subscription = this.dataService
.Where(x => !string.Equals("FOO", x.Key.Source))
.Buffer(TimeSpan.FromMilliseconds(100))
.ObserveOn(this.dispatcherService)
.Where(x => x.Count != 0)
.Subscribe(this.OnBufferReceived);
这工作得很好。但是,我想要的行为与 Buffer 操作提供的行为略有不同。本质上,如果收到另一个数据项,我想重置计时器。只有当整个 100ms 都没有收到数据时我才想处理它。这开启了永远处理数据的可能性,因此我还应该能够指定最大计数。我会想象这样的事情:
.SlidingBuffer(TimeSpan.FromMilliseconds(100), 10000)
我环顾四周,但在 Rx 中找不到类似的东西?有人能证实/否认这一点吗?
I'm using reactive extensions to collate data into buffers of 100ms:
this.subscription = this.dataService
.Where(x => !string.Equals("FOO", x.Key.Source))
.Buffer(TimeSpan.FromMilliseconds(100))
.ObserveOn(this.dispatcherService)
.Where(x => x.Count != 0)
.Subscribe(this.OnBufferReceived);
This works fine. However, I want slightly different behavior than that provided by the Buffer
operation. Essentially, I want to reset the timer if another data item is received. Only when no data has been received for the entire 100ms do I want to handle it. This opens up the possibility of never handling the data, so I should also be able to specify a maximum count. I would imagine something along the lines of:
.SlidingBuffer(TimeSpan.FromMilliseconds(100), 10000)
I've had a look around and haven't been able to find anything like this in Rx? Can anyone confirm/deny this?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(6)
这可以通过组合
可观察
。首先,让我们解决忽略最大计数条件的更简单的问题:强大的
Window
方法 完成了繁重的工作。现在很容易了解如何添加最大计数:我将在我的博客上写一篇文章对此进行解释。 https://gist.github.com/2244036
Window 方法的文档:
This is possible by combining the built-in
Window
andThrottle
methods ofObservable
. First, let's solve the simpler problem where we ignore the maximum count condition:The powerful
Window
method did the heavy lifting. Now it's easy enough to see how to add a maximum count:I'll write a post explaining this on my blog. https://gist.github.com/2244036
Documentation for the Window method:
我编写了一个扩展来完成您所要做的大部分事情 - BufferWithInactivity。
这里是:
I wrote an extension to do most of what you're after -
BufferWithInactivity
.Here it is:
使用 Rx Extensions 2.0,您可以通过接受超时和大小的新缓冲区重载来满足这两个要求:
请参阅 https://msdn.microsoft.com/en-us/library/hh229200(v=vs.103).aspx 查看文档。
With Rx Extensions 2.0, your can answer both requirements with a new Buffer overload accepting a timeout and a size:
See https://msdn.microsoft.com/en-us/library/hh229200(v=vs.103).aspx for the documentation.
正如 Rohit Sharma 在 Panic 上校的解决方案中的评论中提到的,项目将被缓冲并且不会被缓冲的位置存在问题。除非生成了项目,否则推送给订阅者。
正如此 comment 中所述,问题是
p.Window (() => closes)
,因为它打开了一个可能会错过事件的间隙。由于现在始终使用相同的 lambda,因此我们需要调整 maxCount。如果没有更改,maxCount 将永远不会重置,并且在命中一次后,每个新事件都将超过 maxCount。
更新:
经过进一步的测试,我发现在某些情况下,项目仍然无法正确推送给订阅者。
以下是我们使用的解决方法,4 个月以来一直没有出现任何问题。
解决方法是将
.Delay(...)
添加到任何TimeSpan
中。As Rohit Sharma mentioned with his comment at Colonel Panic's solution, there is a problem with where items will be buffered and will not be pushed to subscriber unless an item is generated.
As described in this comment the problem is
p.Window(() => closes)
, because it opens up a gap in which events can be missed.Since now always the same lambda is used, we need to adjust the maxCount. Without the change the maxCount would never be reseted and after it was hit once, every new event would be over the maxCount.
Update:
After further tests i found out that still, in some cases, items will not be correctly pushed to the subscriber.
Here is the workaround which works for us since already 4 months without any problems.
The workaround is adding
.Delay(...)
with anyTimeSpan
.我想这可以在 Buffer 方法之上实现,如下所示:
注意:我还没有测试过它,但我希望它能给你这个想法。
I guess this can be implemented on top of Buffer method as shown below:
NOTE: I haven't tested it, but I hope it gives you the idea.
Panic 上校的解决方案几乎是完美的。唯一缺少的是
Publish
组件,以便使该解决方案也适用于冷序列。除了添加
Publish
之外,我还用等效但更短的内容替换了原始的.Where((_, index) => index + 1 >= maxCount)
.Skip(maxCount - 1)
。为了完整起见,还有一个 IScheduler 参数,用于配置运行计时器的调度程序。Colonel Panic's solution is almost perfect. The only thing that is missing is a
Publish
component, in order to make the solution work with cold sequences too.Beyond adding the
Publish
, I've also replaced the original.Where((_, index) => index + 1 >= maxCount)
with the equivalent but shorter.Skip(maxCount - 1)
. For completeness there is also anIScheduler
parameter, which configures the scheduler where the timer is run.