RxJs observable 监听 10 秒,只返回收到的值中的 5 个,丢弃其余的,然后继续监听?
我有一个可观察的对象,它将从 SignalR 中心获取多个实时交易值(可能每秒多个)。我想要实现的是一个可观察的对象,它连续(每 10 秒)输出过去 10 秒内发生的 5 笔交易的样本。
我编写了一个可观察管道,尝试通过将所有收到的交易添加到缓冲区中 10 秒来实现此目的,然后使用“concatMap”和“from”为缓冲区数组中的每个交易创建一个可观察的。然后,创建另一个缓冲区来收集 5 个值并发出它们。
this.bufferedTradeObservable$ = this.tradeReceived
.pipe(
tap(v => console.log('pipe-start: ', v)),
distinct((e: Trade) => e.tradeId),
bufferTime(10000),
concatMap((tradeArray) => {
return from(tradeArray);
}),
bufferCount(5),
tap(v => console.log('pipe-end: ', v))
);
但是,管道不断发出在 10 秒窗口中接收到的所有值,但以 5 个为一组。我尝试在 concat 映射之后的管道中添加一个 take(5)
,它对于第一批 5 个值,可以正常工作,但随后可观察的“完成”并停止侦听新值。我还尝试在 concatMap 之后添加一个带有索引的过滤器,如下所示:
filter((v, i) => (i < 6 )),
这适用于第一批 5 个值,但随后会不断过滤掉每个值,因此永远不会创建第二个 5 的缓冲区。此外,过滤器的这种用例似乎已被弃用。
我不确定我是否忽略了这里明显的东西,但我已经查看了许多 rxjs 运算符,但找不到实现此目的的方法
I have an observable that will be taking in multiple real-time trade values (possibly many per second) from a SignalR hub. What I am trying to achieve is an observable that continuously (every 10 seconds) outputs a sample of 5 trades that occurred in those last 10 seconds.
I wrote an observable pipe to try to achieve this by adding all of the received trades into a buffer for 10 seconds, then creating an observable for each of the trades in the buffer array, using 'concatMap' and 'from'. Then, creating another buffer that collects 5 values, and emits them.
this.bufferedTradeObservable$ = this.tradeReceived
.pipe(
tap(v => console.log('pipe-start: ', v)),
distinct((e: Trade) => e.tradeId),
bufferTime(10000),
concatMap((tradeArray) => {
return from(tradeArray);
}),
bufferCount(5),
tap(v => console.log('pipe-end: ', v))
);
However, the pipe keeps emitting all of the values that it receives in the 10 second window, but in groups of 5. I tried adding a take(5)
in the pipe after the concat map, and it works correctly for the first batch of 5 values, but then the observable "completes" and stops listening for new values. I also tried adding a filter with index after the concatMap like this:
filter((v, i) => (i < 6 )),
This works for the first batch of 5 values, but then keeps filtering out every value, so a second buffer of 5 never gets created. Also this use case of the filter appears to be deprecated.
I'm not sure if I'm overlooking something obvious here, but I've looked at many of the rxjs operators and can't find a way to achieve this
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(3)
听起来你需要的只是 bufferTime。您可以决定保留什么以及之后扔掉什么。
Sounds like all you need is bufferTime. You can decide what to keep and what to throw away afterward.
像这样的事怎么办
What about something like this,
bufferTime
有一个maxBufferSize
参数可以为您执行此操作。您还可以使用
windowTime
在创建每个值后立即输出它,而不是等待所有 5 个值。这些内容在
bufferTime
和windowTime
分别。bufferTime
has amaxBufferSize
argument that will do this for you.You could also use
windowTime
instead to output each value as soon as it's created, rather than waiting for all 5.These are covered in the documentation for
bufferTime
andwindowTime
respectively.