在 Rxjs 中,为什么 mergeScan 操作符累加会失效?

发布于 2022-09-11 19:14:25 字数 1321 浏览 16 评论 0

我想用 promise 中返回的数据进行累加,于是应用了 rxjs 中的 mergeScan 操作符。但是在应用中发现,如果在返回的流中包含了 flatMap 过的 promise,则返回的值不会累加到累加器上面去:

const Rx = require('rxjs/Rx');
const {
  of ,
} = require('rxjs');

const click$ = new Rx.Subject();
const seed = 0;
const testPromise = new Promise((resolve) => {
  resolve(1);
})
const count$ = click$.mergeScan((acc, promise) => of (promise)
  .flatMap(promise => promise)
  .map((one) => {
    console.log('acc', acc);
    return acc + one
  }), seed);
count$.subscribe(x => console.log('value',x)); 

click$.next(testPromise);
click$.next(testPromise);
click$.next(testPromise);

输出如下,累加器的值没有增加:

acc, 0
value, 1
acc, 0
value, 1
acc, 0
value, 1

但是如果去除掉 promise

const Rx = require('rxjs/Rx');
const {
  of ,
} = require('rxjs');

const click$ = new Rx.Subject();
const seed = 0;
const count$ = click$.mergeScan((acc, one) => of (one)
  .map((one) => {
    console.log('acc', acc);
    return acc + one
  }), seed);
count$.subscribe(x => console.log('value', x));

click$.next(1);
click$.next(1);
click$.next(1);

输出是:

acc, 0
value, 1
acc, 1
value, 2
acc, 2
value, 3

可以正常的工作。
请问这是什么原因呢?我该如何修复这个问题?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

红尘作伴 2022-09-18 19:14:25

分析

mergeScan会在每次接受到新的数据时调用迭代函数,并订阅函数返回的流,之后会把返回的流的最后一个数据发送给下游.默认情况下是并发执行.

在你的第一个例子中,同步发射了三个Promise.
当第一个testPromisemergeScan接收时,mergeScanseedtestPromise传递给迭代函数的accpromise,并订阅迭代函数返回的流,因为流内部需要获取异步函数中返回的结果,所以这个流是个异步流.
当第二个testPromise被接收时,因为mergeScan是并发执行的,而第一个流是异步流,结果还没有返回,所以这个时候还是把seedtestPromise传递给迭代函数.
第三个testPromise也一样.

在第二个例子中,发射的是 1,这个时候迭代函数返回的是一个同步流,当迭代函数返回流的同时,这个流也同时结束,mergeScan能获取流的结果,所以当mergeScan接收第二个 1 的时候,已经获取到了返回流的最后一个数据 1,然后传递给迭代函数的acc,所以在第二个例子中能得到你想要的结果.

解决

知道原因之后,要让第一个例子也按照你想要的方式执行的话,就很简单了,只要让mergeScan接收到第二个testPromise的时候不是立即传递给迭代函数,而是等待第一个流返回结果之后,在把结果和testPromise传递给迭代函数,正好mergeScan的第三个参数concurrent就是控制同一时间内mergeScan的最大订阅数的,设置concurrent为 1 就能达到你想要的结果.

const count$ = click$.mergeScan(
  (acc, promise) =>
    of(promise)
      .flatMap(promise => promise)
      .map(one => {
        console.log('acc', acc)
        return acc + one
      }),
  seed,
  1, // 将并发设置为 1
)

参考

苍暮颜 2022-09-18 19:14:25

你是同步调用了三次next,导致并没有产生累加的过程。
可以添加observeOn(asyncScheduler),强制成异步。

const Rx = require('rxjs/Rx');
const {
  of ,
} = require('rxjs');

const click$ = new Rx.Subject();
const seed = 0;
const testPromise = new Promise((resolve) => {
  resolve(1);
})
const count$ = click$
  .observeOn(Rx.asyncScheduler)
  .mergeScan((acc, promise) => of (promise)
  .flatMap(promise => promise)
  .map((one) => {
    console.log('acc', acc);
    return acc + one
  }), seed);
count$.subscribe(x => console.log('value',x)); 

click$.next(testPromise);
click$.next(testPromise);
click$.next(testPromise);

clipboard.png

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文