前言
第一部分: 半协程调度器
- 统一生成器接口
- 生成器迭代
- 生成器返回值
- 生成器委托
- 改写 return
- 抽象异步模型
- 引入异常处理
- 异常: 嵌套任务透传
- 异常: 传递流程
- 异常: 重新进行 CPS 变换
- 异常: 重新加入 Async
- Syscall 与 Context
- 调度器: 里程碑
- spawn
- callcc
- race 与 timeout
- all 与 parallel
- channel 与协程间通信
- 无缓存 channel
- 缓存 channel
- channel 演示
- FutureTask 与 fork
第二部分: Koa
- 穿越地心之旅
- 洋葱圈模型
- rightReduce与中间件compose
- Koa::Application
- Koa::Context
- Koa::Request
- Koa::Response
- Koa - HelloWorld
- Middleware Interface
- Middleware: 全局异常处理
- Middleware: Router
- Middleware: 请求超时
- 一个综合示例
附录
参考
文章来源于网络收集而来,版权归原创者所有,如有侵权请及时联系!
缓存 channel
缓存channel
接下来我们来实现带缓存的Channel:
Sends to a buffered channel block only when the buffer is full.
Receives block when the buffer is empty.
<?php
class BufferChannel
{
// 缓存容量
public $cap;
// 缓存
public $queue;
// 同无缓存Channel
public $recvCc;
// 同无缓存Channel
public $sendCc;
public function __construct($cap)
{
assert($cap > 0);
$this->cap = $cap;
$this->queue = new \SplQueue();
$this->sendCc = new \SplQueue();
$this->recvCc = new \SplQueue();
}
public function recv()
{
return callcc(function($cc) {
if ($this->queue->isEmpty()) {
// 当无数据可接收时,$cc入列,让出控制流,挂起接收者协程
$this->recvCc->enqueue($cc);
} else {
// 当有数据可接收时,先接收数据,然后恢复控制流
$val = $this->queue->dequeue();
$this->cap++;
$cc($val, null);
}
// 递归唤醒其他被阻塞的发送者与接收者收发数据,注意顺序
$this->recvPingPong();
});
}
public function send($val)
{
return callcc(function($cc) use($val) {
if ($this->cap > 0) {
// 当缓存未满,发送数据直接加入缓存,然后恢复控制流
$this->queue->enqueue($val);
$this->cap--;
$cc(null, null);
} else {
// 当缓存满,发送者控制流与发送数据入列,让出控制流,挂起发送者协程
$this->sendCc->enqueue([$cc, $val]);
}
// 递归唤醒其他被阻塞的接收者与发送者收发数据,注意顺序
$this->sendPingPong();
// 如果全部代码都为同步,防止多个发送者时,数据全部来自某个发送者
// 应该把sendPingPong 修改为异步执行 defer([$this, "sendPingPong"]);
// 但是swoole本身的defer实现有bug,除非把defer 实现为swoole_timer_after(1, ...)
// recvPingPong 同理
});
}
public function recvPingPong()
{
// 当有阻塞的发送者,唤醒其发送数据
if (!$this->sendCc->isEmpty() && $this->cap > 0) {
list($sendCc, $val) = $this->sendCc->dequeue();
$this->queue->enqueue($val);
$this->cap--;
$sendCc(null, null);
// 当有阻塞的接收者,唤醒其接收数据
if (!$this->recvCc->isEmpty() && !$this->queue->isEmpty()) {
$recvCc = $this->recvCc->dequeue();
$val = $this->queue->dequeue();
$this->cap++;
$recvCc($val);
$this->recvPingPong();
}
}
}
public function sendPingPong()
{
// 当有阻塞的接收者,唤醒其接收数据
if (!$this->recvCc->isEmpty() && !$this->queue->isEmpty()) {
$recvCc = $this->recvCc->dequeue();
$val = $this->queue->dequeue();
$this->cap++;
$recvCc($val);
// 当有阻塞的发送者,唤醒其发送数据
if (!$this->sendCc->isEmpty() && $this->cap > 0) {
list($sendCc, $val) = $this->sendCc->dequeue();
$this->queue->enqueue($val);
$this->cap--;
$sendCc(null, null);
$this->sendPingPong();
}
}
}
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论