由中间件思想引发出的 JavaScript 异步队列

发布于 2022-06-23 20:35:58 字数 5636 浏览 1184 评论 0

队列对于任何语言来说都是重要的,io 的串行,请求的并行等等。在 JavaScript 中,又由于单线程的原因,异步编程又是非常重要的,所以,让我们来尝试一下实现 JS 中的异步队列,并发散到 co 与 generator,async/await。

异步队列

很多面试的时候会问一个问题,就是怎么让异步函数可以顺序执行。方法有很多,callback,promise,观察者,generator,async/await,这些 JS 中处理异步编程的,都可以做到这种串行的需求。但是很麻烦的是,处理起来是挺麻烦的,你要不停的手动在上一个任务调用下一个任务。比如 promise,像这样:

a.then(() => b.then(() => c.then(...)))

代码嵌套的问题,有点严重。所以要是有一个队列就好了,往队列里添加异步任务,执行的时候让队列开始 run 就好了。先制定一下 API,我们有一个 queue,队列都在内部维护,通过 queue.add 添加异步任务,queue.run 执行队列,可以先想想。

参照之前 express 中间件 的实现,给异步任务 async-fun 传入一个 next 方法,只有调用 next,队列才会继续往下走。那这个 next 就至关重要了,它会控制队列往后移一位,执行下一个 async-fun。我们需要一个队列,来保存 async-fun,也需要一个游标,来控制顺序。

以下是我的简单实现:

const queue = () => {
  const list = []; // 队列
  let index = 0;  // 游标

  // next 方法
  const next = () => {
    if (index >= list.length - 1) return;    
    
    // 游标 + 1
    const cur = list[++index];
    cur(next);
  }
  
  // 添加任务
  const add = (...fn) => {
    list.push(...fn);
  }

  // 执行
  const run = (...args) => {
    const cur = list[index];
    typeof cur === 'function' && cur(next);
  }

  // 返回一个对象
  return {
    add,
    run,
  }
}

// 生成异步任务
const async = (x) => {
  return (next) => {// 传入 next 函数
    setTimeout(() => {
      console.log(x);
      next();  // 异步任务完成调用
    }, 1000);
  }
}

const q = queue();
const funs = '123456'.split('').map(x => async(x));
q.add(...funs);
q.run();// 1, 2, 3, 4, 5, 6 隔一秒一个。

我这里没去构造一个 class,而是通过闭包的特性去处理的。queue 方法返回一个包含 add,run 的对象,add 即为像队列中添加异步方法,run 就是开始执行。在 queue 内部,我们定义了几个变量,list 用来保存队列,index 就是游标,表示队列现在走到哪个函数了,另外,最重要的是 next 方法,它是控制游标向后移动的。

run 函数一旦执行,队列即开始 run。一开始执行队列里的第一个 async 函数,我们把 next 函数传给了它,然后由 async 函数决定什么时候执行 next,即开始执行下一个任务。我们没有并不知道异步任务什么时候才算完成,只能通过打成某种共识,来告知 queue 某个任务完成。就是传给任务的 next 函数。其实 async 返回的这个函数,有一个名字,叫 Thunk,后面我们会简单介绍。

Thunk

thunk 其实是为了解决 “传名调用” 的。就是我传给函数 A 一个表达式作参数 x + 1,但是我不确定这个 x + 1 什么时候会用到,以及会不会用到,如果在传入就执行,这个求值是没有必要的。所以就出现了一个临时函数 Thunk,来保存这个表达式,传入函数 A 中,待需要时再调用。

const thunk = () => {
  return x + 1;
};

const A = thunk => {
  return thunk() * 2;
}

嗯... 其实就是一个回调函数...

暂停

其实只要某个任务,不继续调用 next,队列就已经不会继续往下走了。比如我们 async 任务里加一个判断(通常是异步 io,请求的容错处理):

// queue 函数不变,
// async 加限制条件
const async = (x) => {
  return (next) => {
    setTimeout(() => {
      if(x > 3) {
        console.log(x);
        q.run();  //重试
        return;
      }
      console.log(x);
      next();
    }, 1000);
  }
}

const q = queue();
const funs = '123456'.split('').map(x => async(x));
q.add(...funs);
q.run();
//打印结果: 1, 2, 3, 4, 4,4, 4,4 一直是 4

当执行到第四个任务的时候,x 是 4 的时候,不再继续,就可以直接 return,不再调用 next。也有可能是出现错误,我们需要再重试,那就再调用 q.run 就可以了,因为游标保存的就是当前的 async 任务的索引。

另外,还有一种方式,就是添加 stop 方法。虽然感觉上面的方法就 OK 了,但是 stop 的好处在于,你可以主动的停止队列,而不是在 async 任务里加限制条件。当然,有暂停就有继续了,两种方式,一个是 retry,就是重新执行上一次暂停的那个;另一个就是 goOn,不管上次最后一个如何,继续下一个。上代码:

const queue = () => {
  const list = [];
  let index = 0;
  let isStop = false;

  const next = () => {
    // 加限制
    if (index >= list.length - 1 || isStop) return;    
    const cur = list[++index];
    cur(next);
  }

  const add = (...fn) => {
    list.push(...fn);
  }

  const run = (...args) => {
    const cur = list[index];
    typeof cur === 'function' && cur(next);
  }

  const stop = () => {
    isStop = true;
  }

  const retry = () => {
    isStop = false;
    run();
  }

  const goOn = () => {
    isStop = false;
    next();
  }

  return {
    add,
    run,
    stop,
    retry,
    goOn,
  }
}

const async = (x) => {
  return (next) => {
    setTimeout(() => {
      console.log(x);
      next();
    }, 1000);
  }
}

const q = queue();
const funs = '123456'.split('').map(x => async(x));
q.add(...funs);
q.run();

setTimeout(() => {
  q.stop();
}, 3000)


setTimeout(() => {
  q.goOn();
}, 5000)

其实还是加拦截... 只不过从 async 函数中,换到了 next 函数里面,利用 isStop 这个变量切换 true/false,开关暂停。我加了两个定时器,一个是 3 秒后暂停,一个是 5 秒后继续,(请忽略定时器的误差),按道理应该是队列到三秒的时候,也就是第三个任务执行完暂停,然后再隔 2 秒,继续。结果打印到 3 的时候,停住,两秒之后继续 4,5,6.

两种思路,请结合场景思考问题。

并发

上面的都是在做串行,假如 run 的时候我要并行呢... 也很简单,把队列一次性跑完就可以了。

// 为了代码短一些,把 retry,goOn 先去掉了。

const queue = () => {
  const list = [];
  let index = 0;
  let isStop = false;
  let isParallel = false;

  const next = () => {
    if (index >= list.length - 1 || isStop || isParallel) return;    
    const cur = list[++index];
    cur(next);
  }

  const add = (...fn) => {
    list.push(...fn);
  }

  const run = (...args) => {
    const cur = list[index];
    typeof cur === 'function' && cur(next);
  }

  const parallelRun = () => {
    isParallel = true;
    for(const fn of list) {
      fn(next);
    }
  }

  const stop = () => {
    isStop = true;
  }
  
  return {
    add,
    run,
    stop,
    parallelRun,
  }
}

const async = (x) => {
  return (next) => {
    setTimeout(() => {
      console.log(x);
      next();
    }, 1000);
  }
}

const q = queue();
const funs = '123456'.split('').map(x => async(x));
q.add(...funs);
q.parallelRun();
// 一秒后全部输出 1, 2, 3, 4, 5, 6

我添加了一个 parallelRun 方法,用于并行,我觉得还是不要放到 run 函数里面了,抽象单元尽量细化还是。然后还加了一个 isParallel 的变量,默认是 false,考虑到 next 函数有可能会被调用,所以需要加一个拦截,保证不会出乱。

以上就是利用仅用 thunk 函数,结合 next 实现的异步队列控制器,queue,你可以把 es6 代码都改成 es5,保证兼容。此实现是足够简单的,不适用于复杂的场景

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据

关于作者

0 文章
0 评论
23 人气
更多

推荐作者

醉城メ夜风

文章 0 评论 0

远昼

文章 0 评论 0

平生欢

文章 0 评论 0

微凉

文章 0 评论 0

Honwey

文章 0 评论 0

qq_ikhFfg

文章 0 评论 0

    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文