返回介绍

Redis 异步队列

发布于 2021-04-03 03:37:35 字数 26092 浏览 1739 评论 0 收藏 0

异步队列区别于 RabbitMQ Kafka 等消息队列,它只提供一种 异步处理异步延时处理 的能力,并 不能 严格地保证消息的持久化和 不支持 完备的 ACK 应答机制。

安装

composer require hyperf/async-queue

配置

配置文件位于 config/autoload/async_queue.php,如文件不存在可自行创建。

暂时只支持 Redis Driver 驱动。

配置类型默认值备注
driverstringHyperf\AsyncQueue\Driver\RedisDriver::class
channelstringqueue队列前缀
redis.poolstringdefaultredis 连接池
timeoutint2pop 消息的超时时间
retry_secondsint,array5失败后重新尝试间隔
handle_timeoutint10消息处理超时时间
processesint1消费进程数
concurrent.limitint1同时处理消息数
max_messagesint0进程重启所需最大处理的消息数 默认不重启
<?php

return [
    'default' => [
        'driver' => Hyperf\AsyncQueue\Driver\RedisDriver::class,
        'redis' => [
            'pool' => 'default'
        ],
        'channel' => 'queue',
        'timeout' => 2,
        'retry_seconds' => 5,
        'handle_timeout' => 10,
        'processes' => 1,
        'concurrent' => [
            'limit' => 5,
        ],
    ],
];

retry_seconds 也可以传入数组,根据重试次数相应修改重试时间,例如

<?php

return [
    'default' => [
        'driver' => Hyperf\AsyncQueue\Driver\RedisDriver::class,
        'channel' => 'queue',
        'retry_seconds' => [1, 5, 10, 20],
        'processes' => 1,
    ],
];

工作原理

ConsumerProcess 是异步消费进程,会根据用户创建的 Job 或者使用 @AsyncQueueMessage 的代码块,执行消费逻辑。 Job@AsyncQueueMessage 都是需要投递和执行的任务,即数据、消费逻辑都会在任务中定义。

  • Job 类中成员变量即为待消费的数据,handle() 方法则为消费逻辑。
  • @AsyncQueueMessage 注解的方法,构造函数传入的数据即为待消费的数据,方法体则为消费逻辑。
graph LR;
A[服务启动]-->B[异步消费进程启动]
B-->C[监听队列]
D[投递任务]-->C
C-->F[消费任务]

使用

配置异步消费进程

组件已经提供了默认 异步消费进程,只需要将它配置到 config/autoload/processes.php 中即可。

<?php

return [
    Hyperf\AsyncQueue\Process\ConsumerProcess::class,
];

当然,您也可以将以下 Process 添加到自己的项目中。

配置方式和注解方式,二选一即可。

<?php

declare(strict_types=1);

namespace App\Process;

use Hyperf\AsyncQueue\Process\ConsumerProcess;
use Hyperf\Process\Annotation\Process;

/**
 * @Process(name="async-queue")
 */
class AsyncQueueConsumer extends ConsumerProcess
{
}

生产消息

传统方式

这种模式会把对象直接序列化然后存到 Redis 等队列中,所以为了保证序列化后的体积,尽量不要将 ContainerConfig 等设置为成员变量。

比如以下 Job 的定义,是 不可取 的,同理 @Inject 也是如此。

因为 Job 会被序列化,所以成员变量不要包含 匿名函数 等 无法被序列化 的内容,如果不清楚哪些内容无法被序列化,尽量使用注解方式。

<?php

declare(strict_types=1);

namespace App\Job;

use Hyperf\AsyncQueue\Job;
use Psr\Container\ContainerInterface;

class ExampleJob extends Job
{
    public $container;

    public $params;

    public function __construct(ContainerInterface $container, $params)
    {
        $this->container = $container;
        $this->params = $params;
    }

    public function handle()
    {
        // 根据参数处理具体逻辑
        var_dump($this->params);
    }
}

$job = make(ExampleJob::class);

正确的 Job 应该是只有需要处理的数据,其他相关数据,可以在 handle 方法中重新获取,如下。

<?php

declare(strict_types=1);

namespace App\Job;

use Hyperf\AsyncQueue\Job;

class ExampleJob extends Job
{
    public $params;

    /**
     * 任务执行失败后的重试次数,即最大执行次数为 $maxAttempts+1 次
     *
     * @var int
     */
    protected $maxAttempts = 2;

    public function __construct($params)
    {
        // 这里最好是普通数据,不要使用携带 IO 的对象,比如 PDO 对象
        $this->params = $params;
    }

    public function handle()
    {
        // 根据参数处理具体逻辑
        // 通过具体参数获取模型等
        // 这里的逻辑会在 ConsumerProcess 进程中执行
        var_dump($this->params);
    }
}

正确定义完 Job 后,我们需要写一个专门投递消息的 Service,代码如下。

<?php

declare(strict_types=1);

namespace App\Service;

use App\Job\ExampleJob;
use Hyperf\AsyncQueue\Driver\DriverFactory;
use Hyperf\AsyncQueue\Driver\DriverInterface;

class QueueService
{
    /**
     * @var DriverInterface
     */
    protected $driver;

    public function __construct(DriverFactory $driverFactory)
    {
        $this->driver = $driverFactory->get('default');
    }

    /**
     * 生产消息.
     * @param $params 数据
     * @param int $delay 延时时间 单位秒
     */
    public function push($params, int $delay = 0): bool
    {
        // 这里的 `ExampleJob` 会被序列化存到 Redis 中,所以内部变量最好只传入普通数据
        // 同理,如果内部使用了注解 @Value 会把对应对象一起序列化,导致消息体变大。
        // 所以这里也不推荐使用 `make` 方法来创建 `Job` 对象。
        return $this->driver->push(new ExampleJob($params), $delay);
    }
}

投递消息

接下来,调用我们的 QueueService 投递消息即可。

<?php

declare(strict_types=1);

namespace App\Controller;

use App\Service\QueueService;
use Hyperf\Di\Annotation\Inject;
use Hyperf\HttpServer\Annotation\AutoController;

/**
 * @AutoController
 */
class QueueController extends AbstractController
{
    /**
     * @Inject
     * @var QueueService
     */
    protected $service;

    /**
     * 传统模式投递消息
     */
    public function index()
    {
        $this->service->push([
            'group@hyperf.io',
            'https://doc.hyperf.io',
            'https://www.hyperf.io',
        ]);

        return 'success';
    }
}

注解方式

框架除了传统方式投递消息,还提供了注解方式。

注解方式会在非消费环境下自动投递消息到队列,故,如果我们在队列中使用注解方式时,则不会再次投递到队列当中,而是直接在本消费进程中执行。 如果仍然需要在队列中投递消息,则可以在队列中使用传统模式投递。

让我们重写上述 QueueService,直接将 ExampleJob 的逻辑搬到 example 方法中,并加上对应注解 AsyncQueueMessage,具体代码如下。

<?php

declare(strict_types=1);

namespace App\Service;

use Hyperf\AsyncQueue\Annotation\AsyncQueueMessage;

class QueueService
{
    /**
     * @AsyncQueueMessage
     */
    public function example($params)
    {
        // 需要异步执行的代码逻辑
        // 这里的逻辑会在 ConsumerProcess 进程中执行
        var_dump($params);
    }
}

投递消息

注解模式投递消息就跟平常调用方法一致,代码如下。

<?php

declare(strict_types=1);

namespace App\Controller;

use App\Service\QueueService;
use Hyperf\Di\Annotation\Inject;
use Hyperf\HttpServer\Annotation\AutoController;

/**
 * @AutoController
 */
class QueueController extends AbstractController
{
    /**
     * @Inject
     * @var QueueService
     */
    protected $service;

    /**
     * 注解模式投递消息
     */
    public function example()
    {
        $this->service->example([
            'group@hyperf.io',
            'https://doc.hyperf.io',
            'https://www.hyperf.io',
        ]);

        return 'success';
    }
}

事件

事件名称触发时机备注
BeforeHandle处理消息前触发
AfterHandle处理消息后触发
FailedHandle处理消息失败后触发
RetryHandle重试处理消息前触发
QueueLength每处理 500 个消息后触发用户可以监听此事件,判断失败或超时队列是否有消息积压

QueueLengthListener

框架自带了一个记录队列长度的监听器,默认不开启,您如果需要,可以自行添加到 listeners 配置中。

<?php

declare(strict_types=1);

return [
    Hyperf\AsyncQueue\Listener\QueueLengthListener::class
];

ReloadChannelListener

当消息执行超时,或项目重启导致消息执行被中断,最终都会被移动到 timeout 队列中,只要您可以保证消息执行是幂等的(同一个消息执行一次,或执行多次,最终表现一致), 就可以开启以下监听器,框架会自动将 timeout 队列中消息移动到 waiting 队列中,等待下次消费。

监听器监听 QueueLength 事件,默认执行 500 次消息后触发一次。

<?php

declare(strict_types=1);

return [
    Hyperf\AsyncQueue\Listener\ReloadChannelListener::class
];

任务执行流转流程

任务执行流转流程主要包括以下几个队列:

队列名备注
waiting等待消费的队列
reserved正在消费的队列
delayed延迟消费的队列
failed消费失败的队列
timeout消费超时的队列 (虽然超时,但可能执行成功)

队列流转顺序如下:

graph LR;
A[投递延时消息]-->C[delayed队列];
B[投递消息]-->D[waiting队列];
C--到期-->D;
D--消费-->E[reserved队列];
E--成功-->F[删除消息];
E--失败-->G[failed队列];
E--超时-->H[timeout队列];

配置多个异步队列

当您需要使用多个队列来区分消费高频和低频或其他种类的消息时,可以配置多个队列。

  1. 添加配置
<?php

return [
    'default' => [
        'driver' => Hyperf\AsyncQueue\Driver\RedisDriver::class,
        'channel' => '{queue}',
        'timeout' => 2,
        'retry_seconds' => 5,
        'handle_timeout' => 10,
        'processes' => 1,
        'concurrent' => [
            'limit' => 2,
        ],
    ],
    'other' => [
        'driver' => Hyperf\AsyncQueue\Driver\RedisDriver::class,
        'channel' => '{other.queue}',
        'timeout' => 2,
        'retry_seconds' => 5,
        'handle_timeout' => 10,
        'processes' => 1,
        'concurrent' => [
            'limit' => 2,
        ],
    ],
];
  1. 添加消费进程
<?php

declare(strict_types=1);

namespace App\Process;

use Hyperf\AsyncQueue\Process\ConsumerProcess;
use Hyperf\Process\Annotation\Process;

/**
 * @Process()
 */
class OtherConsumerProcess extends ConsumerProcess
{
    /**
     * @var string
     */
    protected $queue = 'other';
}
  1. 调用
use Hyperf\AsyncQueue\Driver\DriverFactory;
use Hyperf\Utils\ApplicationContext;

$driver = ApplicationContext::getContainer()->get(DriverFactory::class)->get('other');
return $driver->push(new ExampleJob());

安全关闭

异步队列在终止时,如果正在进行消费逻辑,可能会导致出现错误。框架提供了 DriverStopHandler ,可以让异步队列进程安全关闭。

当前信号处理器并不适配于 CoroutineServer,如有需要请自行实现

安装信号处理器

composer require hyperf/signal

添加配置

<?php

declare(strict_types=1);

return [
    'handlers' => [
        Hyperf\AsyncQueue\Signal\DriverStopHandler::class,
    ],
    'timeout' => 5.0,
];

异步驱动之间的区别

  • Hyperf\AsyncQueue\Driver\RedisDriver::class

此异步驱动会将整个 JOB 进行序列化,当投递即时队列后,会 lpushlist 结构中,投递延时队列,会 zaddzset 结构中。 所以,如果 Job 的参数完全一致的情况,在延时队列中就会出现后投递的消息 覆盖 前面投递的消息的问题。 如果不想出现延时消息覆盖的情况,只需要在 Job 里增加一个唯一的 uniqid,或者在使用 注解 的方法上增加一个 uniqid 的入参即可。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文