Django Channels 和 Celery 示例

发布于 2024-08-10 13:44:28 字数 6761 浏览 14 评论 0

在本教程中,我将探讨如何建立一个 Django Channels 项目来与 Celery 协同工作,以及在任务开始和结束的时候,即时通知。Django Channels 使用 WebSockets 来启用服务器和浏览器客户端之间的双向通信。假设读者已经熟悉如何建立一个普通的 Django 项目,我们将只覆盖 Channels 和 Celery 相关的部分。

你可以在 这里找到 Github repo 以及位于 http://tasker.vincenttide.com 的一个相同部署。注意,该部署包含了在此教程不包括的一些额外的内容,例如取消功能。示例部署的前端还运行 React 库,而我们在这个演示将只使用 JavaScript。

首先,让我们安装一些需要的依赖。我们将需要一个 Channels 层后端,Channels 用来传递和存储数据。我们还将需要一个 Celery broker 传输后端。事实证明,我们可以为这些任务同时使用 Redis,因此 Redis 就是我们所用的。

# Add Chris Lea’s redis ppa - he maintains the ppa for many open source projects
$ sudo add-apt-repository ppa:chris-lea/redis-server
$ sudo apt-get update
$ sudo apt-get install redis-server

# Now check that redis-server is up and running
$ redis-cli ping
# PONG

在一个 virtualenv 内设置一个新的 Django 项目,然后安装以下库:

$ pip install django
$ pip install channels  # the channels library
$ pip install asgi_redis  # the redis channel layer we are using
$ pip install celery  # Celery task queue

让我们先看看 settings.py 文件。

# Add our new app to installed apps
INSTALLED_APPS = [
#…
  ‘jobs’,
]

# Channels settings
CHANNEL_LAYERS = {
   "default": {
       "BACKEND": "asgi_redis.RedisChannelLayer",  # use redis backend
       "CONFIG": {
           "hosts": [os.environ.get('REDIS_URL', 'redis://localhost:6379')],  # set redis address
       },
       "ROUTING": "django_channels_celery_tutorial.routing.channel_routing",  # load routing from our routing.py file
   },
}

# Celery settings
BROKER_URL = 'redis://localhost:6379/0'  # our redis address
# use json format for everything
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

首先,添加我们的新 app 到 INSTALLED_APPS 列表中。Channels 设置只是告诉 Channels 我们所使用的后端,在这里,是 Redis。ROUTING 选项告诉 Channels 到哪里找我们的 WebSockets 路径,这里是 routing.py 文件。Celery 设置告诉 Celery 到哪里找我们的 broker,以及对所有东西,我们想要使用 json 格式。

现在,看看 routing.py 文件:

from channels import route
from jobs import consumers

channel_routing = [
   # Wire up websocket channels to our consumers:
   route("websocket.connect", consumers.ws_connect),
   route("websocket.receive", consumers.ws_receive),
]

这里,我们只是为连接和接收消息设置了处理函数。我们还可以添加一个函数来处理断开连接消息,但对我们来说,并不需要。我们告诉 Channels 在我们的 jobs/consumers.py 文件中查找函数。

这里是 consumers.py 文件的主要部分:

@channel_session
def ws_connect(message):
   message.reply_channel.send({
       "text": json.dumps({
           "action": "reply_channel",
           "reply_channel": message.reply_channel.name,
       })
   })


@channel_session
def ws_receive(message):
   try:
       data = json.loads(message['text'])
   except ValueError:
       log.debug("ws message isn't json text=%s", message['text'])
       return

   if data:
       reply_channel = message.reply_channel.name

       if data['action'] == "start_sec3":
           start_sec3(data, reply_channel)

在我们的 ws_connect 函数中,我们将只是把客户端的回复通道(reply channel)地址回传。回复通道是一个分配给每一个连接到我们的 websockets 服务器的浏览器客户端的唯一地址。可以从 message.reply_channel.name 中检索到该值,并且可以保存或传递该值到另一个函数,例如 Celery 的 task,这样的话,就可以将消息发回去。事实上,这就是我们将要做的事。message.reply_channel.send 是 Channels 为我们提供的,用来回复到同一个客户端的一个方便的快捷方式。如果你仅有 reply_channel 名,那么你将必须使用以下方法来发送消息:

Channel(reply_channel_name).send({
   "text": json.dumps({
       "action": "started",
       "job_id": job.id,
       "job_name": job.name,
       "job_status": job.status,
   })
})

在我们的 ws_receive 函数中,我们根据 action 参数来看看客户端想要我们做什么。如果你想要做不同的事,那么可以有多个 action 指令。在我们的例子中,我们只有一个指令,它运行一个名为 start_sec3 的函数。start_sec3 只是休眠 3 秒,然后回复它已经完成的消息给客户端。注意,我们传递了 reply_channel 地址,因此它知道发送响应到哪。

最后一个重要的块是客户端侧的 javascript 处理函数。

$(function() {
   // When we're using HTTPS, use WSS too.
   var ws_scheme = window.location.protocol == "https:" ? "wss" : "ws";
   var ws_path = ws_scheme + '://' + window.location.host + '/dashboard/';
   console.log("Connecting to " + ws_path)
   var socket = new ReconnectingWebSocket(ws_path);

   socket.onmessage = function(message) {
       console.log("Got message: " + message.data);
       var data = JSON.parse(message.data);

       // if action is started, add new item to table
       if (data.action == "started") {
           var task_status = $("#task_status");
           var ele = $('<tr></tr>');
           ele.attr("id", data.job_id);
           var item_id = $("<td></td>").text(data.job_id);
           ele.append(item_id);
           var item_name = $("<td></td>").text(data.job_name);
           ele.append(item_name);
           var item_status = $("<td></td>");
           item_status.attr("id", "item-status-"+data.job_id);
           var span = $('<span class="label label-primary"></span>').text(data.job_status);
           item_status.append(span);
           ele.append(item_status);
           task_status.append(ele);
       }
       // if action is completed, just update the status
       else if (data.action == "completed"){
           var item = $('#item-status-' + data.job_id + ' span');
           item.attr("class", "label label-success");
           item.text(data.job_status);
       }
   };

   $("#taskform").on("submit", function(event) {
       var message = {
           action: "start_sec3",
           job_name: $('#task_name').val()
       };
       socket.send(JSON.stringify(message));
       $("#task_name").val('').focus();
       return false;
   });
});

这里,我们首先创建 websockets 对象,然后用 socket.onmessage 函数来处理为每个 websockets 消息我们应该做的事。如果 action 参数的值是“started”,那么我们将添加一个新的条目到表格中。如果 action 是 completed,我们只会修改对应的列状态为已完成。

而表单则是发送一个 websockets 消息到服务器,告诉它运行 action “start_sec3”。

要看完整的项目文件,请访问 Github repo 。要运行 Github repo 代码,先确保你安装了 Redis,然后运行以下命令:

pip install -r requirements.txt
python manage.py makemigrations
python manage.py migrate
python manage.py runserver  # Start daphne and workers
celery worker -A example -l info  # Start celery workers

这会启动部署服务器,地址为 http://localhost:8000 。再一次说明,你可以在 http://tasker.vincenttide.com 上找到一个类似的部署。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据

关于作者

回心转意

暂无简介

0 文章
0 评论
24 人气
更多

推荐作者

Gabu-gabumon

文章 0 评论 0

qq_CgiN62

文章 0 评论 0

荔枝明

文章 0 评论 0

¤→小豸慧

文章 0 评论 0

    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文