介绍
点对点消息传递模式是现代 Web 和云架构中常用的通信模型。它旨在实现不同组件(例如无服务器函数或微服务)之间的异步交互,允许它们交换消息而无需立即响应。.
在这种模式下,发送消息的组件称为生产者,接收和处理消息的组件称为消费者。生产者和消费者可以位于同一系统,也可以位于不同的系统中,这使得这种通信方式灵活且可扩展。.
类似于电子邮件的发送方式,消息也是从生产者发送给特定的消费者。这使得即使在复杂的分布式系统中,也能实现高效可靠的通信。这种方式通常用于生产者确切知道哪个消费者应该接收消息,但生产者不需要立即收到响应的场景。.
点对点消息传递模式有效地促进了组件之间的通信和协调,提高了现代 Web 和云架构的整体性能、可靠性和可扩展性。.
我们将建造什么
在本分步教程中,我们将使用两个 AWS Lambda 函数和一个 Amazon SQS 队列来实现此模式。.
在本分步教程中,我们将使用两个 AWS Lambda 函数和一个 Amazon SQS 队列来实现一个简单的示例。.
您将使用 TypeScript 和 AWS 云开发工具包 (AWS CDK) 构建示例。.
该示例将包含三个部分:
- 能够向消费者发送信息的生产者
- 能够接收生产者信息的消费者。
- 一种在生产者和消费者之间建立通信通道的消息队列。
除了实现这种模式之外,我们还将重点介绍 AWS 云开发工具包 (CDK) 在以代码形式定义整个基础设施方面的强大功能。如果您想了解更多关于 AWS CDK 的信息,请参阅 AWS CDK 开发人员指南。.
在本教程结束时,您将全面了解基于队列的点对点消息传递的各个组成部分,成功使用 SQS 实现了两个 Lambda 函数之间的异步通信,并获得了构建基础架构的实践经验。使用 CDK 编写代码
但在开始编写代码之前,让我们快速了解一下异步点对点消息传递模式的优点和缺点。.
异步点对点消息传递模式的优点和缺点
好处
- 松耦合:异步点对点消息传递模式促进了应用程序之间的松耦合,允许它们独立通信而无需紧密集成。这种灵活性使得扩展和修改单个组件更加容易,而不会影响整个系统。.
- 可扩展性:这种模式支持高效的水平扩展,因为可以添加多个消费者应用程序来异步处理工作负载。这使得系统能够更有效地处理大量并发消息和请求。.
- 可靠性:在异步消息传递中,如果消息未送达或未处理,可以重试或将其发送到错误队列以便稍后处理,从而提高系统的可靠性。.
- 容错性:异步消息传递通过将消息的生产者和消费者分离来提供容错性。如果某个应用程序或组件崩溃,消息可以被存储起来,以便在系统恢复在线后进行处理,从而确保数据不会丢失。.
缺点
- 复杂性:与其他集成模式相比,实现异步点对点消息传递模式可能更加复杂,并且需要额外的消息处理逻辑。.
- 消息依赖关系和可能的重复:在异步消息系统中,管理消息之间的依赖关系并确保消息正确删除可能极具挑战性。这需要精心设计和实现,以处理诸如消息排序、消息重复和消息处理依赖关系等潜在问题。.
- 延迟增加:异步消息传递会在发送消息和接收响应之间引入延迟,因为消息处理可能需要更长时间。这种延迟会影响实时交互,可能不适用于需要即时反馈的应用。.
在进行架构决策时,务必权衡各种利弊,并选择最符合您特定需求和约束条件的通信模式。许多现代应用程序依赖于多种集成模式,包括异步点对点消息传递、同步请求-响应通信以及事件驱动通信。.
但现在,让我们开始教程,学习如何使用 AWS Lambda 和 Amazon SQS 来实现这种模式。.
关于编码时资源成本的说明:本教程仅使用最少量的资源,所有这些资源都包含在 AWS 为每个账户创建后的前 12 个月提供的免费套餐中:
- 几千字节的代码将存储在亚马逊 S3 中,该服务提供 5 GB 的免费存储空间。.
- 我们给 SQS 打了几次电话,它每月提供 100 万次免费请求。.
- 我们将调用 AWS Lambda 中的两个函数,AWS Lambda 每月还提供 100 万次免费调用。.
所以,只要你按照步骤操作,就一定能继续使用免费套餐。我还在最后添加了一个章节,帮助你删除本教程中创建的所有资源。.
先决条件
如果您已安装 CLI 和 CDK,则应该可以看到已安装的版本信息。否则,您将收到一条错误消息,提示找不到该命令。.
在开始本教程之前,您需要准备以下物品:
- AWS账户
- AWS 云开发工具包 (AWS CDK)
本教程需要 AWS CLI 和 AWS CDK 的至少 2 版本。.
您可以通过在终端中运行以下命令来确定 AWS CLI 版本:
aws --version
通过运行以下命令获取 AWS CDK 版本:
cdk --version
步骤 1 – 创建 CDK 应用程序
1.1 – 启动 CDK 程序
首先,您需要进行初始设置,让 CDK 自动生成构建项目所需的代码。.
从命令行创建一个项目目录,进入该目录,然后初始化项目:
mkdir point-to-point-example
cd point-to-point-example
cdk init app --language typescript
这将创建项目的主要文件结构并安装所需的软件包。.
现在是时候在您最喜欢的 IDE 中打开项目,查看生成的项目,特别是以下两个文件:
point-to-point-example/
├── bin/
│ └── point-to-point-example.ts
├── lib/
│ └── point-to-point-example-stack.ts
└── ...
文件 bin\point-to-point-example.ts 这是 CDK 程序的入口点。在本教程中,我们无需再次检查此文件。.
文件 lib\point-to-point-example-stack.ts 程序的主堆栈在这里定义。我们大部分时间都花在这个文件上。.
在本教程的后续部分,我们将向全班同学介绍我们应用程序的基础架构。 点对点示例堆栈 我们将在这个文件中添加内容。请在编辑器中打开它并查看:
import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
// import * as sqs from 'aws-cdk-lib/aws-sqs';
export class PointToPointExampleStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
// The code that defines your stack goes here
// ...
}
}
1.2 – 引导环境
首次将 AWS CDK 应用程序部署到特定环境(即特定的 AWS 账户和区域)时,需要使用 CDKToolkit 堆栈启动该环境。此堆栈会部署一个 S3 存储桶,用于在实际应用程序部署过程中存储模板和资源。.
要启动环境,请确保您仍在示例点对点目录中,然后运行以下命令:
cdk bootstrap
这需要一些时间,当您在命令行看到以下输出时,即表示操作完成:
✅ Environment aws://[account_id]/[region] bootstrapped.
1.3 – 部署初始堆栈
要部署该堆栈,请确保您仍在示例点对点目录中,然后运行以下命令:
cdk deploy
此命令将应用程序编译成 AWS CloudFormation 模板并将其部署到您的 AWS 账户。.
现在打开堆栈详细信息并进行查看。在“资源”选项卡中,您会看到除了名为 CDKMetadata 的资源之外,它尚未部署任何内容,而 CDKMetadata 只是我们堆栈的一些初始配置详细信息。.
步骤 2 – 创建 SQS 队列
一切设置完毕后,您就可以创建您的第一个真正资源,它将是一个 SQS 队列。.
2.1 – 将队列添加到栈中
首先,请提交文件 lib\point-to-point-example-stack.ts 打开。.
根据 CDK 模板的版本,它可能已经在注释中添加了一些导入语句,包括 `aws-cdk-lib/aws-sqs`。如果是这样,请删除 `import * as sqs from 'aws-cdk-lib/aws-sqs'` 前面的注释符号 `//`;否则,请自行添加,使文件的前三行如下所示:
import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as sqs from 'aws-cdk-lib/aws-sqs';
现在是课堂讨论时间。 点对点示例堆栈 删除并替换以下代码,然后保存更改:
const queue = new sqs.Queue(this, 'Queue', {
queueName: 'MyQueue',
});
2.2 – 部署队列
要部署队列,请在项目目录中运行以下命令:
cdk deploy
现在您可以在终端输出中监控部署过程,一两分钟后应该就会完成。.
部署完成后,转到 AWS 管理控制台中的 SQS 队列列表,找到名为 SqsTutorialStack-Queue 的队列:
2.3 – 检查已部署队列
请前往 AWS 管理控制台中的 SQS 控制面板,查看队列列表。如果找不到您的队列,请确保您位于正确的区域。.

步骤 3 – 创建生产者函数并将其连接到队列
3.1 – 编写生成器函数的源代码
我们将使用 JavaScript 编写 Lambda 函数,并将源代码存储在 CDK 应用程序内的独立文件夹中。稍后,我们将从 CDK 堆栈中引用此文件夹,以便它可以自动打包、上传并将内容部署到 AWS。.
请确保您位于项目目录中,并创建以下目录和文件本身:
mkdir lambda-src
mkdir lambda-src/producer
touch lambda-src/producer/send_message_to_sqs.js
您的项目目录结构现在应该如下所示:
point-to-point-example/
├── bin/
│ └── point-to-point-example.ts
├── lambda-src/
│ └── producer/
│ └── send_message_to_sqs.js
├── lib/
│ └── point-to-point-example-stack.ts
└── ...
已创建新文件。 lambda-src/producer/send_message_to_sqs.js 在编辑器中打开该文件,并添加以下代码:
const sqs = require("@aws-sdk/client-sqs");
// Create an Amazon SQS service client
const client = new sqs.SQSClient();
exports.handler = async (event, context) => {
// Create a message, including the function's name from
// its execution context
const myMessage = 'Hello World from ' + context.functionName
// Get the queue URL from the execution environment
const queueUrl = process.env.SQSQueueUrl
// Create a SendMessageCommand containing the message
// and the queue URL
const command = new sqs.SendMessageCommand({
MessageBody: JSON.stringify({ message: myMessage }),
QueueUrl: queueUrl
});
// Send the message and return the result
const result = await client.send(command);
return result;
}
这是一个简单的 Lambda 函数处理程序,它会向我们的 SQS 队列发送消息并返回结果。.
请注意,由于我们在构建此函数时不一定知道要使用哪个队列,或者我们可能以后想要更改队列而无需修改函数代码,因此我们会从运行时获取队列 URL。这样,我们就可以在将 Lambda 函数部署到 CDK 堆栈期间动态定义它。.
3.2 – 将生产者函数添加到堆栈中
要在 AWS 中创建真正的 Lambda 函数,需要将其添加到堆栈中。为此,请打开 lib/point-to-point-example-stack.ts 文件。.
首先,在文件顶部添加另一个导入语句:
import * as lambda from 'aws-cdk-lib/aws-lambda';
然后在构造函数中,队列下方添加以下代码片段:
const producerFunction = new lambda.Function(this, 'Producer', {
functionName: 'Producer',
runtime: lambda.Runtime.NODEJS_18_X,
code: lambda.Code.fromAsset('lambda-src/producer'),
handler: 'send_message_to_sqs.handler',
environment: {
SQSQueueUrl: queue.queueUrl,
}
});
这将创建一个函数、包以及 lambda-src/producer 目录的内容,并将该函数配置为在调用时使用 send_message_to_sqs.js 中定义的处理程序。.
它还会动态地将 SQSQueueUrl 环境变量设置为 SQS 队列的实际 URL,以便可以使用 process.env.SQSQueueUrl 从函数内部访问它。.
3.3 – 制作人表演的许可
每个 Lambda 函数都需要一些基本的 IAM 权限,CDK 默认使用基本执行角色提供这些权限。.
然而,我们的生产者函数需要额外的权限才能向队列发送消息。幸运的是,在 CDK 中,我们无需手动管理 IAM 策略。我们可以将 SQS 队列的 grantSendMessages 方法与我们的 Lambda 函数配合使用。.
为此,请在 producerFunction 下方添加以下代码行:
queue.grantSendMessages(producerFunction);
3.4 – 部署生产者函数
要部署该函数及其源代码,请在项目目录中运行以下命令:
cdk deploy
这样就为部署任何新的或修改过的资源做好了准备。.
如果您的堆栈发生任何更改需要创建或修改 IAM 权限,CDK 将提示您出于安全原因重新审核这些更改。.
下图显示,CDK 希望进行三项影响 IAM 权限的更改:
- 允许 Lambda 服务 (lambda.amazon.aws) 承担附加到 Producer 函数的执行角色。.
- 直接添加必要的权限,允许执行角色访问 SQS 队列并向其发送消息。.
- 向执行角色添加具有初始权限的托管策略。.
输入 y 进行确认,CDK 将部署新的 Lambda 函数及其 IAM 角色。.
3.5 – 测试生产者的表现
运行函数后,转到 AWS Lambda 控制面板中的函数列表。.
函数名与我们在堆栈中定义的名称相同:Producer。如果找不到该函数,请确保您位于正确的区域。.
点击函数名称即可打开其详细信息并进入源代码部分。.
左侧的文件资源管理器文件夹中包含一个名为 send_message_to_sqs.js 的文件。.
双击文件名打开其源代码,可以看到它是我们在 CDK 项目的 lambda-src/producer 文件夹中创建的文件的精确副本。.
现在我们来配置一个测试事件。.
为此,请点击“测试”按钮。如果您尚未配置测试事件,这将打开“测试配置”页面。输入事件名称,例如“test-event”,然后点击“保存”。.
测试事件配置完成后,再次单击“测试”,这将直接调用 Lambda 函数。.
这将创建一个包含响应的“执行结果”选项卡:
{
"$metadata": {
"httpStatusCode": 200,
"requestId": "cb033b2a-1234-5678-9012-66188359d280",
"attempts": 1,
"totalRetryDelay": 0
},
"MD5OfMessageBody": "b5357af0c1c816b2d41275537cc0d1af",
"MessageId": "4a76e538-1234-5678-9012-749f0f4b9294"
}
还记得 Lambda 函数中的那段代码吗?
const result = await client.send(command);
return result;
上述执行结果中的 JSON 对象正是 SQS 对 client.send(command) 返回的响应,其中包含诸如“httpStatusCode”: 200 之类的有趣信息,这意味着队列已成功接收消息。.
3.6 – 手动从 SQS 检索消息
目前队列尚未连接到消费者,因此消息不会被接收。但是,我们可以手动检索该消息。.
进入 SQS 控制面板,点击队列打开其详细信息页面,然后点击“发送和接收消息”按钮。.
在下一页中,转到“接收消息”部分,然后单击“轮询消息”按钮,该按钮应显示当前正在发送的所有消息,包括您在测试构建器函数时发送的消息:
点击邮件 ID 即可打开邮件。邮件正文应包含以下内容:
{"message":"Hello World from Producer"}
恭喜!您已成功部署并测试了一个向 SQS 队列发送消息的 Lambda 函数。.
接下来,我们创建消费者函数,并将其配置为每当队列中到达新消息时自动调用。.
步骤 4 – 创建消费者函数并将其连接到队列
4.1 – 编写消费者函数的源代码
消费者函数代码存储在 lambda-src 内部的单独文件夹中,以便 CDK 可以自动将其打包并上传到新的 Lambda 函数中。.
请确保您位于项目目录(而不是 bin/ 或 lib/ 目录)中,并使用源文件创建以下目录:
mkdir lambda-src/consumer
touch lambda-src/consumer/receive_message_from_sqs.js
在编辑器中打开 lambda-src/consumer/receive_message_from_sqs.js 文件,并输入以下代码:
exports.handler = async (event) => {
// Retrieve the message from the event and log it to the console
const message = event.Records[0].body;
console.log(message);
}
这是一个非常简单的 Lambda 函数处理程序,它打开 SQS 消息并将其打印到报表中。.
4.2 – 将 Consumer 函数添加到堆栈中
要在 AWS 中创建真正的 Lambda 函数,需要将其添加到堆栈中。为此,请打开 lib/point-to-point-example-stack.ts 文件。.
在生产者函数中,在 queue.grantSendMessages(producerFunction) 下方添加以下代码片段:
const consumerFunction = new lambda.Function(this, 'Consumer', {
functionName: 'Consumer',
runtime: lambda.Runtime.NODEJS_18_X,
code: lambda.Code.fromAsset('lambda-src/consumer'),
handler: 'receive_message_from_sqs.handler'
});
此消费者函数创建 lambda-src/consumer 目录的包和内容,并配置该函数在调用时使用 receive_message_from_sqs.js 中定义的处理程序。.
4.3 – 将 SQS 队列作为事件源添加到消费者中
我们希望当队列中有新消息时触发此函数。为此,我们需要将队列添加为该函数的事件源,并授予该函数消费消息所需的所有权限。.
首先,在文件顶部添加另一个导入语句:
import { SqsEventSource } from 'aws-cdk-lib/aws-lambda-event-sources';
接下来,在 customersFunction 定义下方添加以下两行:
consumerFunction.addEventSource(new SqsEventSource(queue));
queue.grantConsumeMessages(consumerFunction);
4.4 – 部署消费者功能
要部署该函数及其源代码,请在项目目录中运行以下命令:
cdk deploy
这样就为部署任何新的或修改过的资源做好了准备,并且再次提示您重新验证新的 IAM 权限。.
输入 y 进行确认,CDK 将执行新的 Lambda 函数及其执行角色,并将其附加到 SQS 队列。.
部署完成后,一切都将准备就绪并连接完毕。让我们看看它是否有效。
步骤 5 – 应用程序的端到端测试
为了测试应用程序,我们手动调用生产者函数向队列发送另一条消息。然后,我们检查消费者函数的执行日志,看看它是否自动启动并处理了该消息。.
5.1 – 调用生产者函数
按照 3.5 节“测试生产者函数”中提到的步骤再次调用生产者函数。以下是简要概述:
- 转到 AWS Lambda 控制面板中的函数列表。.
- 点击名称打开生产者函数。.
- 转到源代码部分。.
- 点击“测试”按钮。.
- 确保函数运行成功。.
5.2 – 查看消费者绩效报告
返回 AWS Lambda 控制面板中的函数列表,这次打开 Consumer 函数。.
向下滚动,打开名为“监视器”的选项卡,在该选项卡中,打开“日志”选项卡,查看“最近调用”,您应该可以在那里找到指向该函数的最后一个 LogStream 的链接。.
之后,点击“日志流”链接。这将显示日志条目本身,如果您不习惯这种格式,可能会觉得有点令人困惑。.
只需查找位于这两个条目中间的条目,该条目以 START RequestId: … 和 END Request ID: … 开头。.
点击箭头展开条目,即可找到原始消息:
结果
恭喜!您已经创建了使用 SQS 在两个 Lambda 函数之间设置异步点对点消息传递所需的一切。.
SQS 还有更多功能,例如消息分类、确保消息顺序以及自动将错误消息发送到死信队列 (DLQ)。如需了解更多信息,请参阅 Amazon SQS 开发人员指南。

















