介绍
发布/订阅模式是一种用途广泛的单向消息传递模式,其中发布者生成数据/消息,订阅者订阅以接收特定类型的消息。它可以使用点对点架构或消息代理来实现通信。.
上图展示了点对点发布/订阅模型,其中发布者直接向订阅者发送消息,无需中间机构。订阅者需要知道发布者的地址或端点才能接收消息。.
在上图中,发布/订阅模型使用消息代理作为中心枢纽,用于在发布者和订阅者之间发送消息。代理负责协调消息交换,并将消息从发布者分发给订阅者。订阅者节点直接订阅代理,而不是发布者。代理的存在提高了系统节点的隔离性,因为发布者和订阅者都只与代理交互。在本教程中,您将构建一个实时聊天应用程序来进一步演示这种模式。.
先决条件
- 您的操作系统上已安装 Node.js(版本 >= 12)。.
- 类似 VSCode 的代码编辑器
- 您的机器上已安装 Redis。
- 具备HTML、DOM、VanillaJS和WebSocket的基本知识。.
步骤 1 – 服务器端实现
要开始运行服务器端,我们使用初始化命令初始化一个基本的Nodejs应用程序:
npm init -y上述命令会创建一个默认的 package.json 文件。.
接下来,我们安装 WebSocket (ws) 依赖包,这是整个构建过程中所必需的:
npm install ws服务器端实现将是一个简单的服务器端聊天应用程序。我们将遵循以下工作流程:
- 搭建服务器
- 读取 HTML 文件以在浏览器中呈现它
- 建立WebSocket连接。.
服务器设置
在您的目录中创建一个名为 app.js 的文件,并将以下代码放入其中:
const http = require("http");
const server = http.createServer((req, res) => {
res.end("Hello Chat App");
});
const PORT = 3459;
server.listen(PORT, () => {
console.log(`Server up and running on port ${PORT}`);
});方法 创建服务器 在模块中 http 国内的 Node.js 它将用于启动服务器。服务器监听请求的端口已设置,并且已创建的服务器实例的 listen 方法将被调用,以监听指定端口上的传入请求。.
命令 node app.js 在终端中运行此命令,您应该会得到类似这样的响应:
OutputServer is up and running on port 3459如果你在浏览器中请求这个端口,你应该会收到类似这样的响应:

读取 HTML 文件以在浏览器中呈现它
在主文件夹中创建一个名为 index.html 的文件,并将以下代码复制到该文件中:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Document</title>
</head>
<body>
<p>Serving HTML file</p>
</body>
</html>这是一个基本的 HTML 文件,用于渲染“Hello”。现在,我们需要读取这个文件,并在每次向我们的服务器发送 HTTP 请求时,将其作为响应进行渲染。.
// app.js
const server = http.createServer((req, res) => {
const htmlFilePath = path.join(__dirname, "index.html");
fs.readFile(htmlFilePath, (err, data) => {
if (err) {
res.writeHead(500);
res.end("Error occured while reading file");
}
res.writeHead(200, { "Content-Type": "text/html" });
res.end(data);
});
});这里,我们使用内部路径模块和函数。 加入 我们用它来连接路径段。然后我们使用函数 读取文件 读取文件 index.html 异步使用。需要两个参数:要读取的文件路径和回读操作。状态码 500 发送响应头和错误消息至客户端。如果数据读取成功,则返回成功状态码。 200 我们向客户端发送响应头和响应数据,在本例中即为文件内容。如果未指定编码(例如 UTF-8 编码),则返回原始缓冲区。否则,返回文件内容。 HTML 已退回。.
在浏览器中向服务器发出请求,你应该会看到以下内容:

建立 WebSocket 连接
// app.js
const webSocketServer = new WebSocket.Server({ server });
webSocketServer.on("connection", (client) => {
console.log("successfully connected to the client");
client.on("message", (streamMessage) => {
console.log("message", streamMessage);
distributeClientMessages(streamMessage);
});
});
const distributeClientMessages = (message) => {
for (const client of webSocketServer.clients) {
if (client.readyState === WebSocket.OPEN) {
client.send(message);
}
}
};在上一行代码中,我们创建了一个名为 `webSocket.server` 的新 WebSocket 服务器。 webSocket服务器 我们创建它并将其发送到服务器。 HTTP 我们连接到现有的服务器。这样我们就可以在单个端口 3459 上处理标准的 HTTP 请求和 WebSocket 连接。.
当 WebSocket 连接成功建立时,会触发 on() 连接事件。函数中的客户端 打回来 WebSocket 连接对象代表与客户端的连接。它用于发送和接收消息,以及监听客户端消息等事件。.
这里使用 distributeClientMessages 函数将传入的消息发送给所有已连接的客户端。它接受一个消息参数,并遍历连接到服务器的客户端。然后,它检查每个客户端的连接状态(readyState === WebSocket.OPEN)。这是为了确保服务器只向已建立连接的客户端发送消息。如果客户端连接已打开,服务器将使用 client.send(message) 方法向该客户端发送消息。.
步骤 2 – 客户端实现
要运行客户端,该文件 index.html 我们稍微改变了一下自己。.
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Document</title>
</head>
<body>
<p>Pub/Sub Pattern with Chat Messaging</p>
<div id="messageContainer"></div>
<form id="messageForm">
<form id="messageForm">
<input
type="text"
id="messageText"
placeholder="Send a message"
style="
padding: 10px;
margin: 5px;
border-radius: 5px;
border: 1px solid #ccc;
outline: none;
"
onfocus="this.style.borderColor='#007bff';"
onblur="this.style.borderColor='#ccc';"
/>
<input
type="button"
value="Send Message"
style="
padding: 10px;
margin: 5px;
border-radius: 5px;
background-color: #007bff;
color: white;
border: none;
cursor: pointer;
"
onmouseover="this.style.backgroundColor='#0056b3';"
onmouseout="this.style.backgroundColor='#007bff';"
/>
</form>
</form>
<script>
const url = window.location.host;
const socket = new WebSocket(`ws://${url}`);
</script>
</body>
</html>在这段代码片段中,我们添加了一个表单元素,其中包含一个输入框和一个用于发送消息的按钮。WebSocket 连接由客户端发起。为了与我们预先设置的 WebSocket 服务器通信,我们需要创建一个 WebSocket 对象实例,并指定 ws://url,该 URL 指向我们要使用的服务器。登录后,URL 变量将包含连接到 3459 端口的 URL,我们的服务器正在监听该端口。.
// app.js
console.log("url", url); // localhost:3459
console.log("socket", socket); // { url: "ws://localhost:3459/", readyState: 0, bufferedAmount: 0, onopen: null, onerror: null, onclose: null, extensions: "", protocol: "", onmessage: null, binaryType: "blob" }所以,当你在浏览器中向服务器发出请求时,你应该会看到这样的界面:
让我们升级一下脚本,以便能够从客户端向服务器发送消息并从服务器接收消息。.
// index.html
<script>
const url = window.location.host;
const socket = new WebSocket(`ws://${url}`);
const messageContainer = document.getElementById("messageContainer");
socket.onmessage = function (eventMessage) {
eventMessage.data.text().then((text) => {
const messageContent = document.createElement("p");
messageContent.innerHTML = text;
document
.getElementById("messageContainer")
.appendChild(messageContent);
});
};
const form = document.getElementById("messageForm");
form.addEventListener("submit", (event) => {
event.preventDefault();
const message = document.getElementById("messageText").value;
socket.send(message);
document.getElementById("messageText").value = "";
});
</script>如前所述,我们从客户端(浏览器)获取向我们服务器发送请求的 URL,并使用该 URL 创建一个新的 WebSocket 对象实例。然后,点击按钮 发送消息 我们在表单元素上创建一个事件。用户在用户界面中输入的文本将从输入元素中提取出来,并调用套接字实例的 send 方法将消息发送到服务器。.
事件 onmessage 此方法在套接字对象上调用,当从服务器收到消息时触发。它用于更新接收到的消息的用户界面。参数 事件消息 功能 打回来 它包含服务器发送的数据(消息),但以 Blob 对象的形式返回。然后对 Blob 数据使用 text() 方法,该方法返回一个 Promise 对象,并使用 then() 方法解析该 Promise 对象,从而从服务器获取实际文本。.
让我们测试一下我们现有的功能。运行以下命令启动服务器
node app.js然后,打开两个不同的浏览器标签页, http://localhost:3459/ 打开标签页,尝试在标签页之间发送消息进行测试:

步骤 3 – 扩展应用程序
假设我们的应用程序开始发展壮大,我们尝试通过部署多个聊天服务器实例来扩展其规模。我们希望实现的是,连接到两个不同服务器的两个用户能够成功地互相发送文本消息。目前我们只有一个服务器,如果我们请求另一个服务器,例如…… http://localhost:3460/服务器在端口上发送消息 3459 我们不会有。也就是说,只有连接到的用户才会有。 3460 他们可以和自己聊天。目前的实现方式是,当聊天消息在我们运行的服务器实例上发送时,该消息只会在本地分发给连接到该特定服务器的客户端,如示例所示 http://localhost:3459/ 我们在两个不同的浏览器上打开它。现在,让我们看看如何集成两个不同的服务器,使它们能够相互通信。.
步骤 4 – 将 Redis 用作消息代理
Redis 是一种快速灵活的内存数据结构存储系统。它通常用作数据库或缓存服务器来存储数据。此外,它还可以用于实现集中式的发布/订阅消息交换模式。Redis 的速度和灵活性使其成为分布式系统中数据共享的热门选择。.
我们的目标是使用 Redis 作为消息代理来集成我们的聊天服务器。每个服务器实例都会同时将从客户端(浏览器)接收到的每条消息发布到消息代理。消息代理会订阅从服务器实例发送的每条消息。.
我们来归档吧 app.js 让我们改变自己:
//app.js
const http = require("http");
const fs = require("fs");
const path = require("path");
const WebSocket = require("ws");
const Redis = require("ioredis");
const redisPublisher = new Redis();
const redisSubscriber = new Redis();
const server = http.createServer((req, res) => {
const htmlFilePath = path.join(__dirname, "index.html");
fs.readFile(htmlFilePath, (err, data) => {
if (err) {
res.writeHead(500);
res.end("Error occured while reading file");
}
res.writeHead(200, { "Content-Type": "text/html" });
res.end(data);
});
});
const webSocketServer = new WebSocket.Server({ server });
webSocketServer.on("connection", (client) => {
console.log("succesfully connected to the client");
client.on("message", (streamMessage) => {
redisPublisher.publish("chat_messages", streamMessage);
});
});
redisSubscriber.subscribe("chat_messages");
console.log("sub", redisSubscriber.subscribe("messages"));
redisSubscriber.on("message", (channel, message) => {
console.log("redis", channel, message);
for (const client of webSocketServer.clients) {
if (client.readyState === WebSocket.OPEN) {
client.send(message);
}
}
});
const PORT = process.argv[2] || 3459;
server.listen(PORT, () => {
console.log(`Server up and running on port ${PORT}`);
});在这里,我们使用发布/共享功能。 Redis 我们使用两个不同的连接实例,一个用于发布消息,另一个用于订阅频道。当客户端发送消息时,消息会通过 redisPublisher 实例上的 publisher 方法发布到名为 redisPublisher 的 Redis 频道。 ""聊天消息"" 我们发布消息。调用 redisSubscribe 实例的 subscribe 方法来订阅与 chat_message 相同的频道。每当有消息发布到该频道时,redisSubscriber.on 事件监听器就会被触发。该事件监听器会在所有当前连接的 WebSocket 客户端上重复触发,并将接收到的消息发送给每个客户端。这样做是为了确保当用户发送消息时,连接到每个服务器实例的所有其他用户都能实时收到该消息。.
如果您要设置两个不同的服务器,例如:
node app.js 3459
node app.js 3460当聊天文本在实例上发送时,我们现在可以将消息广播到所有连接的服务器,而不仅仅是单个服务器。您可以通过运行以下命令来实现这一点: http://localhost:3459/ 和 http://localhost:3460/ 进行测试,然后在两个服务器之间发送聊天记录,并查看消息在两个服务器上的实时广播情况。.
您可以从以下位置查看频道中发布的消息 redis-cli 请关注并订阅该频道,以便定期接收消息:
命令 redis-cli 运行。然后进入 监视器 返回浏览器并开始聊天。假设你发送的是魔兽世界聊天信息,那么在你的终端屏幕上应该会看到类似这样的内容:
要查看已共享的已发布消息,请使用相同的命令。 redis-cli 跑 订阅频道名称 输入频道名称,了解我们。 聊天信息 如果你发送短信,你的终端应该会显示类似这样的内容:在浏览器中使用也很棒:
现在,我们可以在不同的端口甚至不同的机器上运行服务器的多个实例,只要它们订阅同一个 Redis 通道,它们就可以接收消息并向所有连接的客户端广播消息,从而确保用户可以在不同实例之间无缝聊天。.
还记得引言部分我们讨论过如何使用消息代理来实现发布/订阅模式吗?这个例子完美地概括了这一点。.
上图中,两个不同的客户端(浏览器)连接到聊天服务器。聊天服务器并非直接连接,而是通过 Redis 实例连接。这意味着,虽然它们独立处理客户端连接,但它们通过公共介质(Redis)共享信息(聊天消息)。每个聊天服务器都连接到 Redis。此连接用于向 Redis 发布消息以及订阅 Redis 通道以接收消息。当用户发送消息时,聊天服务器会将其发布到 Redis 中指定的通道。.
当 Redis 收到一条已发布的消息时,它会将消息广播给所有参与的聊天服务器。然后,每个聊天服务器会将消息发送给所有已连接的客户端,从而确保每个用户都能收到其他用户发送的消息,无论他们连接到哪个服务器。.
这种架构允许我们根据需要添加更多服务器实例,从而实现聊天应用程序的横向扩展。得益于 Redis 发布/订阅系统的功能,该系统确保消息在所有实例之间均匀分发,因此每个实例都可以管理自己的一组已连接客户端。这种设置能够高效地处理大量并发用户,并确保应用程序的可用性。.
结果
在本教程中,我们学习了发布/订阅模式,并使用 Redis 作为消息代理创建了一个简单的聊天应用程序来演示该模式。下一步是学习如何在消息代理并非最佳解决方案的情况下实现点对点消息系统,例如在复杂的分布式系统中,单点故障(消息代理)是不可接受的。.


















