thinkphp-queue 源码阅读笔记 之 深入理解
3.1 thinkphp-queue 中消息与队列的保存方式
Redis
在 Redis 中,每一个 队列 都三个key 与之对应 ,以 helloJobQueue 队列举例,其在redis 中的保存方式为:
key名 类型 说明 queues:helloJobQueue List , 列表 待执行的任务列表 queues:helloJobQueue:delayed Sorted Set,有序集合 延迟执行和定时执行的任务集合 queues:helloJobQueue:reserved Sorted Set,有序集合 执行中的任务集合 使用的
:
分隔符, 只是用来表示相关key的关联性。本身没有特殊含义。使用分隔符是一种常见的组织key的方式。其中,在
queues:helloJobQueue
列表中,每个元素的形式如下:
在 queues:helloJobQueue:delayed
和 queues:helloJobQueue:delayed
有序集合中,每个元素的形式如下:
可以看到,在有序集合中,每个元素代表一个任务,该元素的 Score 为该任务的入队时间戳,任务的 value 为json 格式,保存了任务的执行情况和业务数据。将value decode 为数组后形式如下:
[ 'job' => 'application\\index\\job\\Hello' , // jobHandlerClassName,消费者类的类名 'data' => [ // 生产者传入的业务数据 'time' => '2017-02-18 16:20:10', 'data' => 'I have 648 apples' ], 'id' => '77IasdasadIasdadadadKL8t', // 一个随机的32位字符串 'attempts' => 2 // 任务的已尝试次数 ]
redis驱动下,为了实现任务的延迟执行和过期重发,任务将在这三个key中来回转移,详情可见 3.5
Database
在 Database 中,每个任务对应到表中的一行,queue 字段用来区分不同的队列。
表的字段结构如下:
其中,payLoad 字段保存了消息的执行者和业务数据,payLoad 字段采用 json 格式的字符串来保存消息,将其 decode 为数组后形式如下:
[ 'job' => 'application\\index\\job\\Hello', // jobHandlerClassName,消费者类的类名 'data' => string|array|integer|object // 生产者传入的业务数据 ]
3.2 thinkphp-queue 的目录结构和类关系图
这些类构成了消息队列中的几个角色:
角色 | 类名 | 说明 |
---|---|---|
命令行 | Command + Worker | 负责解析命令行参数,控制队列的启动,重启 |
驱动 | Queue + Connector | 负责队列的创建,以及消息的入队,出队等操作 |
任务 | Job | 用于将消息转化为一个任务对象,供消费者使用 |
生产者 | 业务代码 | 负责消息的创建与发布 |
消费者 | 业务代码 | 负责任务的接收与执行 |
各个类之间的关系图如下:
3.3 Deamon 模式的执行流程
3.4 Database 模式下消息处理的详细流程
下图中,展示了database 模式下消息处理的详细流程,redis 驱动下大体类似
3.5 redis 驱动下的任务重发细节
在redis驱动下,为了实现任务的延迟执行和过期重发,任务将在这三个key中来回转移。
在3.4 Database模式下消息处理的消息流程中,我们知道,如果配置的expire 不是null ,那么 thinkphp-queue的work进程每次在获取下一个可执行任务之前,会先尝试重发所有过期的任务。而在redis驱动下,这个步骤则做了更多的事情,详情如下:
- 从
queue:xxx:delayed
的key中查询出有哪些任务在当前时刻已经可以开始执行,然后将这些任务转移到queue:xxx
的key的尾部。 - 从
queue:xxx:reserved
的key中查询出有哪些任务在当前时刻已经过期,然后将这些任务转移到queue:xxx
的key的尾部。 - 尝试从
queue:xxx
的key的头部取出一个任务,如果取出成功,那么,将这个任务转移到queue:xxx:reserved
的key 的头部,同时将这个任务实例化成任务对象,交给消费者去执行。
用图来表示这个步骤的具体过程如下:
redis队列中的过期任务重发步骤--执行前:
redis队列中的过期任务重发步骤--执行后:
3.6 thinkphp-queue的性能
测试环境 :
虚拟机 Ubuntu 16.04 , PHP 7.1 ,TP5,Redis 3.2 , 双核 I5 6400,3G 内存
测试方式 :
使用 Redis 驱动,在一个控制器中循环推送 40000 条消息到消息队列;
使用
php think queue:work --daemon
去消费这些消息,计算推送和消费各自所耗的时间。测试结果 :
在最简单的逻辑下,平均每秒中可推送8000个消息,平均每秒可消费200个消息。
注意:由于在测试时,Host 机本身的cpu和内存长期100%,并且虚拟机中的各项服务并未专门调优,因此该测试结果并不具备参考性。
3.7 thinkphp-queue 的注意事项
3.7.1 使用建议
- 任务完成后, 使用
$job->delete()
删除任务 - 在消费者类的
fire()
方法中,使用$job->attempt()
检查任务已执行次数,对于次数异常的,作相应的处理。 - 在消费者类的
fire()
方法中,根据业务数据来判断该任务是否已经执行过,以避免该任务被重复执行。 - 编写失败回调事件,将事件中失败的任务及时通知给开发人员。
- 任务完成后, 使用
3.7.2 使用了
queue:work --daemon
,但更新代码后没有使用queue:restart
重启 work 进程, 使得 work 进程中的代码与最新的代码不同,出现各种问题。3.7.3 使用了
queue:work --daemon
,但是消费者类的 fire() 方法中存在死循环,或sleep(n)
等逻辑,导致消息队列被堵塞;或者使用了exit()
,die()
这样的逻辑,导致work进程直接终止 。3.7.4 配置的 expire 为
null
,但并没有自行处理过期的任务,导致过期的任务得不到处理,且一直占用消息队列的空间。3.7.5 配置的 expire
不为null
,但配置的 expire 时间太短,以至于 expire 时间 < 消费者的fire()
方法所需时间 + 删除该任务所需的时间 ,那么任务将被误认为执行超时,从而被消息队列还原为待执行状态。3.7.6 queue:work 属于 cli 模式, 其入口文件是根目录下的 think 文件, 网页访问属于 sapi 模式,其入口是 public 目录下的 index.php 文件. 如果需要在入口文件中定义自定义常量, 记得去修改对应的入口文件. 具体可参考 chichoyi 的说明 #7
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论