返回介绍

发布/订阅模式

发布于 2025-01-25 22:50:16 字数 23955 浏览 0 评论 0 收藏 0

发布/订阅(通常缩写为 pub / sub )可能是最着名的单向消息传递模式。我们应该已经熟悉它了,因为它不过是一个分布式的观察者模式。就观察者而言,我们有一组用户注册他们对接收特定类别的消息的兴趣。另一方面,发布者产生分布在所有相关用户中的消息。下图显示了发布/订阅模式的两个主要变体,第一个是点对点,第二个使用代理来调解通信:

让 pub / sub 如此特别的是,发布者不知道邮件的收件人是谁。正如我们所说的那样,用户必须注册它的监听器才能收到特定的消息,从而允许发布者与未知数量的接收者一起工作。换句话说, pub / sub 模式的两边是松散耦合的,这使得它成为一个理想模式来集成不断发展的分布式系统的节点。

代理的存在进一步改善了系统节点之间的解耦,因为订阅者仅与代理交互,不知道哪个节点是消息发布者。正如我们稍后将看到的,代理还可以提供消息队列系统,即使在节点之间存在连接问题的情况下也可以实现可靠的传送。

现在,让我们以一个示例来演示这种模式。

构建一个简单的实时聊天应用程序

为了展示 pub / sub 模式如何帮助我们集成分布式体系结构的实例,现在我们将使用纯 WebSockets 构建一个非常基本的实时聊天应用程序。然后,我们将尝试通过运行多个实例并使用消息传递系统进行通信来扩展它。

实现服务器端

现在,让我们一次一步。 首先构建我们的聊天应用程序; 为此,我们将依赖 ws ,它是 Node.js 的纯 WebSocket 实现。我们知道,在 Node.js 中实现实时应用程序非常简单,我们的代码将证实这一假设。然后让我们创建聊天的服务器端; 其内容如下(在 app.js 文件中):

const WebSocketServer = require('ws').Server;

// 静态的文件服务器
const server = require('http').createServer( //[1]
  require('ecstatic')({
    root: `${__dirname}/www`
  })
);

const wss = new WebSocketServer({
  server: server
}); //[2]
wss.on('connection', ws => {
  console.log('Client connected');
  ws.on('message', msg => { //[3]
    console.log(`Message: ${msg}`);
    broadcast(msg);
  });
});

function broadcast(msg) { //[4]
  wss.clients.forEach(client => {
    client.send(msg);
  });
}

server.listen(process.argv[2] || 8080);

这就是我们需要在服务器上实现聊天应用程序的全部内容。这是它的工作方式:

  1. 我们首先创建一个 HTTP 服务器并附上名为 ecstatic 的中间件( https://npmjs.org/package/ecstatic )来提供静态文件。 这需要为我们的应用程序( JavaScriptCSS )的客户端资源提供服务。
  2. 我们创建一个 WebSocket 服务器的新实例,并将其附加到我们现有的 HTTP 服务器上。然后,我们通过附加连接事件的事件侦听器来开始监听传入的 WebSocket 连接。
  3. 每当新客户端连接到我们的服务器时,我们就开始监听收到的消息。当新消息到达时,我们将它广播给所有连接的客户端
  4. broadcast() 函数是对所有连接客户端进行广播, send() 函数在其中的每一个客户端上被调用。

这是 Node.js 的魔力! 当然,我们实现的服务器的功能非常少,仅仅实现了基本的功能,但正如我们将看到的,它能够工作。

实现客户端

接下来,是时候实施我们聊天的客户端了;这也是一个非常小而简单的代码片段,基本上是一个包含一些基本 JavaScript 代码的最少的 HTML 页面。让我们在一个名为 www/index.html 的文件中创建这个页面,如下所示:

<!DOCTYPE html>
<html>
  <head>
    <script>
      var ws = new WebSocket('ws://' + window.document.location.host);
      ws.onmessage = function(message) {
        var msgDiv = document.createElement('div');
        msgDiv.innerHTML = message.data;
        document.getElementById('messages').appendChild(msgDiv);
      };

      function sendMessage() {
        var message = document.getElementById('msgBox').value;
        ws.send(message);
      }
    </script>
  </head>
  <body>
    Messages:
    <div id='messages'></div>
    <input type='text' placeholder='Send a message' id='msgBox'>
    <input type='button' onclick='sendMessage()' value='Send'>
  </body>
</html>

我们创建的 HTML 页面并不需要太多解释; 它只是一个简单的 Web 页面。 我们使用本地 WebSocket 对象初始化与 Node.js 服务器的连接,然后开始监听来自服务器的消息,并在它们到达时将它们显示在新的 div 元素中。相反,我们使用简单的文本框和按钮来发送消息。

在停止或重新启动聊天服务器时, WebSocket 连接将关闭,并且不会自动重新连接(如果要实现此则需要使用高级库,例如 Socket.io )。 这意味着在服务器重新启动后重新刷新浏览器以重新建立连接(或实现重新连接机制,这里我们不会介绍)。

运行和扩展聊天应用程序

我们可以尝试立即运行应用程序; 只需使用以下命令启动服务器即可:

node app 8080

要运行这个 demo,您需要支持本机 WebSocket 的最新浏览器。这里有一个兼容的浏览器列表: http://caniuse.com/#feat=websockets

打开浏览器,访问 http://localhost:8080

我们现在要展示的是当我们尝试通过启动多个实例来扩展应用程序时发生的情况。让我们尝试这样做,让我们在另一个端口上启动另一台服务器:

node app 8081

缩放我们的聊天应用程序的理想结果应该是连接到两个不同服务器的两个客户端应该能够交换聊天消息。不幸的是,这不如我们所愿。 我们可以通过打开另一个浏览器选项卡来尝试打开 http://localhost:8081

在一个实例上发送聊天消息时,我们在本地广播一条消息,仅将其分发给连接到该特定服务器的客户端。实际上,两台服务器不会互相通话。 我们需要整合它们。

在实际的应用程序中,我们将使用负载平衡器来分配实例中的负载,但对于此演示,我们不会使用它。这使我们能够以确定性的方式访问每台服务器,以验证它与其它实例交互的方式。

使用 Redis 作为消息代理

我们通过引入 Redis 开始分析最重要的 pub / sub 实现,这是一个非常快速和灵活的键/值存储,也被许多人定义为数据结构服务器。

Redis 比消息代理更像是一个数据库;然而,在其众多功能中,有一对专门用于实现集中式发布/订阅模式的命令。 当然,与更先进的面向消息的中间件相比,这种实现非常简单和基本,但这是其受欢迎的主要原因之一。通常,实际上, Redis 已经在现有基础架构中广泛使用,例如,作为缓存服务器或会话存储;它的速度和灵活性使其成为在分布式系统中共享数据的非常流行的选择。因此,只要项目中出现对发布/订阅代理的需求,最简单直接的选择就是重用 Redis 本身,避免安装和维护专用的消息代理。让我们以一个例子来展示它的功能。

这个例子需要安装 Redis ,监听它的默认端口。你可以在这里查看: https://redis.io/topics/quickstart

我们计划使用 Redis 来作为聊天服务器的消息代理。每个实例都将从其客户端接收到的任何消息发布给代理,并同时订阅来自其他服务器实例的消息。正如我们所看到的,我们架构中的每个服务器都是订阅者和发布者。下图显示了我们想要获得的体系结构的表示形式:

通过查看上图,我们可以总结一条消息的经历如下:

  1. 将消息输入到网页的文本框中并发送到聊天服务器的连接实例。
  2. 邮件然后发布给代理。
  3. 代理将消息分派给所有订阅者,在我们的体系结构中,所有订阅者都是聊天服务器的实例。
  4. 在每种情况下,都会将消息分发给所有连接的客户端。

Redis 允许发布和订阅由字符串标识的频道,例如 chat.nodejs 。它还允许我们使用 glob 风格的模式来定义可能匹配多个频道的订阅,例如 chat.*

我们在实践中看看它是如何工作的。让我们通过添加发布/订阅逻辑来修改服务器代码:

const WebSocketServer = require('ws').Server;
const redis = require("redis");
const redisSub = redis.createClient();
const redisPub = redis.createClient();

// 静态文件服务器
const server = require('http').createServer(
  require('ecstatic')({root: `${__dirname}/www`})
);

const wss = new WebSocketServer({server: server});
wss.on('connection', ws => {
  console.log('Client connected');
  ws.on('message', msg => {
    console.log(`Message: ${msg}`);
    redisPub.publish('chat_messages', msg);
  });
});

redisSub.subscribe('chat_messages');
redisSub.on('message', (channel, msg) => {
  wss.clients.forEach((client) => {
    client.send(msg);
  });
});

server.listen(process.argv[2] || 8080);

我们对原始聊天服务器所做的更改在前面的代码中突出显示;下面来解释其工作原理:

  1. 要将我们的 Node.js 应用程序连接到 Redis 服务器,我们使用 redis ,它是一个支持所有可用 Redis 命令的完整客户端。 接下来,我们实例化两个不同的连接,一个用于订阅 channel ,另一个用于发布消息。 这在 Redis 中是必需的,因为一旦连接进入用户模式,就只能使用与订阅相关的命令。 这意味着我们需要第二个连接来发布消息。
  2. 当从连接的客户端收到新消息时,我们会在 chat_messages 通道中发布消息。我们不直接向客户广播该消息,因为我们所有的服务器订阅了同一个 channel (我们稍后会看到),所以它会通过 Redis 返回给我们。 对于这个例子的范围来说,这是一个简单而有效的机制。
  3. 正如我们所说的,我们的服务器还必须订阅 chat_messages 通道,因此我们注册一个侦听器来接收发布到该通道的所有消息(通过当前服务器或任何其他聊天服务器)。当收到消息时,我们只是将它广播给所有连接到当前 WebSocket 服务器的客户端。

这些少许的改变足以让聊天服务器信息互通。为了证明这一点,我们可以尝试启动我们应用程序的多个实例:

node app 8080
node app 8081
node app 8082

然后,我们可以将多个浏览器的选项卡连接到每个实例,并验证我们发送到一台服务器的消息是否被连接到不同服务器的所有其他客户端成功接收。恭喜!我们只使用发布/订阅模式集成了分布式实时应用程序。

使用ØMQ 进行点对点发布/订阅

代理的存在可以大大简化消息传递系统的体系结构;但是,在某些情况下,它不是最佳解决方案,例如,当不能接受延时的情况下,扩展复杂的分布式系统时,或者当代理节点失败或发生异常的情况。

介绍ØMQ

如果我们的项目可选择点对点消息交换模式,那最佳解决方案应该是 ØMQ ,也称为 zmqZeroMQ0MQ );我们在本书前面已经提到过这个库。 ØMQ 是一个网络库,提供构建各种消息模式的基本工具。它是低级的,速度非常快,并且具有简约的 API ,但它提供了消息传递系统的所有基本构建模块,例如原子消息,负载平衡,队列等等。它支持许多类型的传输,例如进程内通道( inproc:// ),进程间通信( ipc:// ),使用 PGM 协议( pgm://epgm:// )的多播,当然,经典的 TCPtcp:// )。 在 ØMQ 的功能中,我们还可以找到实现发布/订阅模式的工具,这正是我们的例子所需要的。因此,我们现在要做的是从聊天应用程序的体系结构中删除代理( Redis ),并让各个节点以对等方式进行通信,利用 ØMQ 的发布/订阅套接字。

ØMQ 套接字可以被视为类固化网络套接字,它提供了很多方法来帮助实现最常见的消息传递模式。例如,我们可以找到实现发布/订阅,请求/回复或单向通信的套接字。

为聊天设计一个对等体系结构的服务器

当我们从架构中移除代理时,聊天应用程序的每个实例都必须直接连接到其他可用实例,以便接收他们发布的消息。 在ØMQ 中,我们有两种专门为此设计的套接字: PUBSUB 。典型的模式是将 PUB 套接字绑定到一个端口,该端口将开始侦听来自其他 SUB 套接字的订阅。

订阅可以有一个过滤器,指定将传递到 SUB 套接字的消息。该过滤器是一个简单的二进制缓冲区(所以它也可以是一个字符串),它将与消息的开头(这也是一个二进制缓冲区)相匹配。当通过 PUB 套接字发送一条消息时,它将被广播到所有连接的 SUB 套接字,但仅在应用了它们的订阅过滤器之后。仅当使用连接的协议时,过滤器才会应用到发布方,例如 TCP

下图显示了应用于我们的分布式聊天服务器体系结构的模式(为简单起见,仅有两个实例):

要运行本节中的示例,您需要在系统上安装本地 ØMQ 二进制文件。 你可以在 http://zeromq.org/intro:get-the-software 找到更多信息。注意:此示例已针对 ØMQ4.0 分支进行了测试。

使用 ØMQPUB / SUB 套接字

让我们通过修改我们的聊天服务器来看看它是如何工作的:

const WebSocketServer = require('ws').Server;
const args = require('minimist')(process.argv.slice(2));
const zmq = require('zmq');

//static file server
const server = require('http').createServer(
  require('ecstatic')({root: `${__dirname}/www`})
);

const pubSocket = zmq.socket('pub');
pubSocket.bind(`tcp://127.0.0.1:${args['pub']}`);

const subSocket = zmq.socket('sub');
const subPorts = [].concat(args['sub']);
subPorts.forEach(p => {
  console.log(`Subscribing to ${p}`);
  subSocket.connect(`tcp://127.0.0.1:${p}`);
});
subSocket.subscribe('chat');

subSocket.on('message', msg => {
  console.log(`From other server: ${msg}`);
  broadcast(msg.toString().split(' ')[1]);
});

const wss = new WebSocketServer({server: server});
wss.on('connection', ws => {
  console.log('Client connected'); 
  ws.on('message', msg => {
    console.log(`Message: ${msg}`);
    broadcast(msg);
    pubSocket.send(`chat ${msg}`);
  });
});

function broadcast(msg) {
  wss.clients.forEach(client => {
    client.send(msg);
  });
}

server.listen(args['http'] || 8080);

前面的代码清楚地表明,我们的应用程序的逻辑变得稍微复杂一些;然而,考虑到我们正在实施分布式和点对点的发布/订阅模式,它仍然很简单。让我们看看所有的部分是如何结合在一起的:

  1. 我们需要 zmq ,它基本上是 ØMQ 库的 Node.js 版本。我们还需要 minimist ,它是一个命令行参数解析器;我们需要这个能够轻松接受命名参数。
  2. 我们立即创建我们的 PUB 套接字并将其绑定到 - pub 命令行参数中提供的端口。
  3. 我们创建 SUB 套接字,并将它连接到应用程序其他实例的 PUB 套接字。目标 PUB 套接字的端口在-- sub 命令行参数中提供(可能有多个)。然后,我们通过提供 chat 作为过滤器来创建实际订阅,这意味着我们只会收到以 chat 开始的消息。
  4. 当我们的 WebSocket 接收到新消息时,我们将它广播给所有连接的客户端,但我们也通过 PUB 套接字发布它。 我们使用 chat 作为前缀,然后是空格,因此该消息将作为过滤器发布到所有使用 chat 的订阅者。
  5. 我们开始监听到达我们 SUB 套接字的消息,我们对消息做一些简单的解析以删除聊天前缀,然后我们将它广播给所有连接到当前 WebSocket 服务器的客户端。

我们现在已经构建了一个简单的分布式系统,使用点对点发布/订阅模式进行集成!

让我们开始吧,让我们通过确保正确连接它们的 PUBSUB 插槽来启动我们的应用程序的三个实例:

node app --http 8080 --pub 5000 --sub 5001 --sub 5002
node app --http 8081 --pub 5001 --sub 5000 --sub 5002
node app --http 8082 --pub 5002 --sub 5000 --sub 5001

第一个命令将启动一个 HTTP 服务器侦听端口 8080 的实例,同时在端口 5000 上绑定 PUB 套接字,并将 SUB 套接字连接到端口 50015002 ,这是其他两个实例的 PUB 套接字应该侦听的端口。其他两个命令以类似的方式工作。

现在,我们可以看到的第一件事情是,如果与 PUB 套接字对应的端口不可用, ØMQ 不会崩溃。例如,在第一个命令执行时,端口 50015002 仍然不可用;但是, ØMQ 不会引发任何错误。这是因为 ØMQ 具有重连机制,它会自动尝试定期与这些端口建立连接。如果任何节点出现故障或重新启动,此功能特别适用。相同的逻辑适用于 PUB 套接字:如果没有订阅者,它将简单地删除所有消息,但它将继续工作。

此时,我们可以尝试使用浏览器导航到我们启动的任何服务器实例,并验证这些消息是否适当地向所有聊天服务器广播。

在前面的例子中,我们假设了一个静态体系结构,其中实例的数量和地址是事先已知的。我们可以引入一个服务注册表,如前一章所述,动态连接我们的实例。同样重要的是要指出 ØMQ 可以用来实现代理模式。

持久订阅者

消息传递系统中的一个重要抽象是消息队列( MQ )。对于消息队列,消息的发送者和接收者不需要同时处于活动状态和连接状态以建立通信,因为排队系统负责存储消息直到目的地能够 接收他们。 这种行为与 set and forget 范式相反,订户只能在消息系统连接期间才能接收消息。

一个能够始终可靠地接收所有消息的订阅者,即使是在没有收听这些消息时发送的消息,也被称为持久订阅者。

MQTT 协议为发送方和接收方之间交换的消息定义了服务质量(QoS)级别。这些级别对描述任何其他消息系统(不仅仅是 MQTT )的可靠性也非常有用。如下描述:

QoS0 ,最多一次:也被称为“设置并忘记”,消息不会被保留,并且传送未被确认。这意味着在接收机崩溃或断开的情况下,信息可能会丢失。 QoS1 ,至少一次:保证至少收到一次该消息,但如果在通知发件人之前接收器崩溃,则可能发生重复。这意味着消息必须在必须再次发送的情况下持续下去。 QoS2 ,正好一次:这是最可靠的 QoS ; 它保证该消息只被接收一次。 这是以用于确认消息传递的更慢和更数据密集型机制为代价的。

请在 MQTT 规范中了解更多信息 http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt /mqtt-v3r1.html#qos-flows

正如我们所说的,对于持久订阅者,我们的系统必须使用消息队列来在用户断开连接时累积消息。队列可以存储在内存中,也可以保存在磁盘上以允许恢复其消息,即使代理重新启动或崩溃。下图显示了由消息队列支持的持久订阅者:

持久订阅者可能是消息队列所支持的最重要的模式,但它肯定不是唯一的模式,我们将在本章后面看到。

Redis 的发布/订阅命令实现了一个设置和遗忘机制( QoS0 )。但是, Redis 仍然可以用于使用其他命令的组合来实现持久订阅者(不直接依赖其发布/订阅实现)。您可以在以下博客文章中找到关于此技术的说明:

ØMQ 定义了一些支持持久订阅者的模式,但实现这种机制主要取决于我们。

介绍 AMQP

消息队列通常用于不能丢失消息的情况,其中包括任务关键型应用程序,如银行或金融系统。这通常意味着典型的企业级消息队列是一个非常复杂的软件,它使用 bulletproof protocols 和持久存储来保证即使在出现故障时也能传送消息。由于这个原因,企业消息传递中间件多年来一直是 OracleIBM 等巨头的特权,它们中的每一个通常都实施自己的专有协议,导致强大的客户锁定。幸运的是,由于诸如 AMQPSTOMPMQTT 等开放协议的增长,邮件系统进入主流已经有几年了。为了理解消息队列系统的工作原理,现在我们将概述 AMQP ;这是了解如何使用基于此协议的典型 API 的基础。

AMQP 是许多消息队列系统支持的开放标准协议。除了定义通用通信协议外,它还提供了描述路由,过滤,排队,可靠性和安全性的模型。在 AMQP 中,有三个基本组成部分:

  • Queue(队列) :负责存储客户端消费的消息的数据结构。我们的应用程序推送消息到队列,供给一个或多个消费者。如果多个使用者连接到同一个队列,则这些消息会在它们之间进行负载平衡。 队列可以是以下之一:
    • Durable(持久队列) :这意味着如果代理重新启动,队列会自动重新创建。一个持久的队列并不意味着它的内容也被保留下来;实际上,只有标记为持久性的消息才会保存到磁盘,并在重新启动的情况下进行恢复。
    • Exclusive(专有队列) :这意味着队列只能绑定到一个特定的用户连接。当连接关闭时,队列被销毁。
    • Auto-delete(自动删除队列) :这会导致队列在最后一个用户断开连接时被删除。
  • Exchange(交换机) :这是发布消息的地方。交换机根据它实现的算法将消息路由到一个或多个队列:
    • Direct exchange(直接交换机) :通过匹配路由键(例如, chat.msg )整个消息来路由消息。
    • Topic exchange(主题交换机) :它使用与路由密钥相匹配的类似 glob 的模式分发消息(例如, chat.# 匹配以 chat 开始的所有路由密钥)。
    • Fanout exchange(扇出交换机) :它向所有连接的队列广播消息,忽略提供的任何路由密钥。
  • Binding(绑定) :这是交换机和队列之间的链接。它还定义了路由键或用于过滤从交换机到达的消息的模式。

这些组件由代理管理,该代理公开用于创建和操作它们的 API 。当连接到代理时,客户端创建一个到连接的通道,负责维护与代理的通信状态。

AMQP 中,可以通过创建任何类型的非排他性或自动删除的队列来获得持久用户模式。

下图将所有这些组件放在一起:

AMQP 模型比我们目前使用的消息系统( RedisØMQ )更复杂;但是,比起只使用原生发布/订阅机制,它提供了一系列功能和可靠性的保证。

您可以在 RabbitMQ 网站上找到 AMQP 模型的详细介绍: https://www.rabbitmq.com/tutorials/amqp-concepts.html

使用 AMQPRabbitMQ 的持久订阅者

现在让我们练习一下我们了解持久订阅者和 AMQP 的内容,并开发一个小例子。不丢失任何消息很重要的典型场景是,我们希望保持微服务体系结构的不同服务同步;我们在前一章已经描述了这种集成模式。如果我们想要使用经纪商将所有服务保留在同一页面上,那么我们不会丢失任何信息是非常重要的,否则我们可能会处于不一致的状态。

为聊天应用程序设计一个历史记录服务

现在让我们使用微服务方法扩展我们的小聊天应用程序。让我们添加一个历史记录服务,将我们的聊天消息保存在数据库中,这样当客户端连接时,我们可以查询服务并检索整个聊天记录。我们将使用 RabbitMQ brokerAMQP 将历史记录服务器与聊天服务器相集成。

下图显示了我们的架构:

如前面的体系结构所述,我们将使用单个扇出交换机;我们不需要任何特定的路由,所以我们的场景不需要任何更复杂的交换。接下来,我们将为聊天服务器的每个实例创建一个队列。这些队列是排他性的;当聊天服务器处于脱机状态时,我们无意收到任何遗漏的消息,都会传送给历史记录服务器记录,最终还可以针对存储的消息实施更复杂的查询。实际上,这意味着我们的聊天服务器不是持久订阅者,并且只要连接关闭,它们的队列就会被销毁。

相反,历史记录服务器不能丢失任何信息; 否则,它不会达到其目的。我们要为它创建的队列必须耐用,以便在历史记录服务断开连接时发布的任何消息将保留在队列中,并在联机时交付。

我们将使用熟悉的 LevelUP 作为历史记录服务的存储引擎,而我们将使用 amqplib ,并通过 AMQP 协议连接到 RabbitMQ

以下示例需要工作的 RabbitMQ 服务器,侦听其默认端口。 有关更多信息,请参阅其官方安装指南: http://www.rabbitmq.com/download.html

使用 AMQP 实现可靠的历史记录服务

现在让我们实施我们的历史记录服务器!我们将创建一个独立的应用程序(典型的微服务),它在模块 historySvc.js 中实现。该模块由两部分组成:向客户端展示聊天记录的 HTTP 服务器,以及负责捕获聊天消息并将其存储在本地数据库中的 AMQP 使用者。

让我们来看看下面代码中的内容:

const level = require('level');
const timestamp = require('monotonic-timestamp');
const JSONStream = require('JSONStream');
const amqp = require('amqplib');
const db = level('./msgHistory');

require('http').createServer((req, res) => {
  res.writeHead(200);
  db.createValueStream()
    .pipe(JSONStream.stringify())
    .pipe(res);
}).listen(8090);

let channel, queue;
amqp
  .connect('amqp://localhost')  // [1]
  .then(conn => conn.createChannel())
  .then(ch => {
    channel = ch;
    return channel.assertExchange('chat', 'fanout');  // [2]
  })
  .then(() => channel.assertQueue('chat_history'))  // [3]
  .then((q) => {
    queue = q.queue;
    return channel.bindQueue(queue, 'chat');  // [4]
  })
  .then(() => {
    return channel.consume(queue, msg => {  // [5]
      const content = msg.content.toString();
      console.log(`Saving message: ${content}`);
      db.put(timestamp(), content, err => {
        if (!err) channel.ack(msg);
      });
    });
  })
  .catch(err => console.log(err))
;

我们可以立即看到 AMQP 需要一些设置,这对创建和连接模型的所有组件都是必需的。 观察 amqplib 默认支持 Promises 也很有趣,所以我们大量利用它们来简化应用程序的异步步骤。让我们详细看看它是如何工作的:

  1. 我们首先与 AMQP 代理建立连接,在我们的例子中是 RabbitMQ 。然后,我们创建一个 channel ,该 channel 类似于保持我们通信状态的会话。
  2. 接下来,我们建立了我们的会话,名为 chat 。正如我们已经提到的那样,这是一种扇出交换机。 assertExchange() 命令将确保代理中存在交换,否则它将创建它。
  3. 我们还创建了我们的队列,名为 chat_history 。默认情况下,队列是持久的;不是排他性的,也不会自动删除,所以我们不需要传递任何额外的选项来支持持久订阅者。
  4. 接下来,我们将队列绑定到我们以前创建的交换机。在这里,我们不需要任何其他特殊选项,例如路由键或模式,因为交换机是扇出类型的交换机,所以它不执行任何过滤。
  5. 最后,我们可以开始监听来自我们刚创建的队列的消息。我们将使用时间戳记作为密钥( https://npmjs.org/package/monotonic-timestamp )在 LevelDB 数据库中收到的每条消息保存,以保持消息按日期排序。看到我们使用 channel.ack(msg) 来确认每条消息,并且只有在消息成功保存到数据库后,也很有趣。如果代理没有收到 ACK (确认),则该消息将保留在队列中以供再次处理。这是 AMQP 将服务可靠性提升到全新水平的另一个重要特征。如果我们不想发送明确的确认,我们可以将选项 {noAck:true} 传递给 channel.consume() API

将聊天应用程序与 AMQP 集成

要使用 AMQP 集成聊天服务器,我们必须使用与我们在历史记录服务器中实现的设置非常相似的设置,因此我们不打算在此重复。 但是,看看队列是如何创建的以及如何将新消息发布到交换中仍然很有趣。新的 app.js 文件的相关部分如下:

const WebSocketServer = require('ws').Server;
const amqp = require('amqplib');
const JSONStream = require('JSONStream');
const request = require('request');
let httpPort = process.argv[2] || 8080;

const server = require('http').createServer(
  require('ecstatic')({root: `${__dirname}/www`})
);

let channel, queue;
amqp
  .connect('amqp://localhost')
  .then(conn => conn.createChannel())
  .then(ch => {
    channel = ch;
    return channel.assertExchange('chat', 'fanout');
  })
  .then(() => {
    return channel.assertQueue(`chat_srv_${httpPort}`, {exclusive: true});
  })
  .then(q => {
    queue = q.queue;
    return channel.bindQueue(queue, 'chat');
  })
  .then(() => {
    return channel.consume(queue, msg => {
      msg = msg.content.toString();
      console.log('From queue: ' + msg);
      broadcast(msg);
    }, {noAck: true});
  })
  .catch(err => console.log(err))
;

const wss = new WebSocketServer({server: server});
wss.on('connection', ws => {
  console.log('Client connected');
  //query the history service
  request('http://localhost:8090')
    .on('error', err => console.log(err))
    .pipe(JSONStream.parse('*'))
    .on('data', msg => ws.send(msg))
  ;

  ws.on('message', msg => {
    console.log(`Message: ${msg}`);
    channel.publish('chat', '', new Buffer(msg));
  }); 
}); 

function broadcast(msg) {
  wss.clients.forEach(client => client.send(msg));
}

server.listen(httpPort);

正如我们所提到的,我们的聊天服务器不需要成为持久的订阅者。 所以当我们创建我们的队列时,我们传递选项 {exclusive:true} ,指示队列被限制到当前连接,因此一旦聊天服务器关闭,它就会被销毁。

发布新消息也很容易; 我们只需要指定目标交换机(聊天)和一个路由键,在我们的情况下这是空的(''),因为我们正在使用扇出交换。

我们现在可以运行我们改进的聊天应用程序架构;为此,我们开始两个聊天服务器和历史服务:

node app 8080
node app 8081
node historySvc

现在看看我们的系统,特别是历史服务如何在停机的情况下运行,这一点很有意思。如果我们停止历史记录服务器并继续使用聊天应用程序的 Web UI 发送消息,我们将会看到,当历史记录服务器重新启动时,它将立即收到所有错过的消息。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文