向客户端发送通知
现在剩下的就是增加一种机制,通过这种机制,客户端可以定期接收有关用户拥有的未读消息数量的更新。 当更新发生时,客户端将调用 set_message_count()
函数来使用户知道更新。
实际上有两种方法可以让服务器将这些更新告知客户端,而且你可能会猜到,这两种方法都有优点和缺点,因此选择哪种方法很大程度上取决于项目。 在第一种方法中,客户端通过发送异步请求定期向服务器请求更新。 来自此请求的响应是更新列表,客户端可以使用这些更新来更新页面的不同元素,例如未读消息计数标记。 第二种方法需要客户端和服务器之间的特殊连接类型,以允许服务器自由地将数据推送到客户端。 请注意,无论采用哪种方法,我都希望将通知视为通用实体,以便我可以扩展此框架以支持除未读消息徽章以外的其他类型的事件。
第一种解决方案最大的优点是易于实施。 我需要做的只是向应用程序添加另一条路由,例如 /notifications ,它返回 JSON 格式的通知列表。然后客户端应用程序遍历通知列表并将必要的更改应用于页面。 该解决方案的缺点是实际事件和通知之间会有延迟,因为客户端会定期请求通知列表。 例如,如果客户端每 10 秒钟询问一次通知,则可能延迟 10 秒接收通知。
第二个解决方案需要在协议级别进行更改,因为 HTTP 没有服务器主动向客户端发送数据的任何规定。到目前为止,实现服务器推送消息的最常见方式是扩展服务器以支持除 HTTP 之外的 WebSocket 连接。 WebSocket 是一种不同于 HTTP 的协议,在服务器和客户端之间建立永久连接。服务器和客户端可以随时向对方发送数据,而无需另一方请求。这种机制的优点是,无论何时发生客户感兴趣的事件,服务器都可以发送通知,而不会有任何延迟。缺点是 WebSocket 需要比 HTTP 更复杂的设置,因为服务器需要与每个客户端保持永久连接。想象一下,例如有四个 worker 进程的服务器通常可以服务几百个 HTTP 客户端,因为 HTTP 中的连接是短暂的并且不断被回收。而相同的服务器只能处理四个 WebSocket 客户端,在绝大多数情况下,这会导致资源紧张。正是由于这种限制,WebSocket 应用程序通常围绕 异步服务器 进行设计,因为这种服务器在管理大量 worker 和活动连接方面效率更高。
好消息是,不管你使用什么方法,在客户端你都会有一个回调函数,它将被更新列表调用。 因此,我可以从第一个解决方案开始,该解决方案实施起来要容易得多,如果发现不足,可以迁移到 WebSocket 服务器,该服务器可以配置为调用相同的客户端回调。 在我看来,对于这种类型的应用,第一种解决方案实际上是可以接受的。 基于 WebSocket 的实现对于需要以接近零延迟传递更新的应用程序非常有用。
这里有一些业界的类似案例。Twitter 也使用的是第一种导航栏通知的方法;Facebook 使用称为 长轮询 的 HTTP 变体,它解决了直接轮询的一些限制,同时仍然使用 HTTP 请求;Stack Overflow 和 Trello 这两个站点使用 WebSocket 来实现通知机制。 你可以通过查看浏览器调试器的“Network”选项卡来查找任何网站上发生的后台活动请求。
我们继续实施轮询解决方案。 首先,我要添加一个新模型来跟踪所有用户的通知,以及用户模型中的关系。
app/models.py :通知模型。
import json
from time import time
# ...
class User(UserMixin, db.Model):
# ...
notifications = db.relationship('Notification', backref='user',
lazy='dynamic')
# ...
class Notification(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(128), index=True)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
timestamp = db.Column(db.Float, index=True, default=time)
payload_json = db.Column(db.Text)
def get_data(self):
return json.loads(str(self.payload_json))
通知将会有一个名称,一个关联的用户,一个 Unix 时间戳和一个有效载荷。 时间戳默认从 time.time()
函数中获取。 每种类型的通知都会有所不同,所以我将它写为 JSON 字符串,因为这样可以编写列表,字典或单个值(如数字或字符串)。 为了方便,我添加了 get_data()
方法,以便调用者不必操心 JSON 的反序列化。
这些更改需要包含在新的数据库迁移中:
(venv) $ flask db migrate -m "notifications"
(venv) $ flask db upgrade
为了方便,我将新增的 Message
和 Notification
模型添加到 shell 上下文,这样我就可以直接在用 flask shell
命令启动的解释器中使用这两个模型了。
microblog.py : 添加 Message 和 Notification 模型到 shell 上下文。
# ...
from app.models import User, Post, Notification, Message
# ...
@app.shell_context_processor
def make_shell_context():
return {'db': db, 'User': User, 'Post': Post, 'Message': Message
'Notification': Notification}
我还将在用户模型中添加一个 add_notification()
辅助方法,以便更轻松地处理这些对象:
app/models.py :Notification 模型。
class User(UserMixin, db.Model):
# ...
def add_notification(self, name, data):
self.notifications.filter_by(name=name).delete()
n = Notification(name=name, payload_json=json.dumps(data), user=self)
db.session.add(n)
return n
此方法不仅为用户添加通知给数据库,还确保如果具有相同名称的通知已存在,则会首先删除该通知。 我将要使用的通知将被称为 unread_message_count
。 如果数据库已经有一个带有这个名称的通知,例如值为 3,则当用户收到新消息并且消息计数变为 4 时,我就会替换旧的通知。
在任何未读消息数改变的地方,我需要调用 add_notification()
,以便我更新用户的通知,这样的地方有两处。 首先,在 send_message()
视图函数中,当用户收到一个新的私有消息时:
app/main/routes.py :更新用户通知。
@bp.route('/send_message/<recipient>', methods=['GET', 'POST'])
@login_required
def send_message(recipient):
# ...
if form.validate_on_submit():
# ...
user.add_notification('unread_message_count', user.new_messages())
db.session.commit()
# ...
# ...
第二个地方是用户转到消息页面时,未读计数需要归零:
app/main/routes.py :查看消息视图函数。
@bp.route('/messages')
@login_required
def messages():
current_user.last_message_read_time = datetime.utcnow()
current_user.add_notification('unread_message_count', 0)
db.session.commit()
# ...
既然用户的所有通知都保存在数据库中,那么我可以添加一条新路由,客户端可以使用该路由为登录用户检索通知:
app/main/routes.py :通知视图函数。
from app.models import Notification
# ...
@bp.route('/notifications')
@login_required
def notifications():
since = request.args.get('since', 0.0, type=float)
notifications = current_user.notifications.filter(
Notification.timestamp > since).order_by(Notification.timestamp.asc())
return jsonify([{
'name': n.name,
'data': n.get_data(),
'timestamp': n.timestamp
} for n in notifications])
这是一个相当简单的函数,它返回一个包含用户通知列表的 JSON 负载。 每个通知都以包含三个元素的字典的形式给出,即通知名称,与通知有关的附加数据(如消息数量)和时间戳。 通知按照从创建时间顺序进行排序。
我不希望客户重复发送通知,所以我给他们提供了一个选项,只请求给定时间戳之后产生的通知。 since
选项可以作为浮点数包含在请求 URL 的查询字符串中,其中包含开始时间的 unix 时间戳。 如果包含此参数,则只有在此时间之后发生的通知才会被返回。
完成此功能的最后一部分是在客户端实现实际轮询。 最好的做法是在基础模板中实现,以便所有页面自动继承该行为:
app/templates/base.html :轮询通知。
...
{% block scripts %}
<script>
// ...
{% if current_user.is_authenticated %}
$(function() {
var since = 0;
setInterval(function() {
$.ajax('{{ url_for('main.notifications') }}?since=' + since).done(
function(notifications) {
for (var i = 0; i < notifications.length; i++) {
if (notifications[i].name == 'unread_message_count')
set_message_count(notifications[i].data);
since = notifications[i].timestamp;
}
}
);
}, 10000);
});
{% endif %}
</script>
该函数包含在一个模板条件中,因为我只想在用户登录时轮询新消息。对于没有登录的用户,这个函数将不会被渲染。
你已经在 第二十章 中看到了 jQuery 的 $(function() { ...})
模式。 这是注册一个函数在页面加载后执行的方式。 对于这个功能,我需要在页面加载时做的是设置一个定时器来获取用户的通知。 你还看到了 setTimeout()
JavaScript 函数,它在等待特定时间之后运行作为参数给出的函数。 setInterval()
函数使用与 setTimeout()
相同的参数,但不是一次性触发定时器,而是定期调用回调函数。 本处,我的间隔设置为 10 秒(以毫秒为单位),所以我将以每分钟大约六次的频率查看通知是否有更新。
利用定期计时器和 Ajax,该函数轮询新通知路由,并在其完成回调中迭代通知列表。 当收到名为 unread_message_count
的通知时,通过调用上面定义的函数和通知中给出的计数来调整消息计数徽章。
我处理 since
参数的方式可能会令人困惑。 我首先将这个参数初始化为 0。 参数总是包含在请求 URL 中,但是我不能像以前那样使用 Flask 的 url_for()
来生成查询字符串,因为一次请求中 url_for()
只在服务器上运行一次,而我需要 since
参数动态更新多次。 第一次,这个请求将被发送到 /notifications?since=0 ,但是一旦我收到通知,我就会将 since
更新为它的时间戳。 这可以确保我不会收到重复的内容,因为我总是要求收到自我上次看到的通知以来发生的新通知。 同样重要的是要注意,我在 interval 函数外声明 since
变量,因为我不希望它是局部变量,我想要在所有调用中使用相同的变量。
最简单的测试方法是使用两种不同的浏览器 A 和 B。 在两个浏览器上使用不同的用户登录 Microblog。 然后从 A 浏览器向 B 浏览器上的用户发送一个或多个消息。 B 浏览器的导航栏应更新为显示你在 10 秒钟内发送的消息数量。 而当你点击消息链接时,未读消息数重置为零。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论