thinkphp-queue 源码阅读笔记 之 深入理解

发布于 2022-12-14 12:47:20 字数 6132 浏览 193 评论 0

3.1 thinkphp-queue 中消息与队列的保存方式

  • Redis

    在 Redis 中,每一个 队列 都三个key 与之对应 ,以 helloJobQueue 队列举例,其在redis 中的保存方式为:

    key名类型说明
    queues:helloJobQueueList , 列表待执行的任务列表
    queues:helloJobQueue:delayedSorted Set,有序集合延迟执行和定时执行的任务集合
    queues:helloJobQueue:reservedSorted Set,有序集合执行中的任务集合

    使用的:分隔符, 只是用来表示相关key的关联性。本身没有特殊含义。使用分隔符是一种常见的组织key的方式。

    其中,在queues:helloJobQueue 列表中,每个元素的形式如下:

queues:helloJobQueue:delayedqueues:helloJobQueue:delayed 有序集合中,每个元素的形式如下:

  • 可以看到,在有序集合中,每个元素代表一个任务,该元素的 Score 为该任务的入队时间戳,任务的 value 为json 格式,保存了任务的执行情况和业务数据。将value decode 为数组后形式如下:

    [
      'job'  => 'application\\index\\job\\Hello' ,  // jobHandlerClassName,消费者类的类名 
      'data' => [					  // 生产者传入的业务数据
         'time' => '2017-02-18 16:20:10',
         'data' => 'I have 648 apples'
      ],
      'id'   => '77IasdasadIasdadadadKL8t',	// 一个随机的32位字符串
      'attempts' => 2				// 任务的已尝试次数
    ]

    redis驱动下,为了实现任务的延迟执行和过期重发,任务将在这三个key中来回转移,详情可见 3.5

  • Database

    在 Database 中,每个任务对应到表中的一行,queue 字段用来区分不同的队列。

    表的字段结构如下:

  • 其中,payLoad 字段保存了消息的执行者和业务数据,payLoad 字段采用 json 格式的字符串来保存消息,将其 decode 为数组后形式如下:

    [
     'job'   => 'application\\index\\job\\Hello', // jobHandlerClassName,消费者类的类名 
     'data'  => string|array|integer|object       // 生产者传入的业务数据
    ]

3.2 thinkphp-queue 的目录结构和类关系图

这些类构成了消息队列中的几个角色:

角色类名说明
命令行Command + Worker负责解析命令行参数,控制队列的启动,重启
驱动Queue + Connector负责队列的创建,以及消息的入队,出队等操作
任务Job用于将消息转化为一个任务对象,供消费者使用
生产者业务代码负责消息的创建与发布
消费者业务代码负责任务的接收与执行

各个类之间的关系图如下:

3.3 Deamon 模式的执行流程

3.4 Database 模式下消息处理的详细流程

下图中,展示了database 模式下消息处理的详细流程,redis 驱动下大体类似

3.5 redis 驱动下的任务重发细节

在redis驱动下,为了实现任务的延迟执行和过期重发,任务将在这三个key中来回转移。

在3.4 Database模式下消息处理的消息流程中,我们知道,如果配置的expire 不是null ,那么 thinkphp-queue的work进程每次在获取下一个可执行任务之前,会先尝试重发所有过期的任务。而在redis驱动下,这个步骤则做了更多的事情,详情如下:

  1. queue:xxx:delayed 的key中查询出有哪些任务在当前时刻已经可以开始执行,然后将这些任务转移到 queue:xxx 的key的尾部。
  2. queue:xxx:reserved 的key中查询出有哪些任务在当前时刻已经过期,然后将这些任务转移到 queue:xxx的key的尾部。
  3. 尝试从 queue:xxx 的key的头部取出一个任务,如果取出成功,那么,将这个任务转移到 queue:xxx:reserved 的key 的头部,同时将这个任务实例化成任务对象,交给消费者去执行。

用图来表示这个步骤的具体过程如下:

redis队列中的过期任务重发步骤--执行前:

redis队列中的过期任务重发步骤--执行后:

3.6 thinkphp-queue的性能

  • 测试环境 :

    虚拟机 Ubuntu 16.04 , PHP 7.1 ,TP5,Redis 3.2 , 双核 I5 6400,3G 内存

  • 测试方式 :

    使用 Redis 驱动,在一个控制器中循环推送 40000 条消息到消息队列;

    使用php think queue:work --daemon去消费这些消息,计算推送和消费各自所耗的时间。

  • 测试结果 :

    在最简单的逻辑下,平均每秒中可推送8000个消息,平均每秒可消费200个消息。

注意:由于在测试时,Host 机本身的cpu和内存长期100%,并且虚拟机中的各项服务并未专门调优,因此该测试结果并不具备参考性

3.7 thinkphp-queue 的注意事项

  • 3.7.1 使用建议

    • 任务完成后, 使用 $job->delete() 删除任务
    • 在消费者类的 fire() 方法中,使用 $job->attempt() 检查任务已执行次数,对于次数异常的,作相应的处理。
    • 在消费者类的 fire() 方法中,根据业务数据来判断该任务是否已经执行过,以避免该任务被重复执行。
    • 编写失败回调事件,将事件中失败的任务及时通知给开发人员。
  • 3.7.2 使用了 queue:work --daemon ,但更新代码后没有使用 queue:restart 重启 work 进程, 使得 work 进程中的代码与最新的代码不同,出现各种问题。

  • 3.7.3 使用了 queue:work --daemon ,但是消费者类的 fire() 方法中存在死循环,或 sleep(n) 等逻辑,导致消息队列被堵塞;或者使用了 exit() , die() 这样的逻辑,导致work进程直接终止 。

  • 3.7.4 配置的 expire 为null ,但并没有自行处理过期的任务,导致过期的任务得不到处理,且一直占用消息队列的空间。

  • 3.7.5 配置的 expire 不为null ,但配置的 expire 时间太短,以至于 expire 时间 < 消费者的 fire() 方法所需时间 + 删除该任务所需的时间 ,那么任务将被误认为执行超时,从而被消息队列还原为待执行状态。

  • 3.7.6 queue:work 属于 cli 模式, 其入口文件是根目录下的 think 文件, 网页访问属于 sapi 模式,其入口是 public 目录下的 index.php 文件. 如果需要在入口文件中定义自定义常量, 记得去修改对应的入口文件. 具体可参考 chichoyi 的说明 #7

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据

关于作者

冬天的雪花

暂无简介

文章
评论
26 人气
更多

推荐作者

櫻之舞

文章 0 评论 0

弥枳

文章 0 评论 0

m2429

文章 0 评论 0

野却迷人

文章 0 评论 0

我怀念的。

文章 0 评论 0

    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文