使用 RQ
RQ 是一个标准的 Python 三方软件包,用 pip
安装:
(venv) $ pip install rq
(venv) $ pip freeze > requirements.txt
正如我前面提到的,应用和 RQ worker 之间的通信将在 Redis 消息队列中执行,因此你需要运行 Redis 服务器。 有许多途径来安装和运行 Redis 服务器,比如下载其源码并执行编译和安装。 如果你使用的是 Windows,Microsoft 在 此处 维护了 Redis 的安装程序。 在 Linux 上,你可以通过操作系统的软件包管理器安装 Redis。 Mac OS X 用户可以运行 brew install redis
,然后使用 redis-server
命令手动启动服务。
除了确保服务正在运行并可供 RQ 访问之外,你不需要与 Redis 进行其他交互。
创建任务
通过 RQ 执行一项简单的任务后,你就会很快熟悉它。 一个任务,不过是一个 Python 函数而已。 以下是一个示例任务,我将其放入一个新的 app/tasks.py 模块:
app/tasks.py :示例后台任务。
import time
def example(seconds):
print('Starting task')
for i in range(seconds):
print(i)
time.sleep(1)
print('Task completed')
该任务将秒数作为参数,然后在该时间量内等待,并每秒打印一次计数器。
运行 RQ Worker
任务准备就绪,可以通过 rq worker
来启动一个 worker 进程了:
(venv) $ rq worker microblog-tasks
18:55:06 RQ worker 'rq:worker:miguelsmac.90369' started, version 0.9.1
18:55:06 Cleaning registries for queue: microblog-tasks
18:55:06
18:55:06 *** Listening on microblog-tasks...
Worker 进程现在连接到了 Redis,并在名为 microblog-tasks
的队列上查看可能分配给它的任何作业。 如果你想启动多个 worker 来扩展吞吐量,你只需要运行 rq worker
来生成更多连接到同一个队列的进程。 然后,当作业出现在队列中时,任何可用的 worker 进程都可以获取它。 在生产环境中,你可能希望至少运行可用 CPU 数量的 worker。
执行任务
现在打开第二个终端窗口并激活虚拟环境。 我将使用 shell 会话来启动 worker 中的 example()
任务:
>>> from redis import Redis
>>> import rq
>>> queue = rq.Queue('microblog-tasks', connection=Redis.from_url('redis://'))
>>> job = queue.enqueue('app.tasks.example', 23)
>>> job.get_id()
'c651de7f-21a8-4068-afd5-8b982a6f6d32'
来自 RQ 的 Queue
类表示从应用程序端看到的任务队列。 它采用的参数是队列名称和一个 Redis
连接对象,本处使用默认 URL 进行初始化。 如果你的 Redis 服务器运行在不同的主机或端口号上,则需要使用其他 URL。
Queue 的 enqueue()
方法用于将作业添加到队列中。 第一个参数是要执行的任务的名称,可直接传入函数对象或导入字符串。 我发现传入字符串更加方便,因为不需要在应用程序的一端导入函数。 对 enqueue()
传入的任何剩余参数将被传递给 worker 中运行的函数。
只要进行了 enqueue()
调用,运行着 RQ worker 的终端窗口上就会出现一些活动。 你会看到 example()
函数正在运行,并且每秒打印一次计数器。 同时,你的其他终端不会被阻塞,你可以继续在 shell 中执行表达式。在上面的例子中,我调用 job.get_id()
方法来获取分配给任务的唯一标识符。 你可以尝试使用另一个有趣表达式来检查 worker 上的函数是否已完成:
>>> job.is_finished
False
如果你像我在上面的例子中那样传递了 23
,那么函数将运行约 23 秒。 在那之后, job.is_finished
表达式将变为 True
。 就是这么简单,炫酷否?
一旦函数完成,worker 又回到等待作业的状态,所以如果你想进行更多的实验,你可以用不同的参数重复执行 enqueue()
调用。 队列中存储的有关任务的数据将保留一段时间(默认为 500 秒),但最终会被删除。 这很重要,任务队列不保留已执行作业的历史记录。
报告任务进度
上面使用的示例任务简单得不现实。 通常,对于长时间运行的任务,你需要将一些进度信息提供给应用程序,从而可以将其显示给用户。 RQ 通过使用作业对象的 meta
属性来支持这一点。 让我重写 example()
任务来编写进度报告:
app/tasks.py ::带进度的示例后台任务。
import time
from rq import get_current_job
def example(seconds):
job = get_current_job()
print('Starting task')
for i in range(seconds):
job.meta['progress'] = 100.0 * i / seconds
job.save_meta()
print(i)
time.sleep(1)
job.meta['progress'] = 100
job.save_meta()
print('Task completed')
这个新版本的 example()
使用 RQ 的 get_current_job()
函数来获取一个作业实例,该实例与提交任务时返回给应用程序的实例类似。 作业对象的 meta
属性是一个字典,任务可以编写任何想要与应用程序通信的自定义数据。 在这个例子中,我写入了 progress
,表示完成任务的百分比。 每次进程更新时,我都调用 job.save_meta()
指示 RQ 将数据写入 Redis,应用程序可以在其中找到它。
在应用程序方面(目前只是一个 Python shell),我可以运行此任务,然后监视进度,如下所示:
>>> job = queue.enqueue('app.tasks.example', 23)
>>> job.meta
{}
>>> job.refresh()
>>> job.meta
{'progress': 13.043478260869565}
>>> job.refresh()
>>> job.meta
{'progress': 69.56521739130434}
>>> job.refresh()
>>> job.meta
{'progress': 100}
>>> job.is_finished
True
如你所见,在另一侧, meta
属性可以被读取。 需要调用 refresh()
方法来从 Redis 更新内容。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论