由中间件思想引发出的 JavaScript 异步队列
队列对于任何语言来说都是重要的,io 的串行,请求的并行等等。在 JavaScript 中,又由于单线程的原因,异步编程又是非常重要的,所以,让我们来尝试一下实现 JS 中的异步队列,并发散到 co 与 generator,async/await。
异步队列
很多面试的时候会问一个问题,就是怎么让异步函数可以顺序执行。方法有很多,callback,promise,观察者,generator,async/await,这些 JS 中处理异步编程的,都可以做到这种串行的需求。但是很麻烦的是,处理起来是挺麻烦的,你要不停的手动在上一个任务调用下一个任务。比如 promise,像这样:
a.then(() => b.then(() => c.then(...)))
代码嵌套的问题,有点严重。所以要是有一个队列就好了,往队列里添加异步任务,执行的时候让队列开始 run 就好了。先制定一下 API,我们有一个 queue,队列都在内部维护,通过 queue.add 添加异步任务,queue.run 执行队列,可以先想想。
参照之前 express 中间件 的实现,给异步任务 async-fun 传入一个 next 方法,只有调用 next,队列才会继续往下走。那这个 next 就至关重要了,它会控制队列往后移一位,执行下一个 async-fun。我们需要一个队列,来保存 async-fun,也需要一个游标,来控制顺序。
以下是我的简单实现:
const queue = () => { const list = []; // 队列 let index = 0; // 游标 // next 方法 const next = () => { if (index >= list.length - 1) return; // 游标 + 1 const cur = list[++index]; cur(next); } // 添加任务 const add = (...fn) => { list.push(...fn); } // 执行 const run = (...args) => { const cur = list[index]; typeof cur === 'function' && cur(next); } // 返回一个对象 return { add, run, } } // 生成异步任务 const async = (x) => { return (next) => {// 传入 next 函数 setTimeout(() => { console.log(x); next(); // 异步任务完成调用 }, 1000); } } const q = queue(); const funs = '123456'.split('').map(x => async(x)); q.add(...funs); q.run();// 1, 2, 3, 4, 5, 6 隔一秒一个。
我这里没去构造一个 class,而是通过闭包的特性去处理的。queue 方法返回一个包含 add,run 的对象,add 即为像队列中添加异步方法,run 就是开始执行。在 queue 内部,我们定义了几个变量,list 用来保存队列,index 就是游标,表示队列现在走到哪个函数了,另外,最重要的是 next 方法,它是控制游标向后移动的。
run 函数一旦执行,队列即开始 run。一开始执行队列里的第一个 async 函数,我们把 next 函数传给了它,然后由 async 函数决定什么时候执行 next,即开始执行下一个任务。我们没有并不知道异步任务什么时候才算完成,只能通过打成某种共识,来告知 queue 某个任务完成。就是传给任务的 next 函数。其实 async 返回的这个函数,有一个名字,叫 Thunk,后面我们会简单介绍。
Thunk
thunk 其实是为了解决 “传名调用” 的。就是我传给函数 A 一个表达式作参数 x + 1,但是我不确定这个 x + 1 什么时候会用到,以及会不会用到,如果在传入就执行,这个求值是没有必要的。所以就出现了一个临时函数 Thunk,来保存这个表达式,传入函数 A 中,待需要时再调用。
const thunk = () => { return x + 1; }; const A = thunk => { return thunk() * 2; }
嗯... 其实就是一个回调函数...
暂停
其实只要某个任务,不继续调用 next,队列就已经不会继续往下走了。比如我们 async 任务里加一个判断(通常是异步 io,请求的容错处理):
// queue 函数不变, // async 加限制条件 const async = (x) => { return (next) => { setTimeout(() => { if(x > 3) { console.log(x); q.run(); //重试 return; } console.log(x); next(); }, 1000); } } const q = queue(); const funs = '123456'.split('').map(x => async(x)); q.add(...funs); q.run(); //打印结果: 1, 2, 3, 4, 4,4, 4,4 一直是 4
当执行到第四个任务的时候,x 是 4 的时候,不再继续,就可以直接 return,不再调用 next。也有可能是出现错误,我们需要再重试,那就再调用 q.run 就可以了,因为游标保存的就是当前的 async 任务的索引。
另外,还有一种方式,就是添加 stop 方法。虽然感觉上面的方法就 OK 了,但是 stop 的好处在于,你可以主动的停止队列,而不是在 async 任务里加限制条件。当然,有暂停就有继续了,两种方式,一个是 retry,就是重新执行上一次暂停的那个;另一个就是 goOn,不管上次最后一个如何,继续下一个。上代码:
const queue = () => { const list = []; let index = 0; let isStop = false; const next = () => { // 加限制 if (index >= list.length - 1 || isStop) return; const cur = list[++index]; cur(next); } const add = (...fn) => { list.push(...fn); } const run = (...args) => { const cur = list[index]; typeof cur === 'function' && cur(next); } const stop = () => { isStop = true; } const retry = () => { isStop = false; run(); } const goOn = () => { isStop = false; next(); } return { add, run, stop, retry, goOn, } } const async = (x) => { return (next) => { setTimeout(() => { console.log(x); next(); }, 1000); } } const q = queue(); const funs = '123456'.split('').map(x => async(x)); q.add(...funs); q.run(); setTimeout(() => { q.stop(); }, 3000) setTimeout(() => { q.goOn(); }, 5000)
其实还是加拦截... 只不过从 async 函数中,换到了 next 函数里面,利用 isStop 这个变量切换 true/false,开关暂停。我加了两个定时器,一个是 3 秒后暂停,一个是 5 秒后继续,(请忽略定时器的误差),按道理应该是队列到三秒的时候,也就是第三个任务执行完暂停,然后再隔 2 秒,继续。结果打印到 3 的时候,停住,两秒之后继续 4,5,6.
两种思路,请结合场景思考问题。
并发
上面的都是在做串行,假如 run 的时候我要并行呢... 也很简单,把队列一次性跑完就可以了。
// 为了代码短一些,把 retry,goOn 先去掉了。 const queue = () => { const list = []; let index = 0; let isStop = false; let isParallel = false; const next = () => { if (index >= list.length - 1 || isStop || isParallel) return; const cur = list[++index]; cur(next); } const add = (...fn) => { list.push(...fn); } const run = (...args) => { const cur = list[index]; typeof cur === 'function' && cur(next); } const parallelRun = () => { isParallel = true; for(const fn of list) { fn(next); } } const stop = () => { isStop = true; } return { add, run, stop, parallelRun, } } const async = (x) => { return (next) => { setTimeout(() => { console.log(x); next(); }, 1000); } } const q = queue(); const funs = '123456'.split('').map(x => async(x)); q.add(...funs); q.parallelRun(); // 一秒后全部输出 1, 2, 3, 4, 5, 6
我添加了一个 parallelRun 方法,用于并行,我觉得还是不要放到 run 函数里面了,抽象单元尽量细化还是。然后还加了一个 isParallel 的变量,默认是 false,考虑到 next 函数有可能会被调用,所以需要加一个拦截,保证不会出乱。
以上就是利用仅用 thunk 函数,结合 next 实现的异步队列控制器,queue,你可以把 es6 代码都改成 es5,保证兼容。此实现是足够简单的,不适用于复杂的场景
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论