将 RQ 与 Flask 应用集成
Redis 服务的连接 URL 需要添加到配置中:
class Config(object):
# ...
REDIS_URL = os.environ.get('REDIS_URL') or 'redis://'
与往常一样,Redis 连接 URL 将来自环境变量,如果该变量未定义,则会假定该服务在当前主机的默认端口上运行并使用默认 URL。
应用工厂函数将负责初始化 Redis 和 RQ:
app/ init .py :整合 RQ。
# ...
from redis import Redis
import rq
# ...
def create_app(config_class=Config):
# ...
app.redis = Redis.from_url(app.config['REDIS_URL'])
app.task_queue = rq.Queue('microblog-tasks', connection=app.redis)
# ...
app.task_queue
将成为提交任务的队列。 将队列附加到应用上会提供很大的便利,因为我可以在应用的任何地方使用 current_app.task_queue
来访问它。 为了方便应用的任何部分提交或检查任务,我可以在 User
模型中创建一些辅助方法:
app/models.py :用户模型中的任务辅助方法。
# ...
class User(UserMixin, db.Model):
# ...
def launch_task(self, name, description, *args, **kwargs):
rq_job = current_app.task_queue.enqueue('app.tasks.' + name, self.id,
*args, **kwargs)
task = Task(id=rq_job.get_id(), name=name, description=description,
user=self)
db.session.add(task)
return task
def get_tasks_in_progress(self):
return Task.query.filter_by(user=self, complete=False).all()
def get_task_in_progress(self, name):
return Task.query.filter_by(name=name, user=self,
complete=False).first()
launch_task()
方法负责将任务提交到 RQ 队列,并将其添加到数据库中。 name
参数是函数名称,如 app/tasks.py 中所定义的那样。 提交给 RQ 时,该函数会将 app.tasks.
预先添加到该名称中以构建符合规范的函数名称。 description
参数是对呈现给用户的任务的友好描述。 对于导出用户动态的函数,我将名称设置为 export_posts
,将描述设置为 Exporting posts...
。 其余参数将传递给任务函数。 launch_task()
函数首先调用队列的 enqueue()
方法来提交作业。 返回的作业对象包含由 RQ 分配的任务 ID,因此我可以使用它在我的数据库中创建相应的 Task
对象。
请注意, launch_task()
将新的任务对象添加到会话中,但不会发出提交。 一般来说,最好在更高层次函数中的数据库会话上进行操作,因为它允许你在单个事务中组合由较低级别函数所做的多个更新。 这不是一个严格的规则,并且,在本章后面的子函数中也会存在一个例外的提交。
get_tasks_in_progress()
方法返回该用户未完成任务的列表。 稍后你会看到,我使用此方法在将有关正在运行的任务的信息渲染到用户的页面中。
最后, get_task_in_progress()
是上一个方法的简化版本并返回指定的任务。 我阻止用户同时启动两个或多个相同类型的任务,因此在启动任务之前,可以使用此方法来确定前一个任务是否还在运行。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论