使用 Amazon SQS 进行点对点消息传递

0 股票
0
0
0
0

介绍

点对点消息传递模式是现代 Web 和云架构中常用的通信模型。它旨在实现不同组件(例如无服务器函数或微服务)之间的异步交互,允许它们交换消息而无需立即响应。.

在这种模式下,发送消息的组件称为生产者,接收和处理消息的组件称为消费者。生产者和消费者可以位于同一系统,也可以位于不同的系统中,这使得这种通信方式灵活且可扩展。.

类似于电子邮件的发送方式,消息也是从生产者发送给特定的消费者。这使得即使在复杂的分布式系统中,也能实现高效可靠的通信。这种方式通常用于生产者确切知道哪个消费者应该接收消息,但生产者不需要立即收到响应的场景。.

点对点消息传递模式有效地促进了组件之间的通信和协调,提高了现代 Web 和云架构的整体性能、可靠性和可扩展性。.

我们将建造什么

在本分步教程中,我们将使用两个 AWS Lambda 函数和一个 Amazon SQS 队列来实现此模式。.

在本分步教程中,我们将使用两个 AWS Lambda 函数和一个 Amazon SQS 队列来实现一个简单的示例。.

您将使用 TypeScript 和 AWS 云开发工具包 (AWS CDK) 构建示例。.

该示例将包含三个部分:
  • 能够向消费者发送信息的生产者
  • 能够接收生产者信息的消费者。
  • 一种在生产者和消费者之间建立通信通道的消息队列。


除了实现这种模式之外,我们还将重点介绍 AWS 云开发工具包 (CDK) 在以代码形式定义整个基础设施方面的强大功能。如果您想了解更多关于 AWS CDK 的信息,请参阅 AWS CDK 开发人员指南。.

在本教程结束时,您将全面了解基于队列的点对点消息传递的各个组成部分,成功使用 SQS 实现了两个 Lambda 函数之间的异步通信,并获得了构建基础架构的实践经验。使用 CDK 编写代码

但在开始编写代码之前,让我们快速了解一下异步点对点消息传递模式的优点和缺点。.

异步点对点消息传递模式的优点和缺点

好处
  • 松耦合:异步点对点消息传递模式促进了应用程序之间的松耦合,允许它们独立通信而无需紧密集成。这种灵活性使得扩展和修改单个组件更加容易,而不会影响整个系统。.
  • 可扩展性:这种模式支持高效的水平扩展,因为可以添加多个消费者应用程序来异步处理工作负载。这使得系统能够更有效地处理大量并发消息和请求。.
  • 可靠性:在异步消息传递中,如果消息未送达或未处理,可以重试或将其发送到错误队列以便稍后处理,从而提高系统的可靠性。.
  • 容错性:异步消息传递通过将消息的生产者和消费者分离来提供容错性。如果某个应用程序或组件崩溃,消息可以被存储起来,以便在系统恢复在线后进行处理,从而确保数据不会丢失。.
缺点
  • 复杂性:与其他集成模式相比,实现异步点对点消息传递模式可能更加复杂,并且需要额外的消息处理逻辑。.
  • 消息依赖关系和可能的重复:在异步消息系统中,管理消息之间的依赖关系并确保消息正确删除可能极具挑战性。这需要精心设计和实现,以处理诸如消息排序、消息重复和消息处理依赖关系等潜在问题。.
  • 延迟增加:异步消息传递会在发送消息和接收响应之间引入延迟,因为消息处理可能需要更长时间。这种延迟会影响实时交互,可能不适用于需要即时反馈的应用。.

在进行架构决策时,务必权衡各种利弊,并选择最符合您特定需求和约束条件的通信模式。许多现代应用程序依赖于多种集成模式,包括异步点对点消息传递、同步请求-响应通信以及事件驱动通信。.

但现在,让我们开始教程,学习如何使用 AWS Lambda 和 Amazon SQS 来实现这种模式。.

关于编码时资源成本的说明:本教程仅使用最少量的资源,所有这些资源都包含在 AWS 为每个账户创建后的前 12 个月提供的免费套餐中:

  • 几千字节的代码将存储在亚马逊 S3 中,该服务提供 5 GB 的免费存储空间。.
  • 我们给 SQS 打了几次电话,它每月提供 100 万次免费请求。.
  • 我们将调用 AWS Lambda 中的两个函数,AWS Lambda 每月还提供 100 万次免费调用。.

所以,只要你按照步骤操作,就一定能继续使用免费套餐。我还在最后添加了一个章节,帮助你删除本教程中创建的所有资源。.

先决条件

如果您已安装 CLI 和 CDK,则应该可以看到已安装的版本信息。否则,您将收到一条错误消息,提示找不到该命令。.

在开始本教程之前,您需要准备以下物品:

  • AWS账户
  • AWS 云开发工具包 (AWS CDK)

本教程需要 AWS CLI 和 AWS CDK 的至少 2 版本。.

您可以通过在终端中运行以下命令来确定 AWS CLI 版本:

aws --version

通过运行以下命令获取 AWS CDK 版本:

cdk --version

步骤 1 – 创建 CDK 应用程序

1.1 – 启动 CDK 程序

首先,您需要进行初始设置,让 CDK 自动生成构建项目所需的代码。.

从命令行创建一个项目目录,进入该目录,然后初始化项目:

mkdir point-to-point-example
cd point-to-point-example
cdk init app --language typescript

这将创建项目的主要文件结构并安装所需的软件包。.

现在是时候在您最喜欢的 IDE 中打开项目,查看生成的项目,特别是以下两个文件:

point-to-point-example/
├── bin/
│ └── point-to-point-example.ts
├── lib/
│ └── point-to-point-example-stack.ts
└── ...

文件 bin\point-to-point-example.ts 这是 CDK 程序的入口点。在本教程中,我们无需再次检查此文件。.

文件 lib\point-to-point-example-stack.ts 程序的主堆栈在这里定义。我们大部分时间都花在这个文件上。.

在本教程的后续部分,我们将向全班同学介绍我们应用程序的基础架构。 点对点示例堆栈 我们将在这个文件中添加内容。请在编辑器中打开它并查看:

import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
// import * as sqs from 'aws-cdk-lib/aws-sqs';
export class PointToPointExampleStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
// The code that defines your stack goes here
// ...
}
}
1.2 – 引导环境

首次将 AWS CDK 应用程序部署到特定环境(即特定的 AWS 账户和区域)时,需要使用 CDKToolkit 堆栈启动该环境。此堆栈会部署一个 S3 存储桶,用于在实际应用程序部署过程中存储模板和资源。.

要启动环境,请确保您仍在示例点对点目录中,然后运行以下命令:

cdk bootstrap

这需要一些时间,当您在命令行看到以下输出时,即表示操作完成:

✅ Environment aws://[account_id]/[region] bootstrapped.
1.3 – 部署初始堆栈

要部署该堆栈,请确保您仍在示例点对点目录中,然后运行以下命令:

cdk deploy

此命令将应用程序编译成 AWS CloudFormation 模板并将其部署到您的 AWS 账户。.


现在打开堆栈详细信息并进行查看。在“资源”选项卡中,您会看到除了名为 CDKMetadata 的资源之外,它尚未部署任何内容,而 CDKMetadata 只是我们堆栈的一些初始配置详细信息。.

步骤 2 – 创建 SQS 队列

一切设置完毕后,您就可以创建您的第一个真正资源,它将是一个 SQS 队列。.

2.1 – 将队列添加到栈中

首先,请提交文件 lib\point-to-point-example-stack.ts 打开。.

根据 CDK 模板的版本,它可能已经在注释中添加了一些导入语句,包括 `aws-cdk-lib/aws-sqs`。如果是这样,请删除 `import * as sqs from 'aws-cdk-lib/aws-sqs'` 前面的注释符号 `//`;否则,请自行添加,使文件的前三行如下所示:

import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as sqs from 'aws-cdk-lib/aws-sqs';

现在是课堂讨论时间。 点对点示例堆栈 删除并替换以下代码,然后保存更改:

const queue = new sqs.Queue(this, 'Queue', {
queueName: 'MyQueue',
});
2.2 – 部署队列

要部署队列,请在项目目录中运行以下命令:

cdk deploy

现在您可以在终端输出中监控部署过程,一两分钟后应该就会完成。.

部署完成后,转到 AWS 管理控制台中的 SQS 队列列表,找到名为 SqsTutorialStack-Queue 的队列:

2.3 – 检查已部署队列

请前往 AWS 管理控制台中的 SQS 控制面板,查看队列列表。如果找不到您的队列,请确保您位于正确的区域。.


步骤 3 – 创建生产者函数并将其连接到队列

3.1 – 编写生成器函数的源代码

我们将使用 JavaScript 编写 Lambda 函数,并将源代码存储在 CDK 应用程序内的独立文件夹中。稍后,我们将从 CDK 堆栈中引用此文件夹,以便它可以自动打包、上传并将内容部署到 AWS。.

请确保您位于项目目录中,并创建以下目录和文件本身:

mkdir lambda-src
mkdir lambda-src/producer
touch lambda-src/producer/send_message_to_sqs.js

您的项目目录结构现在应该如下所示:

point-to-point-example/
├── bin/
│ └── point-to-point-example.ts
├── lambda-src/
│ └── producer/
│ └── send_message_to_sqs.js
├── lib/
│ └── point-to-point-example-stack.ts
└── ...

已创建新文件。 lambda-src/producer/send_message_to_sqs.js 在编辑器中打开该文件,并添加以下代码:

const sqs = require("@aws-sdk/client-sqs");
// Create an Amazon SQS service client
const client = new sqs.SQSClient();
exports.handler = async (event, context) => {
// Create a message, including the function's name from
// its execution context
const myMessage = 'Hello World from ' + context.functionName
// Get the queue URL from the execution environment
const queueUrl = process.env.SQSQueueUrl
// Create a SendMessageCommand containing the message
// and the queue URL
const command = new sqs.SendMessageCommand({
MessageBody: JSON.stringify({ message: myMessage }),
QueueUrl: queueUrl
});
// Send the message and return the result 
const result = await client.send(command);
return result;
}

这是一个简单的 Lambda 函数处理程序,它会向我们的 SQS 队列发送消息并返回结果。.

请注意,由于我们在构建此函数时不一定知道要使用哪个队列,或者我们可能以后想要更改队列而无需修改函数代码,因此我们会从运行时获取队列 URL。这样,我们就可以在将 Lambda 函数部署到 CDK 堆栈期间动态定义它。.

3.2 – 将生产者函数添加到堆栈中

要在 AWS 中创建真正的 Lambda 函数,需要将其添加到堆栈中。为此,请打开 lib/point-to-point-example-stack.ts 文件。.

首先,在文件顶部添加另一个导入语句:

import * as lambda from 'aws-cdk-lib/aws-lambda';

然后在构造函数中,队列下方添加以下代码片段:

const producerFunction = new lambda.Function(this, 'Producer', {
functionName: 'Producer',
runtime: lambda.Runtime.NODEJS_18_X,
code: lambda.Code.fromAsset('lambda-src/producer'),
handler: 'send_message_to_sqs.handler',
environment: {
SQSQueueUrl: queue.queueUrl,
}
});

这将创建一个函数、包以及 lambda-src/producer 目录的内容,并将该函数配置为在调用时使用 send_message_to_sqs.js 中定义的处理程序。.

它还会动态地将 SQSQueueUrl 环境变量设置为 SQS 队列的实际 URL,以便可以使用 process.env.SQSQueueUrl 从函数内部访问它。.

3.3 – 制作人表演的许可

每个 Lambda 函数都需要一些基本的 IAM 权限,CDK 默认使用基本执行角色提供这些权限。.

然而,我们的生产者函数需要额外的权限才能向队列发送消息。幸运的是,在 CDK 中,我们无需手动管理 IAM 策略。我们可以将 SQS 队列的 grantSendMessages 方法与我们的 Lambda 函数配合使用。.

为此,请在 producerFunction 下方添加以下代码行:

queue.grantSendMessages(producerFunction);
3.4 – 部署生产者函数

要部署该函数及其源代码,请在项目目录中运行以下命令:

cdk deploy

这样就为部署任何新的或修改过的资源做好了准备。.

如果您的堆栈发生任何更改需要创建或修改 IAM 权限,CDK 将提示您出于安全原因重新审核这些更改。.

下图显示,CDK 希望进行三项影响 IAM 权限的更改:

  • 允许 Lambda 服务 (lambda.amazon.aws) 承担附加到 Producer 函数的执行角色。.
  • 直接添加必要的权限,允许执行角色访问 SQS 队列并向其发送消息。.
  • 向执行角色添加具有初始权限的托管策略。.


输入 y 进行确认,CDK 将部署新的 Lambda 函数及其 IAM 角色。.

3.5 – 测试生产者的表现

运行函数后,转到 AWS Lambda 控制面板中的函数列表。.

函数名与我们在堆栈中定义的名称相同:Producer。如果找不到该函数,请确保您位于正确的区域。.

点击函数名称即可打开其详细信息并进入源代码部分。.

左侧的文件资源管理器文件夹中包含一个名为 send_message_to_sqs.js 的文件。.

双击文件名打开其源代码,可以看到它是我们在 CDK 项目的 lambda-src/producer 文件夹中创建的文件的精确副本。.


现在我们来配置一个测试事件。.

为此,请点击“测试”按钮。如果您尚未配置测试事件,这将打开“测试配置”页面。输入事件名称,例如“test-event”,然后点击“保存”。.

测试事件配置完成后,再次单击“测试”,这将直接调用 Lambda 函数。.

这将创建一个包含响应的“执行结果”选项卡:

{
"$metadata": {
"httpStatusCode": 200,
"requestId": "cb033b2a-1234-5678-9012-66188359d280",
"attempts": 1,
"totalRetryDelay": 0
},
"MD5OfMessageBody": "b5357af0c1c816b2d41275537cc0d1af",
"MessageId": "4a76e538-1234-5678-9012-749f0f4b9294"
}

还记得 Lambda 函数中的那段代码吗?

const result = await client.send(command);
return result;

上述执行结果中的 JSON 对象正是 SQS 对 client.send(command) 返回的响应,其中包含诸如“httpStatusCode”: 200 之类的有趣信息,这意味着队列已成功接收消息。.

3.6 – 手动从 SQS 检索消息

目前队列尚未连接到消费者,因此消息不会被接收。但是,我们可以手动检索该消息。.

进入 SQS 控制面板,点击队列打开其详细信息页面,然后点击“发送和接收消息”按钮。.

在下一页中,转到“接收消息”部分,然后单击“轮询消息”按钮,该按钮应显示当前正在发送的所有消息,包括您在测试构建器函数时发送的消息:


点击邮件 ID 即可打开邮件。邮件正文应包含以下内容:

{"message":"Hello World from Producer"}

恭喜!您已成功部署并测试了一个向 SQS 队列发送消息的 Lambda 函数。.

接下来,我们创建消费者函数,并将其配置为每当队列中到达新消息时自动调用。.

步骤 4 – 创建消费者函数并将其连接到队列

4.1 – 编写消费者函数的源代码

消费者函数代码存储在 lambda-src 内部的单独文件夹中,以便 CDK 可以自动将其打包并上传到新的 Lambda 函数中。.

请确保您位于项目目录(而不是 bin/ 或 lib/ 目录)中,并使用源文件创建以下目录:

mkdir lambda-src/consumer
touch lambda-src/consumer/receive_message_from_sqs.js

在编辑器中打开 lambda-src/consumer/receive_message_from_sqs.js 文件,并输入以下代码:

exports.handler = async (event) => {
// Retrieve the message from the event and log it to the console
const message = event.Records[0].body;
console.log(message);
}

这是一个非常简单的 Lambda 函数处理程序,它打开 SQS 消息并将其打印到报表中。.

4.2 – 将 Consumer 函数添加到堆栈中

要在 AWS 中创建真正的 Lambda 函数,需要将其添加到堆栈中。为此,请打开 lib/point-to-point-example-stack.ts 文件。.

在生产者函数中,在 queue.grantSendMessages(producerFunction) 下方添加以下代码片段:

 const consumerFunction = new lambda.Function(this, 'Consumer', {
functionName: 'Consumer',
runtime: lambda.Runtime.NODEJS_18_X,
code: lambda.Code.fromAsset('lambda-src/consumer'),
handler: 'receive_message_from_sqs.handler'
});

此消费者函数创建 lambda-src/consumer 目录的包和内容,并配置该函数在调用时使用 receive_message_from_sqs.js 中定义的处理程序。.

4.3 – 将 SQS 队列作为事件源添加到消费者中

我们希望当队列中有新消息时触发此函数。为此,我们需要将队列添加为该函数的事件源,并授予该函数消费消息所需的所有权限。.

首先,在文件顶部添加另一个导入语句:

import { SqsEventSource } from 'aws-cdk-lib/aws-lambda-event-sources';

接下来,在 customersFunction 定义下方添加以下两行:

consumerFunction.addEventSource(new SqsEventSource(queue));
queue.grantConsumeMessages(consumerFunction);
4.4 – 部署消费者功能

要部署该函数及其源代码,请在项目目录中运行以下命令:

cdk deploy

这样就为部署任何新的或修改过的资源做好了准备,并且再次提示您重新验证新的 IAM 权限。.

输入 y 进行确认,CDK 将执行新的 Lambda 函数及其执行角色,并将其附加到 SQS 队列。.

部署完成后,一切都将准备就绪并连接完毕。让我们看看它是否有效。

步骤 5 – 应用程序的端到端测试

为了测试应用程序,我们手动调用生产者函数向队列发送另一条消息。然后,我们检查消费者函数的执行日志,看看它是否自动启动并处理了该消息。.

5.1 – 调用生产者函数

按照 3.5 节“测试生产者函数”中提到的步骤再次调用生产者函数。以下是简要概述:

  • 转到 AWS Lambda 控制面板中的函数列表。.
  • 点击名称打开生产者函数。.
  • 转到源代码部分。.
  • 点击“测试”按钮。.
  • 确保函数运行成功。.
5.2 – 查看消费者绩效报告

返回 AWS Lambda 控制面板中的函数列表,这次打开 Consumer 函数。.

向下滚动,打开名为“监视器”的选项卡,在该选项卡中,打开“日志”选项卡,查看“最近调用”,您应该可以在那里找到指向该函数的最后一个 LogStream 的链接。.


之后,点击“日志流”链接。这将显示日志条目本身,如果您不习惯这种格式,可能会觉得有点令人困惑。.

只需查找位于这两个条目中间的条目,该条目以 START RequestId: … 和 END Request ID: … 开头。.

点击箭头展开条目,即可找到原始消息:

结果

恭喜!您已经创建了使用 SQS 在两个 Lambda 函数之间设置异步点对点消息传递所需的一切。.

SQS 还有更多功能,例如消息分类、确保消息顺序以及自动将错误消息发送到死信队列 (DLQ)。如需了解更多信息,请参阅 Amazon SQS 开发人员指南。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

您可能也喜欢