返回介绍

Task 机制

发布于 2021-04-03 03:37:35 字数 9903 浏览 1606 评论 0 收藏 0

现阶段 Swoole 暂时没有办法 hook 所有的阻塞函数,也就意味着有些函数仍然会导致 进程阻塞,从而影响协程的调度,此时我们可以通过使用 Task 组件来模拟协程处理,从而达到不阻塞进程调用阻塞函数的目的,本质上是仍是是多进程运行阻塞函数,所以性能上会明显地不如原生协程,具体取决于 Task Worker 的数量。

安装

composer require hyperf/task

配置

因为 Task 并不是默认组件,所以在使用的时候需要在 server.php 增加 Task 相关的配置。

<?php

declare(strict_types=1);

use Hyperf\Server\Event;

return [
    // 这里省略了其它不相关的配置项
    'settings' => [
        // Task Worker 数量,根据您的服务器配置而配置适当的数量
        'task_worker_num' => 8,
        // 因为 `Task` 主要处理无法协程化的方法,所以这里推荐设为 `false`,避免协程下出现数据混淆的情况
        'task_enable_coroutine' => false,
    ],
    'callbacks' => [
        // Task callbacks
        Event::ON_TASK => [Hyperf\Framework\Bootstrap\TaskCallback::class, 'onTask'],
        Event::ON_FINISH => [Hyperf\Framework\Bootstrap\FinishCallback::class, 'onFinish'],
    ],
];

使用

Task 组件提供了 主动方法投递注解投递 两种使用方法。

主动方法投递

<?php

use Hyperf\Utils\Coroutine;
use Hyperf\Utils\ApplicationContext;
use Hyperf\Task\TaskExecutor;
use Hyperf\Task\Task;

class MethodTask
{
    public function handle($cid)
    {
        return [
            'worker.cid' => $cid,
            // task_enable_coroutine 为 false 时返回 -1,反之 返回对应的协程 ID
            'task.cid' => Coroutine::id(),
        ];
    }
}

$container = ApplicationContext::getContainer();
$exec = $container->get(TaskExecutor::class);
$result = $exec->execute(new Task([MethodTask::class, 'handle'], [Coroutine::id()]));

使用注解

通过 主动方法投递 时,并不是特别直观,这里我们实现了对应的 @Task 注解,并通过 AOP 重写了方法调用。当在 Worker 进程时,自动投递到 Task 进程,并协程等待 数据返回。

<?php

use Hyperf\Utils\Coroutine;
use Hyperf\Utils\ApplicationContext;
use Hyperf\Task\Annotation\Task;

class AnnotationTask
{
    /**
     * @Task
     */
    public function handle($cid)
    {
        return [
            'worker.cid' => $cid,
            // task_enable_coroutine=false 时返回 -1,反之 返回对应的协程 ID
            'task.cid' => Coroutine::id(),
        ];
    }
}

$container = ApplicationContext::getContainer();
$task = $container->get(AnnotationTask::class);
$result = $task->handle(Coroutine::id());

使用 @Task 注解时需 use Hyperf\Task\Annotation\Task;

附录

Swoole 暂时没有协程化的函数列表

  • mysql,底层使用 libmysqlclient, 不推荐使用, 推荐使用已经实现协程化的 pdo_mysql/mysqli
  • curl,底层使用 libcurl,在 Swoole 4.4 后底层进行了协程化(beta)
  • mongo,底层使用 mongo-c-client
  • pdo_pgsql
  • pdo_ori
  • pdo_odbc
  • pdo_firebird

MongoDB

因为 MongoDB 没有办法被 hook,所以我们可以通过 Task 来调用,下面就简单介绍一下如何通过注解方式调用 MongoDB

以下我们实现两个方法 insertquery,其中需要注意的是 manager 方法不能使用 Task, 因为 Task 会在对应的 Task 进程 中处理,然后将数据从 Task 进程 返回到 Worker 进程 。 所以 Task 方法 的入参和出参最好不要携带任何 IO,比如返回一个实例化后的 Redis 等等。

<?php

declare(strict_types=1);

namespace App\Task;

use Hyperf\Task\Annotation\Task;
use MongoDB\Driver\BulkWrite;
use MongoDB\Driver\Manager;
use MongoDB\Driver\Query;
use MongoDB\Driver\WriteConcern;

class MongoTask
{
    /**
     * @var Manager
     */
    public $manager;

    /**
     * @Task
     */
    public function insert(string $namespace, array $document)
    {
        $writeConcern = new WriteConcern(WriteConcern::MAJORITY, 1000);
        $bulk = new BulkWrite();
        $bulk->insert($document);

        $result = $this->manager()->executeBulkWrite($namespace, $bulk, $writeConcern);
        return $result->getUpsertedCount();
    }

    /**
     * @Task
     */
    public function query(string $namespace, array $filter = [], array $options = [])
    {
        $query = new Query($filter, $options);
        $cursor = $this->manager()->executeQuery($namespace, $query);
        return $cursor->toArray();
    }

    protected function manager()
    {
        if ($this->manager instanceof Manager) {
            return $this->manager;
        }
        $uri = 'mongodb://127.0.0.1:27017';
        return $this->manager = new Manager($uri, []);
    }
}

使用如下

<?php
use App\Task\MongoTask;
use Hyperf\Utils\ApplicationContext;

$client = ApplicationContext::getContainer()->get(MongoTask::class);
$client->insert('hyperf.test', ['id' => rand(0, 99999999)]);

$result = $client->query('hyperf.test', [], [
    'sort' => ['id' => -1],
    'limit' => 5,
]);

其他方案

如果 Task 机制无法满足性能要求,可以尝试一下 Hyperf 组织下的另一个开源项目GoTask。GoTask 通过 Swoole 进程管理功能启动 Go 进程作为 Swoole 主进程边车(Sidecar),利用进程通讯将任务投递给边车处理并接收返回值。可以理解为 Go 版的 Swoole TaskWorker。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文