rxjs 如何优雅的处理轮询任务?
需求:
- 假设异步请求返回有一个状态,值为 pending 或者 success
- 异步请求如果返回 pending,则等待一秒后重新发送这个异步请求,直到返回 success
- 最终使用 subscribe 只返回最终结果,中间过程的输出不走 subscribe
自己的实现
自己尝试实现了,但是结果不是自己想要的,而且如果多个异步轮询( A 轮询 -> B 轮询 -> C 轮询),可能就会使代码变得嵌套层级多,以下是实现的具体代码,有大佬指教指教么?
轮询代码
import { from, Observable, Subscriber } from 'rxjs';
import { delay, last, mapTo, repeatWhen } from 'rxjs/operators';
interface RetryOptions<T = any, P = any> {
try: (tryRequest: P) => Promise<T>;
tryRequest: P;
retryUntil: (response: T) => boolean;
maxTimes?: number;
tick?: number;
}
export const polling = <T = any, P = any>(options: RetryOptions<T, P>) => {
options = Object.assign(
{
maxTimes: 20,
tick: 1000
},
options
);
let result = null;
const notifier = () => {
// 计数最大尝试次数
let count = 0;
const loop = (producer: Subscriber<any>) => {
// 超过最大次数强制退出轮询
if (count >= options.maxTimes) {
producer.complete();
} else {
options
.try(options.tryRequest)
.then(res => {
producer.next(count++);
// 满足条件则退出轮询
if (options.retryUntil(res)) {
producer.complete();
} else {
// 不满足条件则继续轮询
loop(producer);
}
// 保存请求结果
result = res;
})
.catch(err => {
producer.error(err);
});
}
};
return new Observable(producer => {
loop(producer);
});
};
return from([0]).pipe(
delay(options.tick),
// 当满足条件是,进行一下轮轮询
repeatWhen(notifier),
// 转换结果
mapTo(() => result),
last()
);
};
测试用例
import { polling } from './polling';
let count = 0;
const mockRequest = (): Promise<string> => {
return new Promise((resolve, reject) => {
setTimeout(() => {
if (count < 6) {
resolve('pending');
} else {
resolve('finish');
}
count++;
}, 1000);
});
};
polling<string, number>({
try: mockRequest,
tryRequest: count,
retryUntil: res => {
return res === 'finish';
}
}).subscribe((response) => {
const result = response();
console.log('轮询结束: ', result);
// 这个轮询结束后应该怎么继续轮询比较好?
// 继续在这里 polling 下一个轮询吗?容易回调地狱啊
});
结果
轮询结束
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)