B.4 通信顺序进程
1978 年,C. A. R. Hoare 在一篇学术论文中(http://dl.acm.org/citation.cfm?doid=359576.359585 )首次描述了通信顺序进程(Communicating Sequential Processes,CSP),然后又在 1985 年的同名著作中讨论了这个概念(http://www.usingcsp.com/ )。CSP 描述了一种并发“进程”在运行过程中彼此交互(通信)的正式方法。
你可能已经想起了我们在第 1 章中讨论的并发“进程”,此处对 CSP 的探索将建立在对其的理解之上。
和计算机科学领域多数伟大的概念一样,CSP 也带有浓烈的学术形式体系色彩,以进程代数的形式表达。但我觉得符号代数原理对你来说不会有什么实际的意义,所以我们将要寻找其他方法来思考 CSP。
我把多数 CSP 的正式描述和证明交给霍尔的研究以及自他以后许多其他有趣的文章。而我将要做的就是以尽可能非学术化的易于直觉理解的方式简要介绍 CSP 的思路。
B.4.1 消息传递
CSP 的核心原则是独立的进程之间所有的通信和交互必须要通过正式的消息传递。可能与你预期的相反,CSP 消息传递是用同步动作来描述的,其中发送进程和接收进程都需要准备好消息才能传递。
这样的同步消息机制怎么可能与 JavaScript 的异步编程联系在一起呢?
关系的具体化来自于使用 ES6 生成器创建看似同步但底层可能是同步或(更可能的)异步动作的方法的特性。
换句话说,两个或更多并发运行的生成器可以彼此之间用看似同步的形式进行消息传递,同时保持系统的异步本性,因为每个生成器的代码都被暂停(阻塞)了,等待一个异步动作来恢复。
这是如何工作的呢?
设想一个名为 A 的生成器(“进程”),想要发送一个消息给生成器 B。首先 A yield 要发给 B 的这个消息(因此暂停了 A),等 B 就绪并拿到这个消息时,A 就会被恢复(解除阻塞)。
对称地,设想 A 要接收一个来自 B 的消息。A yield 它对来自于 B 的这个消息的请求(因此暂停 A)。而一旦 B 发送了一个消息,A 就拿到消息并恢复执行。
这种 CSP 消息传递的一个更流行的实现来自 ClojureScript 的 core.async 库,还有 go 语言。这些 CSP 实现通过开放在进程间的称为通道 (channel)的管道实现了前面描述的通信语义。
使用术语通道 的部分原因是,在一些模式中可以一次发送多个值到通道的缓冲区,这可能类似于你对流的认识。这里我们并不深入探讨,但要了解,对于管理数据流来说,它可以是非常强大的技术。
在最简单的 CSP 概念中,我们在 A 和 B 之间创建的通道会有一个名为 take(..) 的方法用于阻塞接收一个值,还有一个名为 put(..) 的方法用于阻塞发送一个值。
这看起来可能类似于:
var ch = channel(); function *foo() { var msg = yield take( ch ); console.log( msg ); } function *bar() { yield put( ch, "Hello World" ); console.log( "message sent" ); } run( foo ); run( bar ); // Hello World // "message sent"
比较这个结构化的、( 看似)同步的消息传递交互和 ASQ#runner(..) 通过数组 token.messages 及合作式 yield 提供的非正式非结构化的消息共享机制。本质上,yield put(..) 是一个既发送了值也暂停了执行来传递控制的单个操作,而在前面我们给出的例子中这两者是分开的步骤。
另外,CSP 强调你并不真正显式地传递控制,而是设计并发例程来阻塞等待来自于通道的值或阻塞等待试图发送值到这个通道。协调顺序和协程之间行为的方式就是通过接收和发送消息的阻塞。
合理警告:这个模式非常强大,但是一开始用起来也有些费脑筋。需要进行一些实践才能习惯这种新的协调并发的思考模式。
有几个很好的库已经用 JavaScript 实现了 CSP,其中最著名的是 js-csp(https://github.com/ubolonton/js-csp ),由 James Long(http://twitter.com/jlongster )实现(http://github.com/jlongster/js-csp )并扩展(http://jlongster.com/Taming-the-Asynchronous-Beast-with-CSP-in-JavaScript )。此外,David Nolen(http://twitter.com/swannodette )的关于把 ClojureScript 的 go 风格 core.async CSP 移植到 JS 生成器风格的众多文章也非常重要(http://swannodette.github.io/2013/08/24/es6-generators-and-csp/ )。
B.4.2 asynquence CSP 模拟
由于我们这里一直讨论的异步模式都是在我的 asynquence 库的大背景下进行的,因此你可能有兴趣看到我们可以相当轻松地在 ASQ#runner(..) 生成器处理上添加一个模拟层,作为 CSP API 和特性的近乎完美的移植。这个模拟层作为 asynquence-contrib 包的一个可选部分与 asynquence 一起发布。
与前面的辅助函数 state(..) 非常相 似,ASQ.csp.go(..) 接受一个生成器 ——在 go/core.async 术语中,它被称为 goroutine——并通过返回一个新的生成器将其适配为可与 ASQ#runner(..) 合作。
goroutine 接收一个最初创建好的通道(ch ),而不是被传入一个 token ,一次运行中的所有 goroutien 都会共享这个通道。你可以通过 ASQ.csp.chan(..) 创建更多的通道(这常常会极其有用!)。
在 CSP 中,我们把所有的异步都用通道消息上的阻塞来建模,而不是阻塞等待 Promise/ 序列 /thunk 完成。
因此,不是把从 request(..) 返回的 Promise yield 出来,而是 request(..) 应该返回一个通道,从中你可以 take(..) (拿到)值。换句话说,这种环境和用法下单值通道大致等价于 Promise 或序列。
我们先来构造一个支持通道的 request(..) 版本:
function request(url) { var ch = ASQ.csp.channel(); ajax( url ).then( function(content){ // putAsync(..)的put(..)的一个变异版本,这个版本 // 可以在生成器之外使用。返回一个运算完毕promise。 // 这里我们没有使用这个promise,但是如果当值被 // take(..)之后我们需要得到通知的话,可以使用这个promise。 ASQ.csp.putAsync( ch, content ); } ); return ch; }
由第 3 章可知,promisory 是生产 Promise 的工具;第 4 章里的 thunkory 是生产 thunk 的工具;以及最后在附录 A 中,我们发明了 sequory 来表示生产序列的工具。
很自然地,我们要再次构造一个类似的术语以表示生产通道的工具。我们就称之为 chanory(channel+factory) 吧。作为留给你的练 习,请试着定义一个类似于 Promise.wrap(..)/promisify(..) (第 3 章)、thunkify(..) (第 4 章)和 ASQ.wrap(..) (附录 A)的 channelify(..) 工具。
现在考虑使用 asynquence 风格的 CSP 实现的并发 Ajax 的例子:
ASQ() .runner( ASQ.csp.go( function*(ch){ yield ASQ.csp.put( ch, "http://some.url.2" ); var url1 = yield ASQ.csp.take( ch ); // "http://some.url.1" var res1 = yield ASQ.csp.take( request( url1 ) ); yield ASQ.csp.put( ch, res1 ); } ), ASQ.csp.go( function*(ch){ var url2 = yield ASQ.csp.take( ch ); // "http://some.url.2" yield ASQ.csp.put( ch, "http://some.url.1" ); var res2 = yield ASQ.csp.take( request( url2 ) ); var res1 = yield ASQ.csp.take( ch ); // 把结果传递到下一个序列步骤 ch.buffer_size = 2; ASQ.csp.put( ch, res1 ); ASQ.csp.put( ch, res2 ); } ) ) .val( function(res1,res2){ // res1来自"http://some.url.1" // res2来自"http://some.url.2" } );
在两个 goroutine 之间交换 URL 字符串的消息传递过程是非常直接的。第一个 goroutine 构造一个到第一个 URL 的 Ajax 请求,响应放到通道 ch 中。第二个 goroutine 构造一个到第二个 URL 的 Ajax 请求,然后从通道 ch 拿到第一个响应 res1 。这时,两个响应 res1 和 res2 便都已经完成就绪了。
如果在 goroutine 运行结束时,通道 ch 中还有任何剩下的值,那它们就会被传递到序列的下一个步骤。所以,要从最后的 goroutine 传出消息,可以通过 put(..) 将其放入 ch 中。如上所示,为了避免这些最后的 put(..) 阻塞,我们通过将 ch 的 buffer_size 设置为 2 (默认:0 )而将 ch 切换为缓冲模式。
在这个参考地址(https://gist.github.com/getify/e0d04f1f5aa24b1947ae )可以看到更多使用 asynquence 风格的 CSP 的示例。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论