在 Rxjs 中,为什么 mergeScan 操作符累加会失效?
我想用 promise 中返回的数据进行累加,于是应用了 rxjs 中的 mergeScan
操作符。但是在应用中发现,如果在返回的流中包含了 flatMap
过的 promise
,则返回的值不会累加到累加器上面去:
const Rx = require('rxjs/Rx');
const {
of ,
} = require('rxjs');
const click$ = new Rx.Subject();
const seed = 0;
const testPromise = new Promise((resolve) => {
resolve(1);
})
const count$ = click$.mergeScan((acc, promise) => of (promise)
.flatMap(promise => promise)
.map((one) => {
console.log('acc', acc);
return acc + one
}), seed);
count$.subscribe(x => console.log('value',x));
click$.next(testPromise);
click$.next(testPromise);
click$.next(testPromise);
输出如下,累加器的值没有增加:
acc, 0
value, 1
acc, 0
value, 1
acc, 0
value, 1
但是如果去除掉 promise
:
const Rx = require('rxjs/Rx');
const {
of ,
} = require('rxjs');
const click$ = new Rx.Subject();
const seed = 0;
const count$ = click$.mergeScan((acc, one) => of (one)
.map((one) => {
console.log('acc', acc);
return acc + one
}), seed);
count$.subscribe(x => console.log('value', x));
click$.next(1);
click$.next(1);
click$.next(1);
输出是:
acc, 0
value, 1
acc, 1
value, 2
acc, 2
value, 3
可以正常的工作。
请问这是什么原因呢?我该如何修复这个问题?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
分析
mergeScan
会在每次接受到新的数据时调用迭代函数,并订阅函数返回的流,之后会把返回的流的最后一个数据发送给下游.默认情况下是并发执行.在你的第一个例子中,同步发射了三个
Promise
.当第一个
testPromise
被mergeScan
接收时,mergeScan
把seed
和testPromise
传递给迭代函数的acc
和promise
,并订阅迭代函数返回的流,因为流内部需要获取异步函数中返回的结果,所以这个流是个异步流.当第二个
testPromise
被接收时,因为mergeScan
是并发执行的,而第一个流是异步流,结果还没有返回,所以这个时候还是把seed
和testPromise
传递给迭代函数.第三个
testPromise
也一样.在第二个例子中,发射的是 1,这个时候迭代函数返回的是一个同步流,当迭代函数返回流的同时,这个流也同时结束,
mergeScan
能获取流的结果,所以当mergeScan
接收第二个 1 的时候,已经获取到了返回流的最后一个数据 1,然后传递给迭代函数的acc
,所以在第二个例子中能得到你想要的结果.解决
知道原因之后,要让第一个例子也按照你想要的方式执行的话,就很简单了,只要让
mergeScan
接收到第二个testPromise
的时候不是立即传递给迭代函数,而是等待第一个流返回结果之后,在把结果和testPromise
传递给迭代函数,正好mergeScan
的第三个参数concurrent
就是控制同一时间内mergeScan
的最大订阅数的,设置concurrent
为 1 就能达到你想要的结果.参考
你是同步调用了三次
next
,导致并没有产生累加的过程。可以添加
observeOn(asyncScheduler)
,强制成异步。