返回介绍

使用 RQ

发布于 2025-01-02 21:54:01 字数 4274 浏览 0 评论 0 收藏 0

RQ 是一个标准的 Python 三方软件包,用 pip 安装:

(venv) $ pip install rq
(venv) $ pip freeze > requirements.txt

正如我前面提到的,应用和 RQ worker 之间的通信将在 Redis 消息队列中执行,因此你需要运行 Redis 服务器。 有许多途径来安装和运行 Redis 服务器,比如下载其源码并执行编译和安装。 如果你使用的是 Windows,Microsoft 在 此处 维护了 Redis 的安装程序。 在 Linux 上,你可以通过操作系统的软件包管理器安装 Redis。 Mac OS X 用户可以运行 brew install redis ,然后使用 redis-server 命令手动启动服务。

除了确保服务正在运行并可供 RQ 访问之外,你不需要与 Redis 进行其他交互。

创建任务

通过 RQ 执行一项简单的任务后,你就会很快熟悉它。 一个任务,不过是一个 Python 函数而已。 以下是一个示例任务,我将其放入一个新的 app/tasks.py 模块:

app/tasks.py :示例后台任务。

import time

def example(seconds):
    print('Starting task')
    for i in range(seconds):
        print(i)
        time.sleep(1)
    print('Task completed')

该任务将秒数作为参数,然后在该时间量内等待,并每秒打印一次计数器。

运行 RQ Worker

任务准备就绪,可以通过 rq worker 来启动一个 worker 进程了:

(venv) $ rq worker microblog-tasks
18:55:06 RQ worker 'rq:worker:miguelsmac.90369' started, version 0.9.1
18:55:06 Cleaning registries for queue: microblog-tasks
18:55:06
18:55:06 *** Listening on microblog-tasks...

Worker 进程现在连接到了 Redis,并在名为 microblog-tasks 的队列上查看可能分配给它的任何作业。 如果你想启动多个 worker 来扩展吞吐量,你只需要运行 rq worker 来生成更多连接到同一个队列的进程。 然后,当作业出现在队列中时,任何可用的 worker 进程都可以获取它。 在生产环境中,你可能希望至少运行可用 CPU 数量的 worker。

执行任务

现在打开第二个终端窗口并激活虚拟环境。 我将使用 shell 会话来启动 worker 中的 example() 任务:

>>> from redis import Redis
>>> import rq
>>> queue = rq.Queue('microblog-tasks', connection=Redis.from_url('redis://'))
>>> job = queue.enqueue('app.tasks.example', 23)
>>> job.get_id()
'c651de7f-21a8-4068-afd5-8b982a6f6d32'

来自 RQ 的 Queue 类表示从应用程序端看到的任务队列。 它采用的参数是队列名称和一个 Redis 连接对象,本处使用默认 URL 进行初始化。 如果你的 Redis 服务器运行在不同的主机或端口号上,则需要使用其他 URL。

Queue 的 enqueue() 方法用于将作业添加到队列中。 第一个参数是要执行的任务的名称,可直接传入函数对象或导入字符串。 我发现传入字符串更加方便,因为不需要在应用程序的一端导入函数。 对 enqueue() 传入的任何剩余参数将被传递给 worker 中运行的函数。

只要进行了 enqueue() 调用,运行着 RQ worker 的终端窗口上就会出现一些活动。 你会看到 example() 函数正在运行,并且每秒打印一次计数器。 同时,你的其他终端不会被阻塞,你可以继续在 shell 中执行表达式。在上面的例子中,我调用 job.get_id() 方法来获取分配给任务的唯一标识符。 你可以尝试使用另一个有趣表达式来检查 worker 上的函数是否已完成:

>>> job.is_finished
False

如果你像我在上面的例子中那样传递了 23 ,那么函数将运行约 23 秒。 在那之后, job.is_finished 表达式将变为 True 。 就是这么简单,炫酷否?

一旦函数完成,worker 又回到等待作业的状态,所以如果你想进行更多的实验,你可以用不同的参数重复执行 enqueue() 调用。 队列中存储的有关任务的数据将保留一段时间(默认为 500 秒),但最终会被删除。 这很重要,任务队列不保留已执行作业的历史记录。

报告任务进度

上面使用的示例任务简单得不现实。 通常,对于长时间运行的任务,你需要将一些进度信息提供给应用程序,从而可以将其显示给用户。 RQ 通过使用作业对象的 meta 属性来支持这一点。 让我重写 example() 任务来编写进度报告:

app/tasks.py ::带进度的示例后台任务。

import time
from rq import get_current_job

def example(seconds):
    job = get_current_job()
    print('Starting task')
    for i in range(seconds):
        job.meta['progress'] = 100.0 * i / seconds
        job.save_meta()
        print(i)
        time.sleep(1)
    job.meta['progress'] = 100
    job.save_meta()
    print('Task completed')

这个新版本的 example() 使用 RQ 的 get_current_job() 函数来获取一个作业实例,该实例与提交任务时返回给应用程序的实例类似。 作业对象的 meta 属性是一个字典,任务可以编写任何想要与应用程序通信的自定义数据。 在这个例子中,我写入了 progress ,表示完成任务的百分比。 每次进程更新时,我都调用 job.save_meta() 指示 RQ 将数据写入 Redis,应用程序可以在其中找到它。

在应用程序方面(目前只是一个 Python shell),我可以运行此任务,然后监视进度,如下所示:

>>> job = queue.enqueue('app.tasks.example', 23)
>>> job.meta
{}
>>> job.refresh()
>>> job.meta
{'progress': 13.043478260869565}
>>> job.refresh()
>>> job.meta
{'progress': 69.56521739130434}
>>> job.refresh()
>>> job.meta
{'progress': 100}
>>> job.is_finished
True

如你所见,在另一侧, meta 属性可以被读取。 需要调用 refresh() 方法来从 Redis 更新内容。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文