rxjs 如何优雅的处理轮询任务?

发布于 2022-09-11 21:56:55 字数 2517 浏览 13 评论 0

需求:

  • 假设异步请求返回有一个状态,值为 pending 或者 success
  • 异步请求如果返回 pending,则等待一秒后重新发送这个异步请求,直到返回 success
  • 最终使用 subscribe 只返回最终结果,中间过程的输出不走 subscribe

自己的实现

自己尝试实现了,但是结果不是自己想要的,而且如果多个异步轮询( A 轮询 -> B 轮询 -> C 轮询),可能就会使代码变得嵌套层级多,以下是实现的具体代码,有大佬指教指教么?

轮询代码

import { from, Observable, Subscriber } from 'rxjs';
import { delay, last, mapTo, repeatWhen } from 'rxjs/operators';

interface RetryOptions<T = any, P = any> {
  try: (tryRequest: P) => Promise<T>;
  tryRequest: P;
  retryUntil: (response: T) => boolean;
  maxTimes?: number;
  tick?: number;
}

export const polling = <T = any, P = any>(options: RetryOptions<T, P>) => {
  options = Object.assign(
    {
      maxTimes: 20,
      tick: 1000
    },
    options
  );
  let result = null;

  const notifier = () => {
    // 计数最大尝试次数
    let count = 0;
    const loop = (producer: Subscriber<any>) => {
      // 超过最大次数强制退出轮询
      if (count >= options.maxTimes) {
        producer.complete();
      } else {
        options
          .try(options.tryRequest)
          .then(res => {
            producer.next(count++);
            // 满足条件则退出轮询
            if (options.retryUntil(res)) {
              producer.complete();
            } else {
            // 不满足条件则继续轮询
              loop(producer);
            }
            // 保存请求结果
            result = res;
          })
          .catch(err => {
            producer.error(err);
          });
      }
    };
    return new Observable(producer => {
      loop(producer);
    });
  };

  return from([0]).pipe(
    delay(options.tick),
    // 当满足条件是,进行一下轮轮询
    repeatWhen(notifier),
    // 转换结果
    mapTo(() => result),
    last()
  );
};

测试用例

import { polling } from './polling';

let count = 0;

const mockRequest = (): Promise<string> => {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      if (count < 6) {
        resolve('pending');
      } else {
        resolve('finish');
      }
      count++;
    }, 1000);
  });
};

polling<string, number>({
  try: mockRequest,
  tryRequest: count,
  retryUntil: res => {
    return res === 'finish';
  }
}).subscribe((response) => {
  const result = response();
  console.log('轮询结束: ', result);
  // 这个轮询结束后应该怎么继续轮询比较好?
  // 继续在这里 polling 下一个轮询吗?容易回调地狱啊
});

结果

轮询结束

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

末骤雨初歇 2022-09-18 21:56:55
import { from, Observable, Subscriber } from 'rxjs';
import { delay, expand, filter, take } from 'rxjs/operators';

interface RetryOptions<T = any, P = any> {
  try: (tryRequest: P) => Promise<T>;
  tryRequest: P;
  retryUntil: (response: T) => boolean;
  maxTimes?: number;
  tick?: number;
}

export const polling = <T = any, P = any>(options: RetryOptions<T, P>) => {
  options = Object.assign(
    {
      maxTimes: 20,
      tick: 1000
    },
    options
  );
  let count = 0;

  const request$ = new Observable((subscriber: Subscriber<T>) => {
    if (count >= options.maxTimes) {
      subscriber.error(new Error('超过最大轮询次数'));
    } else {
      options
        .try(options.tryRequest)
        .then(res => {
          subscriber.next(res);
          count++;
        })
        .catch(err => {
          subscriber.error(err);
        });
    }
  });

  return from(request$).pipe(
    expand(() => request$.pipe(delay(1000))),
    filter(res => {
      return options.retryUntil(res);
    }),
    take(1)
  );
};
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文