- Welcome to the Node.js Platform
- Node.js Essential Patterns
- Asynchronous Control Flow Patterns with Callbacks
- Asynchronous Control Flow Patterns with ES2015 and Beyond
- Coding with Streams
- Design Patterns
- Writing Modules
- Advanced Asynchronous Recipes
- Scalability and Architectural Patterns
- Messaging and Integration Patterns
- Welcome to the Node.js Platform
- Node.js 的发展
- Node.js 的特点
- 介绍 Node.js 6 和 ES2015 的新语法
- reactor 模式
- Node.js Essential Patterns
- Asynchronous Control Flow Patterns with Callbacks
- Asynchronous Control Flow Patterns with ES2015 and Beyond
- Coding with Streams
- Design Patterns
- Writing Modules
- Advanced Asynchronous Recipes
- Scalability and Architectural Patterns
- Messaging and Integration Patterns
请求/回复模式
处理消息传递系统通常意味着使用单向异步通信;发布/订阅就是一个很好的例子。
单向通信可以在并行性和效率方面给我们带来巨大的优势,但单靠它们无法解决我们所有的集成和通信问题。有时候,一个很好的请求/回复模式可能只是这项工作的完美工具。因此,在所有那些我们拥有异步单向通道的情况下,知道如何构建一个允许我们以请求/回复方式交换消息的模式是很重要的。这正是我们接下来要学习的内容。
关联 ID
我们将要学习的第一个请求/回复模式称为关联 ID,它表示在单向通道之上构建请求/回复模式的基本内容。
该模式包括标记每个请求的标识符,然后由接收方附加到响应中;通过这种方式,请求的发送者可以关联这两个消息并将响应返回给正确的处理程序。这优雅地解决了存在单向异步通道的问题,消息可以随时在任何方向传播。我们来看看下图中的例子:
前面的场景显示了如何使用关联 ID
使我们能够将每个响应与正确的请求进行匹配,即使这些响应是以不同的顺序发送和接收的。
使用关联实现请求/答复模式
现在让我们开始通过选择最简单类型的单向通道(一个是点对点(它直接连接系统的两个节点)和一个全双工(消息可以双向传输))来进行尝试。
关于管道连接,我们可以找到例如 WebSockets
:它们在服务器和浏览器之间建立点对点连接,并且消息可以以任何方向传播。另一个例子是使用 child_process.fork()
生成子进程时创建的通信通道。我们应该已经知道了,我们在 Chapter9-Advanced Asynchronous Recipes
中看到了这个 API。这个通道也是异步的:它只将父进程连接到子进程,并允许消息以任何方向传播。这可能是这个类别的最基本的渠道,所以这就是我们下一个例子中要用到的。
下一个应用程序的计划是构建一个抽象,以包装在父进程和子进程之间创建的通道隧道。这个抽象应该提供一个请求/回复通信隧道,通过用一个关联 ID
自动标记每个请求,然后将任何传入回复的 ID
与等待响应的请求处理程序列表进行匹配。
从 Chapter9-Advanced Asynchronous Recipes
中,我们应该记住父进程可以使用两个方法访问带有子进程的通道:
child.send(message)
child.on('message',callback)
以类似的方式,子进程可以使用以下方式访问父进程的通道:
process.send(message)
process.on('message',callback)
这意味着父进程中可用的隧道的 API
与子进程中可用的隧道的 API
相同;这将允许我们建立一个通用的方法,以便可以从通道的两端发送请求。
抽象 request
我们通过考虑负责发送新请求的部分开始构建这个抽象请求;让我们创建一个名为 request.js
的新文件:
const uuid = require('node-uuid');
module.exports = channel => {
const idToCallbackMap = {}; // [1]
channel.on('message', message => { // [2]
const handler = idToCallbackMap[message.inReplyTo];
if(handler) {
handler(message.data);
}
});
return function sendRequest(req, callback) { // [3]
const correlationId = uuid.v4();
idToCallbackMap[correlationId] = callback;
channel.send({
type: 'request',
data: req,
id: correlationId
});
};
};
这就是我们的抽象请求的工作原理:
- 看
request()
函数。该模式的神奇之处在于idToCallbackMap
变量,它存储了传出请求与其回复处理程序之间的关联。 - 一旦工厂被调用,我们所做的第一件事就是开始监听收到的消息。如果消息的关联
ID
(包含在inReplyTo
属性中)与idToCallbackMap
变量中包含的任何ID
相匹配,我们知道我们刚收到一个回复,因此我们获得了对相关响应处理程序的引用,并且用 消息中包含的数据。 - 最后,我们返回我们将用来发送新请求的函数。 其工作是使用 node-uuid 生成关联
ID
,然后将请求数据包装起来,并指定关联 IDcorrelationId
和消息类型type
。
这就是 request
模块;让我们转到下一部分。
抽象 reply
我们距实现完整的 request/reply
模式只有一步之遥,所以让我们看看 request.js
模块的对应的模块是如何工作的。我们创建另一个名为 reply.js
的文件,它将包含答复处理程序:
module.exports = channel =>
{
return function registerHandler(handler) {
channel.on('message', message => {
if (message.type !== 'request') return;
handler(message.data, reply => {
channel.send({
type: 'response',
data: reply,
inReplyTo: message.id
});
});
});
};
};
我们的 reply
模块又是一个工厂,它返回一个函数来注册新的答复处理程序。这是在注册新处理程序时发生的情况:
- 我们开始监听传入的请求,当我们收到请求时,我们立即通过传递消息的数据和回调函数来收集处理程序的回复来调用处理程序。
handler
程序完成其工作后,它将调用我们提供的回调,并返回其答复。然后我们通过附加请求的关联ID
(inReplyTo
属性)来构建,然后我们将所有内容都放回到隧道中。
关于这种模式的惊人之处在于,在 Node.js
中,它非常容易;我们所有的东西都是异步的,所以建立在单向通道之上的异步请求/回复通信与其他任何异步操作并没有太大的不同,特别是当我们构建一个抽象方法来隐藏其实现细节时。
尝试运行完整的 request/reply 模块
现在我们准备尝试运行我们新的异步 request/reply 模块。 让我们在一个名为 replier.js
的文件中创建一个示例 replier
:
const reply = require('./reply')(process);
reply((req, cb) => {
setTimeout(() => {
cb({sum: req.a + req.b});
}, req.delay);
});
我们的 replier
只需计算两个接收到的数字之间的和,并在某个延迟(也在请求中指定)之后返回结果。这将允许我们验证响应的顺序也可能与我们发送请求的顺序不同,以确认我们的模块正在工作。
完成示例的最后一步是在名为 requestor.js
的文件中创建请求者,该文件还具有使用 child_process.fork()
启动 replier
的任务:
const replier = require('child_process')
.fork(`${__dirname}/replier.js`);
const request = require('./request')(replier);
request({a: 1, b: 2, delay: 500}, res => {
console.log('1 + 2 = ', res.sum);
// 这应该是我们收到的最后一个回复,所以我们关闭了 channel
replier.disconnect();
});
request({a: 6, b: 1, delay: 100}, res => {
console.log('6 + 1 = ', res.sum);
});
请求者启动 replier
,然后将其引用传递给我们的请求模块。然后,我们运行一些示例请求,并验证它们与收到的响应之间的关联是否正确。
要试用这个示例,只需启动 requestor.js
模块; 输出应该类似于以下内容:
6 + 1 = 7
1 + 2 = 3
这证实了我们的模式完美地工作,并且 reply
与他们自己的请求正确地相关联,不管他们以什么顺序发送或接收。
返回地址
关联 ID
是在单向信道之上创建请求/回复通信的基本模式;然而,当我们的消息架构拥有多个通道或队列,或者可能有多个请求者时,这还不够。在这些情况下,除了关联 ID
之外,我们还需要知道返回地址,这是允许回复者将回复发送回请求的原始发件人的一条信息。
在 AMQP 中实现返回地址模式
在 AMQP
中,返回地址是请求者正在侦听传入回复的队列。因为响应只能由一个请求者接收,所以队列是私有的并且不在不同的使用者之间共享是很重要的。从这些属性中,我们可以推断出我们将需要一个暂时队列,将其作用于请求者的连接,并且应答者必须与返回队列建立点对点通信,以便能够传递其响应。
以下为我们提供了这种情况的一个例子:
为了在 AMQP
上创建请求/应答模式,我们需要做的就是在消息属性中指定响应队列的名称;这样,回复者知道应答消息必须传送到哪里。 这个理论看起来非常简单,所以我们来看看如何在真正的应用程序中实现它。
实现 request
现在让我们在 AMQP
之上构建一个请求/回复抽象。我们将使用 RabbitMQ
作为代理,但任何兼容的 AMQP
代理都应该可以完成这项工作。让我们从请求开始(在 amqpRequest.js
模块中实现);我们只会在这里展示相关的部分。
第一件事情是我们如何创建队列来保存响应;看以下代码:
channel.assertQueue('', {exclusive: true});
当我们创建队列时,我们没有指定任何名字,这意味着我们会选择一个随机的名字;除此之外,队列是独占的,这意味着它被绑定到活动的 AMQP
连接,并且在连接关闭时它将被销毁。没有必要将队列绑定到交换机,因为我们不需要任何路由或分配到多个队列;这意味着消息必须直接传递到我们的响应队列中。
接下来,让我们看看我们如何产生一个新的请求:
class AMQPRequest {
//...
request(queue, message, callback) {
const id = uuid.v4();
this.idToCallbackMap[id] = callback;
this.channel.sendToQueue(queue, new Buffer(JSON.stringify(message)), {
correlationId: id,
replyTo: this.replyQueue
});
}
}
request()
方法接受请求队列的名称和要发送的消息作为输入。正如我们在前一节中所了解的,我们需要生成一个关联 ID
并将其关联到回调函数。最后,我们发送消息,指定 correlationId
和 replyTo
属性作为元数据。
有趣的是,为了发送消息,我们使用 channel.sentToQueue() API
而不是 channel.publish()
;这是因为我们不希望使用交换机来实施任何发布/订阅分发,而是直接进入目标队列的更基本的点对点传递。
在
AMQP
中,我们可以指定一组要传递给消费者的属性(或元数据)以及主要消息。
我们的 amqpRequest
类的最后一个重要部分是我们监听传入响应的地方:
_listenForResponses() {
return this.channel.consume(this.replyQueue, msg => {
const correlationId = msg.properties.correlationId;
const handler = this.idToCallbackMap[correlationId];
if (handler) {
handler(JSON.parse(msg.content.toString()));
}
}, {
noAck: true
});
}
在前面的代码中,我们监听我们明确创建的用于接收响应的队列中的消息,然后为每个传入消息读取关联 ID
,并将它与等待答复的处理程序列表进行匹配。一旦我们有了处理程序,我们只需要通过传递 reply
消息来调用它。
实现 reply
这就是 amqpRequest
模块。现在是时候在名为 amqpReply.js
的新模块中实现响应对象。
在这里,我们必须保存传入请求的队列;我们可以为此使用一个简单的持久队列。我们不会展示这部分,因为它在所有 AMQP
都具有。我们感兴趣的是看到的是我们如何处理请求,然后将其发送回正确的队列:
class AMQPReply {
//...
handleRequest(handler) {
return this.channel.consume(this.queue, msg => {
const content = JSON.parse(msg.content.toString());
handler(content, reply => {
this.channel.sendToQueue(
msg.properties.replyTo, // 这里保存的请求消息的队列
new Buffer(JSON.stringify(reply)), {
correlationId: msg.properties.correlationId
}
);
this.channel.ack(msg);
});
});
}
}
在发送 reply
时,我们使用 channel.sendToQueue()
将消息直接发布到消息的 replyTo
属性(我们的返回地址)中指定的队列中。我们的 amqpReply
对象的另一个重要任务是在回复对象中设置 correlationId
,以便接收者可以将消息与挂起的请求列表进行匹配。
实现 requestor 和 replier
现在一切都准备好了,让我们首先尝试一下,但首先,让我们构建一个样本 requestor
和 replier
,从模块 replier.js
开始:
const Reply = require('./amqpReply');
const reply = Reply('requests_queue');
reply.initialize().then(() => {
reply.handleRequest((req, cb) => {
console.log('Request received', req);
cb({sum: req.a + req.b});
});
});
可以看到我们构建的模块如何处理关联 ID
和返回地址。我们所需要做的就是初始化一个新的 reply
对象,指定我们希望接收我们请求的队列的名称( requests_queue
)。我们的样本重新计算接收到的两个数字的总和作为输入,并使用提供的回调函数返回结果。
另一方面,我们在 requestor.js
文件中实现了一个样例 request
:
const req = require('./amqpRequest')();
req.initialize().then(() => {
for (let i = 100; i > 0; i--) {
sendRandomRequest();
}
});
function sendRandomRequest() {
const a = Math.round(Math.random() * 100);
const b = Math.round(Math.random() * 100);
req.request('requests_queue', {a: a, b: b},
res => {
console.log(`${a} + ${b} = ${res.sum}`);
}
);
}
我们的示例请求程序将 100
个随机请求发送到 requests_queue
队列。在这种情况下,有趣的是我们完美地完成了它的工作,隐藏了异步请求/应答模式的所有细节。
现在,要尝试系统,只需运行 replier
程序模块和 requestor
模块:
node replier
node requestor
我们会看到 requestor
发布的一系列操作,然后由 replier
收到,然后回复 response
。
现在我们可以尝试其他实验。一旦 replier
第一次启动,它会创建一个持久队列;这意味着,如果我们现在停止并再次运行请求者,则不会有任何请求丢失。 所有消息都将存储在队列中,直到重新启动重新启动。
这些都是因为我们使用了 AMQP
。 为了测试这个假设,我们可以尝试启动两个或更多的 replier
实例,并观察它们之间的负载平衡请求。这是有效的,因为每次 requestor
启动时,它将自己作为一个监听器附加到同一个持久队列中,结果,代理将负载均衡队列中所有消费者的消息同步到这里。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论