发布/订阅模式
发布/订阅(通常缩写为 Pub/Sub)可能是最著名的单向消息传递模式。 我们应该已经熟悉它了,因为它只不过是一个分布式观察者模式。 与观察者的情况一样,我们有一组订阅者注册他们对接收特定类别的消息的兴趣。 另一方面,发布者生成分布在所有相关订阅者之间的消息。 图 13.7 显示了 Pub/Sub 模式的两个主要变体; 第一个基于点对点架构,第二个使用代理来协调通信:
/image-2024-05-08-13-41-00-094.png)
Pub/Sub 之所以如此特别,是因为发布者事先并不知道消息的接收者是谁。 正如我们所说,订阅者必须注册其兴趣才能接收特定消息,从而允许发布者与未指定数量的接收者合作。 换句话说,Pub/Sub 模式的两侧是松散耦合的,这使得它成为集成不断发展的分布式系统的节点的理想模式。
代理的存在进一步改善了系统节点之间的解耦,因为订阅者仅与代理交互,而不知道哪个节点是消息的发布者。 正如我们稍后将看到的,代理还可以提供消息排队系统,即使在节点之间存在连接问题的情况下也可以进行可靠的传递。
现在,让我们通过一个示例来演示此模式。
构建一个极简的实时聊天应用程序
为了展示 Pub/Sub 模式如何帮助我们集成分布式架构的现实示例,我们现在将使用纯 WebSocket 构建一个非常基本的实时聊天应用程序。 然后,我们将通过运行多个实例来扩展它,最后,使用消息传递系统,我们将在所有服务器实例之间构建通信通道。
实现服务器端
现在,让我们一步一步来。 我们首先构建一个基本的聊天应用程序,然后将其扩展到多个实例。
为了实现典型聊天应用程序的实时功能,我们将依赖 ws 包 (nodejsdp.link/ws),它是 Node.js 的纯 WebSocket 实现。 在 Node.js 中实现实时应用程序非常简单,我们将要编写的代码将证实这一假设。 因此,让我们在名为 index.js 的文件中创建聊天应用程序的服务器端:
import {createServer} from 'http'
import staticHandler from 'serve-handler'
import ws from 'ws'
// serve static files
const server = createServer((req, res) => { // (1)
return staticHandler(req, res, {public: 'www'})
})
const wss = new ws.Server({server}) // (2)
wss.on('connection', client => {
console.log('Client connected')
client.on('message', msg => { // (3)
console.log(`Message: ${msg}`)
broadcast(msg)
})
})
function broadcast(msg) { // (4)
for (const client of wss.clients) {
if (client.readyState === ws.OPEN) {
client.send(msg)
}
}
}
server.listen(process.argv[2] || 8080)
就是这样! 这就是我们实现聊天应用程序的服务器端组件所需的全部内容。 它的工作原理如下:
-
我们首先创建一个 HTTP 服务器并将每个请求转发到一个特殊的处理程序 (nodejsdp.link/serve-handler),该处理程序将负责提供 www 目录中的所有静态文件。 这是访问应用程序的客户端资源(例如 HTML、JavaScript 和 CSS 文件)所必需的。
-
然后,我们创建一个新的 WebSocket 服务器实例,并将其附加到现有的 HTTP 服务器。 接下来,我们通过附加连接事件的事件侦听器来开始侦听传入的 WebSocket 客户端连接。
-
每次有新客户端连接到我们的服务器时,我们就开始侦听传入消息。 当新消息到达时,我们将其广播给所有连接的客户端。
-
broadcast() 函数是对所有已知客户端的简单迭代,其中在每个连接的客户端上调用 send() 函数。
这就是 Node.js 的魔力! 当然,我们刚刚实现的服务器非常小且基本,但正如我们将看到的,它完成了它的工作。
实施客户端
接下来,是时候实现我们的聊天应用程序的客户端了。 这可以通过另一个紧凑而简单的代码片段来完成,本质上是一个带有一些基本 JavaScript 代码的最小 HTML 页面。 让我们在名为 www/index.html 的文件中创建此页面,如下所示:
<!DOCTYPE html>
<html>
<body>
Messages:
<div id="messages"></div>
<form id="msgForm">
<input type="text" placeholder="Send a message" id="msgBox"/>
<input type="submit" value="Send"/>
</form>
<script>
const ws = new WebSocket(
`ws://${window.document.location.host}`
)
ws.onmessage = function (message) {
const msgDiv = document.createElement('div')
msgDiv.innerHTML = message.data
document.getElementById('messages').appendChild(msgDiv)
}
const form = document.getElementById('msgForm')
form.addEventListener('submit', (event) => {
event.preventDefault()
const message = document.getElementById('msgBox').value
ws.send(message)
document.getElementById('msgBox').value = ''
})
</script>
</body>
</html>
我们刚刚创建的 HTML 页面实际上不需要太多注释,它只是一个简单的 Web 开发。 我们使用原生 WebSocket 对象初始化与 Node.js 服务器的连接,然后开始侦听来自服务器的消息,并在消息到达时将其显示在新的 div 元素中。 相反,为了发送消息,我们在表单中使用简单的文本框和按钮。
请注意,当停止或重新启动聊天服务器时,WebSocket 连接将关闭,客户端不会尝试自动重新连接(正如我们对生产级应用程序所期望的那样)。 这意味着服务器重新启动后需要刷新浏览器来重新建立连接(或实现重新连接机制,为简洁起见,我们在此不再介绍)。 此外,在我们应用程序的初始版本中,客户端在未连接到服务器时将不会收到任何发送的消息。 |
运行和扩展聊天应用程序
我们可以尝试立即运行我们的应用程序。 只需使用以下命令启动服务器:
node index.js 8080
然后,打开几个浏览器选项卡甚至两个不同的浏览器,将它们指向 http://localhost:8080 ,然后开始聊天:
/image-2024-05-08-13-46-00-276.png)
现在,我们想看看当我们尝试通过启动多个实例来扩展应用程序时会发生什么。 让我们尝试这样做。 让我们在另一个端口上启动另一个服务器:
node index.js 8081
期望的结果应该是连接到两个不同服务器的两个不同客户端应该能够交换聊天消息。 不幸的是,我们当前的实现并没有发生这种情况。 我们可以通过打开另一个浏览器选项卡到 http://localhost:8081 来测试这一点。
在实际应用程序中,我们将使用负载均衡器在实例之间分配负载,但在本演示中我们不会使用负载均衡器。 这使我们能够以确定的方式访问每个服务器实例,以验证它如何与其他实例交互。 |
在一个实例上发送聊天消息时,我们仅在本地广播该消息,仅将其分发到连接到该特定服务器的客户端。 实际上,两台服务器不会相互通信。 我们需要整合它们,这正是我们接下来将看到的。
使用 Redis 作为简单的消息代理
我们通过介绍 Redis (nodejsdp.link/redis) 来开始分析最常见的 Pub/Sub 实现,Redis 是一种非常快速且灵活的内存数据结构存储。 Redis 通常用作数据库或缓存服务器,但是,在其众多功能中,有一对专门设计用于实现集中式 Pub/Sub 消息交换模式的命令。
Redis 的消息代理功能(有意)非常简单和基本,特别是如果我们将它们与更先进的面向消息的中间件进行比较。 然而,这也是其受欢迎的主要原因之一。 通常,Redis 已经在现有基础设施中可用,例如用作缓存服务器或会话数据存储。 它的速度和灵活性使其成为分布式系统中共享数据的非常流行的选择。 因此,一旦项目中出现发布/订阅代理的需求,最简单、最直接的选择就是复用 Redis 本身,从而避免安装和维护专用的消息代理。
现在让我们通过一个示例来演示使用 Redis 作为消息代理的简单性和强大功能。
此示例需要正常安装 Redis,并侦听其默认端口。 您可以在 nodejsdp.link/redis-quickstart 找到更多详细信息。 |
我们的行动计划是使用 Redis 作为消息代理来集成我们的聊天服务器。 每个实例将从其客户端接收到的任何消息发布到代理,同时,它订阅来自其他服务器实例的任何消息。 正如我们所看到的,我们架构中的每个服务器既是订阅者又是发布者。 下图显示了我们想要获得的架构的表示:
/image-2024-05-08-13-48-49-492.png)
根据图 13.9 中描述的架构,我们可以将消息的旅程总结如下:
-
消息被输入到网页的文本框中,然后发送到聊天服务器的连接实例。
-
然后消息被发布到代理。
-
代理将消息分派给所有订阅者,在我们的架构中,这些订阅者都是聊天服务器的实例。
-
在每个实例中,消息都会分发到所有连接的客户端。
让我们在实践中看看它是如何工作的。 让我们通过添加发布/订阅逻辑来修改服务器代码:
import {createServer} from 'http'
import staticHandler from 'serve-handler'
import ws from 'ws'
import Redis from 'ioredis' // (1)
const redisSub = new Redis()
const redisPub = new Redis()
// serve static files
const server = createServer((req, res) => {
return staticHandler(req, res, {public: 'www'})
})
const wss = new ws.Server({server})
wss.on('connection', client => {
console.log('Client connected')
client.on('message', msg => {
console.log(`Message: ${msg}`)
redisPub.publish('chat_messages', msg) // (2)
})
})
redisSub.subscribe('chat_messages') // (3)
redisSub.on('message', (channel, msg) => {
for (const client of wss.clients) {
if (client.readyState === ws.OPEN) {
client.send(msg)
}
}
})
server.listen(process.argv[2] || 8080)
我们对原始聊天服务器所做的更改在前面的代码中突出显示。 新实现的工作原理如下:
-
为了将 Node.js 应用程序连接到 Redis 服务器,我们使用 ioredis 包 (nodejsdp.link/ioredis),它是一个完整的 Node.js 客户端,支持所有可用的 Redis 命令。 接下来,我们实例化两个不同的连接,一个用于订阅频道,另一个用于发布消息。 这在 Redis 中是必要的,因为一旦连接置于订阅者模式,就只能使用与订阅相关的命令。 这意味着我们需要第二个连接来发布消息。
-
当从连接的客户端收到新消息时,我们会在 chat_messages 通道中发布该消息。 我们不会直接将消息广播给客户端,因为我们的服务器订阅了同一频道(稍后我们将看到),因此它将通过 Redis 返回给我们。 就本示例的范围而言,这是一种简单而有效的机制。 但是,根据应用程序的要求,您可能希望立即广播消息并忽略从 Redis 到达并源自当前服务器实例的任何消息。 我们将其留给您作为练习。
-
正如我们所说,我们的服务器还必须订阅 chat_messages 通道,因此我们注册一个侦听器来接收发布到该通道的所有消息(通过当前服务器实例或任何其他聊天服务器实例)。 当收到消息时,我们只需将其广播到连接到当前 WebSocket 服务器的所有客户端。
这几个更改足以集成我们可能决定启动的所有聊天服务器实例。 为了证明这一点,您可以尝试启动我们应用程序的多个实例:
node index.js 8080
node index.js 8081
node index.js 8082
然后,您可以将多个浏览器选项卡连接到每个实例,并验证您发送到一个实例的消息是否已被连接到其他实例的所有其他客户端成功接收。
恭喜! 我们只是使用发布/订阅模式集成了分布式实时应用程序的多个节点。
Redis 允许我们发布和订阅由字符串标识的频道,例如 chat.nodejs。 但它还允许我们使用 globstyle 模式来定义可能匹配多个频道的订阅,例如 chat.*。 |
使用 ZeroMQ 进行点对点发布/订阅
代理的存在可以大大简化消息传递系统的架构。 然而,在某些情况下,这可能不是最好的解决方案。 这包括低延迟至关重要的所有情况,或者扩展复杂的分布式系统时,或者不能选择出现单点故障时。 当然,使用代理的替代方法是实现点对点消息传递系统。
ZeroMQ 简介
如果我们的项目是点对点架构的良好候选者,那么评估的最佳解决方案之一肯定是 ZeroMQ(nodejsdp.link/zeromq,也称为 zmq 或 ØMQ)。 ZeroMQ 是一个网络库,它提供了构建各种消息传递模式的基本工具。 它是低级的,速度极快,并且具有简约的 API,但它提供了创建可靠的消息传递系统的所有基本构建块,例如原子消息、负载平衡、队列等等。 它支持多种类型的传输,例如进程内通道 (inproc://)、进程间通信 (ipc://)、使用 PGM 协议的多播(pgm:// 或 epgm://),以及 当然是经典的TCP(tcp://)。
在 ZeroMQ 的功能中,我们还可以找到实现发布/订阅模式的工具,这正是我们的示例所需要的。 因此,我们现在要做的就是从聊天应用程序的架构中删除代理(Redis),并利用 ZeroMQ 的发布/订阅套接字,让各个节点以点对点的方式进行通信。
ZeroMQ 套接字可以被视为增强型网络套接字,它提供了额外的抽象来帮助实现最常见的消息传递模式。 例如,我们可以找到旨在实现发布/订阅、请求/回复或单向推送通信的套接字。 |
为聊天服务器设计点对点架构
当我们从架构中删除代理时,聊天服务器的每个实例都必须直接连接到其他可用实例才能接收它们发布的消息。 在 ZeroMQ 中,我们有两种专门为此目的而设计的套接字:PUB 和 SUB。 典型的模式是将 PUB 套接字绑定到本地端口,它将开始侦听来自 SUB 类型套接字的传入订阅请求。
订阅可以有一个过滤器,指定将哪些消息传递到连接的 SUB 套接字。 过滤器是一个简单的二进制缓冲区(因此它也可以是一个字符串),它将与消息的开头(也是一个二进制缓冲区)进行匹配。 当消息通过 PUB 套接字发送时,它会广播到所有连接的 SUB 套接字,但仅在应用了它们的订阅过滤器之后。 仅当使用连接协议(例如 TCP)时,过滤器才会应用于发布方。
下图显示了应用于我们的分布式聊天服务器架构的模式(为了简单起见,只有两个实例):
/image-2024-05-08-13-53-29-502.png)
图 13.10 显示了当我们有两个聊天应用程序实例时的信息流,但相同的概念可以应用于 N 个实例。 这种架构告诉我们,每个节点必须了解系统中的其他节点才能建立所有必要的连接。 它还向我们展示了订阅如何从 SUB 套接字转移到 PUB 套接字,而消息则以相反的方向传播。
使用 ZeroMQ PUB/SUB 套接字
让我们通过修改我们的聊天服务器来看看 ZeroMQ PUB/SUB 套接字在实践中是如何工作的:
import {createServer} from 'http'
import staticHandler from 'serve-handler'
import ws from 'ws'
import yargs from 'yargs' // (1)
import zmq from 'zeromq'
// serve static files
const server = createServer((req, res) => {
return staticHandler(req, res, {public: 'www'})
})
let pubSocket
async function initializeSockets() {
pubSocket = new zmq.Publisher() // (2)
await pubSocket.bind(`tcp://127.0.0.1:${yargs.argv.pub}`)
const subSocket = new zmq.Subscriber() // (3)
const subPorts = [].concat(yargs.argv.sub)
for (const port of subPorts) {
console.log(`Subscribing to ${port}`)
subSocket.connect(`tcp://127.0.0.1:${port}`)
}
subSocket.subscribe('chat')
for await (const [msg] of subSocket) { // (4)
console.log(`Message from another server: ${msg}`)
broadcast(msg.toString().split(' ')[1])
}
}
initializeSockets()
const wss = new ws.Server({server})
wss.on('connection', client => {
console.log('Client connected')
client.on('message', msg => {
console.log(`Message: ${msg}`)
broadcast(msg)
pubSocket.send(`chat ${msg}`) // (5)
})
})
function broadcast(msg) {
for (const client of wss.clients) {
if (client.readyState === ws.OPEN) {
client.send(msg)
}
}
}
server.listen(yargs.argv.http || 8080)
前面的代码清楚地表明我们的应用程序的逻辑变得稍微复杂一些,但是,考虑到我们正在实现点对点发布/订阅模式,它仍然很简单。 让我们看看所有部分是如何组合在一起的:
-
我们导入两个新包。 首先,我们导入 yargs (nodejsdp.link/yargs),这是一个命令行参数解析器; 我们需要它来轻松接受命名参数。 其次,我们导入 Zeromq 包 (nodejsdp.link/zeromq),它是 ZeroMQ 的 Node.js 客户端。
-
在 initializeSockets() 函数中,我们立即创建 Publisher 套接字并将其绑定到 --pub 命令行参数中提供的端口。
-
我们创建订阅者套接字,并将其连接到应用程序其他实例的发布者套接字。 目标发布者套接字的端口在 --sub 命令行参数中提供(可能有多个)。 然后,我们通过提供聊天作为过滤器来创建实际的订阅,这意味着我们将仅收到以聊天开头的消息。
-
我们开始使用 for wait…of 循环侦听到达订阅者套接字的消息,因为 subSocket 是异步可迭代的。 对于收到的每条消息,我们都会进行一些简单的解析以删除聊天前缀,然后将实际的有效负载广播()到连接到当前 WebSocket 服务器的所有客户端。
-
当当前实例的 WebSocket 服务器收到新消息时,我们将其广播到所有连接的客户端,但我们也通过 Publisher 套接字发布它。 我们使用 chat 作为前缀,后跟一个空格,以便将消息发布到使用 chat 作为过滤器的所有订阅。
我们现在已经构建了一个简单的分布式系统,使用点对点发布/订阅模式进行集成!
让我们启动它,通过确保正确连接它们的发布者和订阅者套接字来启动应用程序的三个实例:
node index.js --http 8080 --pub 5000 --sub 5001 --sub 5002
node index.js --http 8081 --pub 5001 --sub 5000 --sub 5002
node index.js --http 8082 --pub 5002 --sub 5000 --sub 5001
第一个命令将启动一个实例,其中 HTTP 服务器侦听端口 8080,同时将其发布者套接字绑定在端口 5000 上,并将订阅者套接字连接到端口 5001 和 5002,这是其他两个实例的发布者套接字应该侦听的位置 。 其他两个命令的工作方式类似。
现在,您将看到的第一件事是,如果订阅者套接字无法与发布者套接字建立连接,ZeroMQ 不会抱怨。 例如,在执行第一个命令时,没有 Publisher 套接字侦听端口 5001 和 5002,但是 ZeroMQ 不会抛出任何错误。 这是因为 ZeroMQ 被构建为具有故障恢复能力,并且它实现了内置的连接重试机制。 如果任何节点出现故障或重新启动,此功能也会特别方便。 相同的宽容逻辑适用于发布者套接字:如果没有订阅,它将简单地删除所有消息,但它将继续工作。
此时,我们可以尝试使用浏览器导航到我们启动的任何服务器实例,并验证消息是否正确传播到所有聊天服务器。
在前面的示例中,我们假设了一个静态架构,其中实例的数量及其地址是预先已知的。 我们可以引入一个服务注册表,如第 12 章 “可扩展性和架构模式” 中所述,来动态连接我们的实例。 同样重要的是要指出,ZeroMQ 可用于使用我们在此演示的相同原语来实现代理。 |
使用队列进行可靠的消息传递
消息传递系统中的一个重要抽象是消息队列(MQ)。 使用消息队列,消息的发送者和接收者不一定需要同时处于活动状态和连接状态才能建立通信,因为排队系统负责存储消息,直到目的地能够 接收它们。 此行为与“即发即弃”范例相反,在“即发即忘”范例中,订阅者只能在连接到消息传递系统期间接收消息。
能够始终可靠地接收所有消息(甚至是在未监听消息时发送的消息)的订阅者称为持久订阅者。
我们可以将消息传递系统的传递语义总结为三类:
-
最多一次:也称为 “即发即弃”,消息不会保留,并且不会确认传送。 这意味着在接收器崩溃或断开连接的情况下消息可能会丢失。
-
至少一次:保证消息至少被接收一次,但如果接收方在通知发送方接收之前崩溃,则可能会发生重复。 这意味着消息必须被持久化,以防万一必须再次发送。
-
恰好一次:这是最可靠的传递语义。 它保证消息被接收一次且仅一次。 这是以更慢且更数据密集的机制来确认消息传递为代价的。
当我们的消息系统可以实现 “至少一次” 或 “恰好一次” 传递语义时,我们就拥有了持久订阅者,为此,系统必须使用消息队列在订阅者断开连接时累积消息。 队列可以存储在内存中或持久保存在磁盘上,以便即使排队系统重新启动或崩溃也可以恢复其消息。
下图显示了由消息队列支持的持久订阅者的图形表示:
/image-2024-05-08-14-00-34-192.png)
图 13.11 向我们展示了消息队列如何帮助我们实现持久订阅者模式。 正如我们所看到的,在正常操作期间 (1) 消息通过消息队列从发布者传输到订阅者。 当订阅者由于崩溃、故障或只是计划的维护期而离线 (2) 时,发布者发送的任何消息都会安全地存储和累积在消息队列中。 之后,当订阅者重新上线 (3) 时,队列中累积的所有消息都会发送给订阅者,因此不会丢失任何消息。
持久订阅者可能是消息队列启用的最重要的模式,但它肯定不是唯一的模式,我们将在本章后面看到。
接下来,我们将学习 AMQP,我们将在本章的其余部分中使用该协议来实现消息队列示例。
AMQP 简介
消息队列通常用于消息不能丢失的情况,包括银行系统、空中交通管理和控制系统、医疗应用等关键任务应用。 这通常意味着典型的企业级消息队列是一个非常复杂的软件,它利用防弹协议和持久存储来保证消息的传递,即使在出现故障的情况下也是如此。 因此,企业消息中间件多年来一直是 Oracle 和 IBM 等科技巨头的特权,他们通常都实现自己的专有协议,从而形成了强大的客户锁定。 幸运的是,由于 AMQP、STOMP 和 MQTT 等开放协议的发展,消息系统进入主流已经有几年了。 在本章的其余部分中,我们将使用 AMQP 作为队列系统的消息传递协议,因此让我们对其进行适当的介绍。
AMQP 是许多消息队列系统支持的开放标准协议。 除了定义通用通信协议之外,它还提供了一个模型来描述路由、过滤、排队、可靠性和安全性。
下图一目了然地展示了所有 AMQP 组件:
/image-2024-05-08-14-01-46-339.png)
如图 13.12 所示,AMQP 中有三个基本组件:
-
队列:负责存储客户端使用的消息的数据结构。 来自队列的消息被推送(或拉取)给一个或多个消费者。 如果多个消费者附加到同一个队列,消息将在它们之间进行负载平衡。 队列可以是以下任一队列:
-
持久:这意味着如果代理重新启动,将自动重新创建队列。 持久队列并不意味着它的内容也会被保留; 事实上,只有标记为持久的消息才会保存到磁盘并在重新启动时恢复。
-
独占:这意味着队列仅绑定到一个特定订阅者连接。 当连接关闭时,队列将被销毁。
-
自动删除:当最后一个订阅者断开连接时,这将导致队列被删除。
-
-
Exchange:这是发布消息的地方。 交换器根据其实现的算法将消息路由到一个或多个队列:
-
直接交换:它通过匹配整个路由键(例如,chat.msg)来路由消息
-
主题交换:它使用类 glob 模式分发消息与路由键匹配(例如,chat.# 匹配以 chat 开头的所有路由键。)
-
扇出交换:它向所有连接的队列广播消息,忽略提供的任何路由键
-
-
绑定:这是 交换器和队列之间的链接。 它还定义用于过滤从交换器到达的消息的路由键或模式。
这些组件由代理管理,代理公开用于创建和操作它们的 API。 连接到代理时,客户端会创建一个通道(连接的抽象),负责维护与代理的通信状态。
在 AMQP 中,我们可以通过创建任何类型的不独占或自动删除的队列来获得持久订阅者模式。 |
AMQP 模型比我们迄今为止使用的消息系统(Redis 和 ZeroMQ)复杂得多。 然而,它提供了一组功能和一定程度的可靠性,而仅使用原始的发布/订阅机制很难获得这些功能和可靠性。
您可以在 RabbitMQ 网站上找到 AMQP 模型的详细介绍:nodejsdp.link/amqp-components。 |
使用 AMQP 和 RabbitMQ 的持久订阅者
现在让我们练习一下我们所学到的关于持久订阅者和 AMQP 的知识,并研究一个小示例。 不丢失任何消息的一个典型场景是当我们想要保持微服务架构的不同服务同步时(我们已经在前一章中描述了这种集成模式)。 如果我们想使用代理将所有服务保持在同一页面上,那么重要的是我们不要丢失任何信息,否则我们可能会陷入不一致的状态。
为聊天应用程序设计历史记录服务
现在让我们使用微服务方法扩展我们的小型聊天应用程序。 让我们添加一个历史服务,将聊天消息保存在数据库中,以便当客户端连接时,我们可以查询该服务并检索整个聊天历史记录。 我们将使用 RabbitMQ 代理 (nodejsdp.link/rabbitmq) 和 AMQP 将历史记录服务与聊天服务器集成。
下图展示了我们规划的架构:
/image-2024-05-08-14-06-45-370.png)
如图 13.13 所示,我们将使用单个扇出交换; 我们不需要任何复杂的路由逻辑,因此我们的场景不需要任何比这更复杂的交换。 接下来,我们将为聊天服务器的每个实例创建一个队列。
这些队列是排他性的,因为我们对接收聊天服务器离线时丢失的任何消息不感兴趣; 这是我们的历史服务的工作,它最终还可以对存储的消息实现更复杂的查询。 实际上,这意味着我们的聊天服务器不是持久订阅者,一旦连接关闭,它们的队列就会被销毁。 相反,历史服务不能丢失任何消息,否则它将无法实现其目的。 因此,我们要为其创建的队列必须是持久的,以便历史服务断开连接时发布的任何消息都将保留在队列中,并在其重新上线时传递。
我们将使用熟悉的 LevelUP 作为历史服务的存储引擎,同时我们将使用 amqplib 包(nodejsdp.link/amqplib)通过 AMQP 协议连接到 RabbitMQ。
下面的示例需要一个正在运行的 RabbitMQ 服务器,侦听其默认端口。 有关更多信息,请参阅其官方安装指南:nodejsdp.link/rabbitmqgetstarted。 |
使用 AMQP 实现历史服务
现在让我们实现我们的历史服务! 我们将创建一个独立的应用程序(典型的微服务),它在 historySvc.js 模块中实现。 该模块由两部分组成:一个向客户端公开聊天历史记录的 HTTP 服务器,以及一个负责捕获聊天消息并将其存储在本地数据库中的 AMQP 消费者。
让我们看看下面的代码是什么样子的:
import {createServer} from 'http'
import level from 'level'
import timestamp from 'monotonic-timestamp'
import JSONStream from 'JSONStream'
import amqp from 'amqplib'
async function main() {
const db = level('./msgHistory')
const connection = await amqp.connect('amqp://localhost') // (1)
const channel = await connection.createChannel()
await channel.assertExchange('chat', 'fanout') // (2)
const {queue} = channel.assertQueue('chat_history') // (3)
await channel.bindQueue(queue, 'chat') // (4)
channel.consume(queue, async msg => { // (5)
const content = msg.content.toString()
console.log(`Saving message: ${content}`)
await db.put(timestamp(), content)
channel.ack(msg)
})
createServer((req, res) => {
res.writeHead(200)
db.createValueStream()
.pipe(JSONStream.stringify())
.pipe(res)
}).listen(8090)
}
main().catch(err => console.error(err))
我们立即可以看到 AMQP 需要进行一些设置,这是创建和连接模型的所有组件所必需的。 让我们详细看看它是如何工作的:
-
我们首先与 AMQP 代理建立连接,在我们的例子中是 RabbitMQ。 然后,我们创建一个类似于会话的通道,它将维护我们的通信状态。
-
接下来,我们建立一个交易所,命名为 chat。 正如我们已经提到的,这是一个扇出交换。 assertExchange() 命令将确保代理上存在交换,否则它将创建它。
-
我们还创建一个名为 chat_history 的队列。 默认情况下,队列是持久的(不独占且不自动删除),因此我们不需要传递任何额外的选项来支持持久订阅者。
-
接下来,我们将队列绑定到之前创建的交换器。 在这里,我们不需要任何其他特定选项(例如路由键或模式),因为交换是扇出类型,因此它不执行任何过滤。
-
最后,我们可以开始监听来自我们刚刚创建的队列的消息。 我们使用单调时间戳作为键(请参阅 nodejsdp.link/monotonictimestamp)将收到的每条消息保存在 LevelDB 数据库中,以保持消息按日期排序。 同样有趣的是,我们使用 channel.ack(msg) 来确认每条消息,但前提是消息成功保存到数据库中。 如果broker没有收到ACK(确认),则该消息将保留在队列中以再次处理。
如果我们对发送显式确认不感兴趣,我们可以将 { noAck: true } 选项传递给channel.consume() API。 |
将聊天应用程序与 AMQP 集成
要使用 AMQP 集成聊天服务器,我们必须使用与我们在历史服务中实现的设置非常相似的设置,但有一些细微的变化。 那么,让我们看看引入 AMQP 后新的 index.js 模块是什么样子的:
import {createServer} from 'http'
import staticHandler from 'serve-handler'
import ws from 'ws'
import amqp from 'amqplib'
import JSONStream from 'JSONStream'
import superagent from 'superagent'
const httpPort = process.argv[2] || 8080
async function main() {
const connection = await amqp.connect('amqp://localhost')
const channel = await connection.createChannel()
await channel.assertExchange('chat', 'fanout')
const {queue} = await channel.assertQueue( // (1)
`chat_srv_${httpPort}`,
{exclusive: true}
)
await channel.bindQueue(queue, 'chat')
channel.consume(queue, msg => { // (2)
msg = msg.content.toString()
console.log(`From queue: ${msg}`)
broadcast(msg)
}, {noAck: true})
// serve static files
const server = createServer((req, res) => {
return staticHandler(req, res, {public: 'www'})
})
const wss = new ws.Server({server})
wss.on('connection', client => {
console.log('Client connected')
client.on('message', msg => {
console.log(`Message: ${msg}`)
channel.publish('chat', '', Buffer.from(msg)) // (3)
})
// query the history service
superagent // (4)
.get('http://localhost:8090')
.on('error', err => console.error(err))
.pipe(JSONStream.parse('*'))
.on('data', msg => client.send(msg))
})
function broadcast(msg) {
for (const client of wss.clients) {
if (client.readyState === ws.OPEN) {
client.send(msg)
}
}
}
server.listen(httpPort)
}
main().catch(err => console.error(err))
正如我们所看到的,AMQP 在这种情况下也使代码变得更加冗长,但此时我们应该已经熟悉了其中的大部分内容。 只需要注意以下几个方面:
-
正如我们所提到的,我们的聊天服务器不需要是持久订阅者:即发即忘范例就足够了。 因此,当我们创建队列时,我们传递 { Exclusive: true } 选项,表明队列的范围仅限于当前连接,因此一旦聊天服务器关闭,它就会被销毁。
-
出于与上一点相同的原因,当我们从队列中读取消息时,不需要发回任何确认。 因此,为了让事情变得更简单,我们在开始使用队列中的消息时传递 { noAck: true } 选项。
-
发布新消息也非常容易。 我们只需指定目标交换(聊天)和路由键,在我们的例子中为空(''),因为我们使用的是扇出交换,因此没有要执行的路由。
-
此版本聊天服务器的另一个特点是,借助我们的历史记录微服务,我们现在可以向用户呈现完整的聊天历史记录。 我们通过查询历史微服务并在建立新连接后立即将每条过去的消息发送到客户端来实现这一点。
我们现在可以运行我们新改进的聊天应用程序。 为此,首先确保 RabbitMQ 在您的计算机上本地运行,然后让我们在三个不同的终端中启动两个聊天服务器和历史记录服务:
node index.js 8080
node index.js 8081
node historySvc.js
我们现在应该把注意力集中在我们的系统,特别是历史服务,在停机时的行为方式。 如果我们停止历史服务器并继续使用聊天应用程序的 Web UI 发送消息,我们将看到当历史服务器重新启动时,它将立即收到所有错过的消息。 这是持久订阅者模式如何工作的完美演示!
有趣的是,微服务方法如何让我们的系统即使没有其组件之一(历史服务)也能生存。 功能会暂时减少(没有可用的聊天历史记录),但人们仍然可以实时交换聊天消息。 惊人的! |
使用流进行可靠的消息传递
在本章开头,我们提到消息队列的可能替代方案是流。 这两种范式在范围上相似,但在消息传递方法上有根本不同。 在本节中,我们将通过利用 Redis Streams 来实现我们的聊天应用程序来揭示流的强大功能。
流媒体平台的特点
在系统集成的上下文中,流(或日志)是一种有序的、仅附加的、持久的数据结构。 消息(在流的上下文中更合适地称为记录)始终添加在流的末尾,并且与队列不同,它们在使用时不会自动删除。 从本质上讲,此特性使流更类似于数据存储而不是消息代理。 与数据存储一样,可以查询流以检索一批过去的记录或从特定记录开始重放。
流的另一个重要特征是消费者从流中提取记录。 这本质上允许消费者按照自己的节奏处理记录,而不会面临不知所措的风险。
基于这些功能,流允许我们实现开箱即用的可靠消息传递,因为流中不会丢失任何数据(即使数据仍然可以显式删除或可以在可选保留期后删除)。 事实上,如图 13.14 所示,如果消费者崩溃,它所要做的就是从中断处开始读取流:
/image-2024-05-08-14-13-38-008.png)
如图 13.14 所示,在正常操作期间 (1),消费者在生产者添加记录后立即处理流中的记录。 当消费者由于问题或计划维护而变得不可用时 (2),生产者只需继续照常向流添加记录即可。 当消费者重新上线 (3) 时,它会从离开的位置开始处理记录。 这种机制的主要方面是它非常简单和准系统,但它非常有效地确保即使消费者不可用时也不会丢失消息。
流与消息队列
正如我们到目前为止所看到的,消息队列和流之间存在很多差异,但也有很多相似之处。 那么,什么时候应该使用其中之一来代替另一个呢?
嗯,流的明显用例是当我们必须处理顺序数据(流数据)时,这可能还需要消费者批量处理消息或查找过去消息中的相关性。 此外,现代流媒体平台允许每秒摄取千兆字节的数据,并跨多个节点分发数据和处理数据。
消息队列和流都非常适合实现简单的发布/订阅模式,即使具有可靠的消息传递也是如此。 然而,消息队列更适合复杂的系统集成任务,因为它们提供了高级的消息路由,并允许我们对不同的消息具有不同的优先级(在流中,记录的顺序始终被保留)。
正如我们稍后将看到的,两者都可以用于实现任务分配模式,尽管在标准架构中,由于消息优先级和更高级的路由机制,消息队列可能更合适。
使用 Redis Streams 实现聊天应用程序
在撰写本文时,最流行的流媒体平台是 Apache Kafka (nodejsdp.link/kafka) 和 Amazon Kinesis (nodejsdp.link/kinesis)。 然而,对于更简单的任务,我们可以再次依赖 Redis,它实现了一种称为 Redis Streams 的日志数据结构。
在下一个代码示例中,我们将通过调整我们的聊天应用程序来了解 Redis Streams 的实际运行情况。 使用流而不是消息队列的直接优势是,我们不需要依赖专用组件来存储和检索聊天室中交换的消息的历史记录,但我们可以在每次需要时简单地查询流 访问较旧的消息。 正如我们将看到的,这大大简化了我们应用程序的架构,并且肯定使流成为比消息队列更好的选择,至少对于我们非常简单的用例来说。
那么,让我们深入研究一些代码。 让我们更新聊天应用程序的 index.js 以使用 Redis Streams:
import {createServer} from 'http'
import staticHandler from 'serve-handler'
import ws from 'ws'
import Redis from 'ioredis'
const redisClient = new Redis()
const redisClientXRead = new Redis()
// serve static files
const server = createServer((req, res) => {
return staticHandler(req, res, {public: 'www'})
})
const wss = new ws.Server({server})
wss.on('connection', async client => {
console.log('Client connected')
client.on('message', msg => {
console.log(`Message: ${msg}`)
redisClient.xadd('chat_stream', '*', 'message', msg) // (1)
})
// Load message history
const logs = await redisClient.xrange( // (2)
'chat_stream', '-', '+')
for (const [, [, message]] of logs) {
client.send(message)
}
})
function broadcast(msg) {
for (const client of wss.clients) {
if (client.readyState === ws.OPEN) {
client.send(msg)
}
}
}
let lastRecordId = '$'
async function processStreamMessages() { // (3)
while (true) {
const [[, records]] = await redisClientXRead.xread(
'BLOCK', '0', 'STREAMS', 'chat_stream', lastRecordId)
for (const [recordId, [, message]] of records) {
console.log(`Message from stream: ${message}`)
broadcast(message)
lastRecordId = recordId
}
}
}
processStreamMessages().catch(err => console.error(err))
server.listen(process.argv[2] || 8080)
与往常一样,应用程序的整体结构保持不变; 改变的是我们用来与应用程序的其他实例交换消息的 API。
让我们更仔细地看看这些 API:
-
我们要分析的第一个命令是 xadd。 此命令将新记录附加到流中,我们使用它在从连接的客户端到达时添加新的聊天消息。 我们向 xadd 传递以下参数:
-
流的名称,在我们的例子中是 chat_stream。
-
记录的 ID。 在我们的例子中,我们提供了一个星号(*),它是一个特殊的ID,要求Redis为我们生成一个ID。 这通常是我们想要的,因为 ID 必须是单调的以保留记录的字典顺序,而 Redis 会为我们处理这个问题。
-
它遵循键值对列表。 在我们的例子中,我们仅指定值 msg 的 'message' 键(这是我们从客户端收到的消息)。
-
-
这是使用流最有趣的方面之一:我们查询流的过去记录以检索聊天历史记录。 每次客户端连接时我们都会这样做。 为此,我们使用 xrange 命令,顾名思义,它允许我们检索流中两个指定 ID 内的所有记录。 在我们的例子中,我们使用特殊的 ID '-'(减号)和 '+'(加号),它们表示可能的最低 ID 和可能的最高 ID。 这本质上意味着我们想要检索当前流中的所有记录。
-
我们的新聊天应用程序的最后一个有趣的部分是等待新记录添加到流中。 这允许每个应用程序实例在添加到队列中时读取新的聊天消息,这是集成工作的重要组成部分。 我们使用无限循环和 xread 命令来执行该任务,并提供以下参数:
-
BLOCK 意味着我们希望调用阻塞,直到新消息到达。
-
接下来,我们指定超时,之后命令将简单地返回空结果。 在我们的例子中,0 意味着我们想要永远等待。
-
STREAMS 是一个关键字,它告诉 Redis 我们现在要指定要读取的流的详细信息。
-
chat_stream 是我们要读取的流的名称。
-
最后,我们提供记录 ID (lastRecordId),之后我们要开始读取新消息。 最初,它被设置为$(美元符号),它是一个特殊的ID,指示当前流中的最高ID,它本质上应该在流中当前的最后一条记录之后开始读取流。 读取第一条记录后,我们使用最后读取的记录的 ID 更新 lastRecordId 变量。
-
在前面的示例中,我们还使用了一些巧妙的解构指令。 例如考虑以下代码:
for (const [, [, message]] of logs) {...}
该指令可以扩展为如下所示:
for (const [recordId, [propertyId, message]] of logs) {...}
但由于我们对获取 recordId 和 propertyId 不感兴趣,因此我们只是将它们排除在解构指令之外。 这种特殊的解构与 for…of 循环相结合,对于解析从 xrange 命令返回的数据是必要的,在我们的例子中,该数据采用以下形式:
[
["1588590110918-0", ["message", "This is a message"]],
["1588590130852-0", ["message", "This is another message"]]
]
我们应用类似的原理来解析 xread 的返回值。 有关其返回值的详细说明,请参阅这些指令的 API 文档。
您可以在 Redis 官方文档(nodejsdp.link/xadd)中阅读有关 xadd 命令和记录 ID 格式的更多信息。 xread 命令还有一个相当复杂的参数列表和返回值,您可以在 nodejsdp.link/xread 上阅读更多信息。 另外,请查看 nodejsdp.link/xrange 上的 xrange 文档。 |
现在,您可以再次启动几个服务器实例并测试应用程序以查看新实现的工作原理。
有趣的是,再次强调这样一个事实:我们不需要依赖专用组件来管理我们的聊天历史记录,相反,我们所需要做的就是使用 xrange 从流中检索过去的记录。 流的这一方面使它们本质上可靠,因为除非显式删除,否则不会丢失消息。
可以使用 xdel (nodejsdp.link/xdel) 或 xtrim 命令 (nodejsdp.link/xtrim) 或使用 xadd 的 MAXLEN 选项 (nodejsdp.link/xadd-maxlen) 从流中删除记录。 |
我们对发布/订阅模式的探索到此结束。 现在,是时候发现消息传递模式的另一个重要类别:任务分配模式。