RxJs observable 监听 10 秒,只返回收到的值中的 5 个,丢弃其余的,然后继续监听?

发布于 2025-01-09 07:33:35 字数 956 浏览 6 评论 0原文

我有一个可观察的对象,它将从 SignalR 中心获取多个实时交易值(可能每秒多个)。我想要实现的是一个可观察的对象,它连续(每 10 秒)输出过去 10 秒内发生的 5 笔交易的样本。

我编写了一个可观察管道,尝试通过将所有收到的交易添加到缓冲区中 10 秒来实现此目的,然后使用“concatMap”和“from”为缓冲区数组中的每个交易创建一个可观察的。然后,创建另一个缓冲区来收集 5 个值并发出它们。

this.bufferedTradeObservable$ = this.tradeReceived
      .pipe(
        tap(v => console.log('pipe-start: ', v)),
        distinct((e: Trade) => e.tradeId),
        bufferTime(10000),
        concatMap((tradeArray) => {
            return from(tradeArray);
        }),
        bufferCount(5),
        tap(v => console.log('pipe-end: ', v))
      );

但是,管道不断发出在 10 秒窗口中接收到的所有值,但以 5 个为一组。我尝试在 concat 映射之后的管道中添加一个 take(5),它对于第一批 5 个值,可以正常工作,但随后可观察的“完成”并停止侦听新值。我还尝试在 concatMap 之后添加一个带有索引的过滤器,如下所示:

filter((v, i) => (i < 6 )),

这适用于第一批 5 个值,但随后会不断过滤掉每个值,因此永远不会创建第二个 5 的缓冲区。此外,过滤器的这种用例似乎已被弃用。

我不确定我是否忽略了这里明显的东西,但我已经查看了许多 rxjs 运算符,但找不到实现此目的的方法

I have an observable that will be taking in multiple real-time trade values (possibly many per second) from a SignalR hub. What I am trying to achieve is an observable that continuously (every 10 seconds) outputs a sample of 5 trades that occurred in those last 10 seconds.

I wrote an observable pipe to try to achieve this by adding all of the received trades into a buffer for 10 seconds, then creating an observable for each of the trades in the buffer array, using 'concatMap' and 'from'. Then, creating another buffer that collects 5 values, and emits them.

this.bufferedTradeObservable$ = this.tradeReceived
      .pipe(
        tap(v => console.log('pipe-start: ', v)),
        distinct((e: Trade) => e.tradeId),
        bufferTime(10000),
        concatMap((tradeArray) => {
            return from(tradeArray);
        }),
        bufferCount(5),
        tap(v => console.log('pipe-end: ', v))
      );

However, the pipe keeps emitting all of the values that it receives in the 10 second window, but in groups of 5. I tried adding a take(5)in the pipe after the concat map, and it works correctly for the first batch of 5 values, but then the observable "completes" and stops listening for new values. I also tried adding a filter with index after the concatMap like this:

filter((v, i) => (i < 6 )),

This works for the first batch of 5 values, but then keeps filtering out every value, so a second buffer of 5 never gets created. Also this use case of the filter appears to be deprecated.

I'm not sure if I'm overlooking something obvious here, but I've looked at many of the rxjs operators and can't find a way to achieve this

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

德意的啸 2025-01-16 07:33:35

听起来你需要的只是 bufferTime。您可以决定保留什么以及之后扔掉什么。

this.bufferedTradeObservable$ = this.tradeReceived.pipe(
  // Buffer for 1 seconds
  bufferTime(10000),
  // Only emit the last 5 values from the buffer.
  map(buffer => buffer.slice(-5))
);

Sounds like all you need is bufferTime. You can decide what to keep and what to throw away afterward.

this.bufferedTradeObservable$ = this.tradeReceived.pipe(
  // Buffer for 1 seconds
  bufferTime(10000),
  // Only emit the last 5 values from the buffer.
  map(buffer => buffer.slice(-5))
);
酒绊 2025-01-16 07:33:35

像这样的事怎么办

let n = 5;
let t = 10;

//Source, emits a value every second (just a placeholder for real source)

const source = interval(1000);

//Take n=5 values from the source, then end the stream
const takeNValues = source.pipe(take(n));

//Every t=10 seconds switch to a new observable that emits n=5 values and then closes

const takeNValuesEveryTSeconds = interval(t * 1000).pipe(
  switchMap(() => takeNValues)
);

//Subscribe and log n=5 values every t=10 seconds

takeNValuesEveryTSeconds.subscribe(n => 
  console.log('Value => ', n)
);

What about something like this,

let n = 5;
let t = 10;

//Source, emits a value every second (just a placeholder for real source)

const source = interval(1000);

//Take n=5 values from the source, then end the stream
const takeNValues = source.pipe(take(n));

//Every t=10 seconds switch to a new observable that emits n=5 values and then closes

const takeNValuesEveryTSeconds = interval(t * 1000).pipe(
  switchMap(() => takeNValues)
);

//Subscribe and log n=5 values every t=10 seconds

takeNValuesEveryTSeconds.subscribe(n => 
  console.log('Value => ', n)
);
羁客 2025-01-16 07:33:35

bufferTime 有一个 maxBufferSize 参数可以为您执行此操作。

this.bufferedTradeObservable$ = this.tradeReceived
      .pipe(
        tap(v => console.log('pipe-start: ', v)),
        distinct((e: Trade) => e.tradeId),
        bufferTime(10000, 10000, 5),
        concatMap((tradeArray) => {
            return from(tradeArray);
        }),
        tap(v => console.log('pipe-end: ', v))
      );

您还可以使用 windowTime 在创建每个值后立即输出它,而不是等待所有 5 个值。

this.bufferedTradeObservable$ = this.tradeReceived
      .pipe(
        tap(v => console.log('pipe-start: ', v)),
        distinct((e: Trade) => e.tradeId),
        windowTime(10000, 10000, 5),
        mergeAll()
        tap(v => console.log('pipe-end: ', v))
      );

这些内容在 bufferTimewindowTime 分别。

bufferTime has a maxBufferSize argument that will do this for you.

this.bufferedTradeObservable$ = this.tradeReceived
      .pipe(
        tap(v => console.log('pipe-start: ', v)),
        distinct((e: Trade) => e.tradeId),
        bufferTime(10000, 10000, 5),
        concatMap((tradeArray) => {
            return from(tradeArray);
        }),
        tap(v => console.log('pipe-end: ', v))
      );

You could also use windowTime instead to output each value as soon as it's created, rather than waiting for all 5.

this.bufferedTradeObservable$ = this.tradeReceived
      .pipe(
        tap(v => console.log('pipe-start: ', v)),
        distinct((e: Trade) => e.tradeId),
        windowTime(10000, 10000, 5),
        mergeAll()
        tap(v => console.log('pipe-end: ', v))
      );

These are covered in the documentation for bufferTime and windowTime respectively.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文