如何使用 KCraft 设置多节点 Kafka 集群

0 股票
0
0
0
0

介绍

Apache Kafka 是一个开源的分布式事件和流处理平台,用 Java 编写,专为处理实时数据流而设计。它具有极佳的可扩展性、高吞吐量和高可用性,每个集群可支持数百个节点。.

在本教程中,您将创建一个使用 KRaft 共识协议的 Kafka 集群。您将学习如何配置节点以组成集群,以及如何将主题分区分配给不同的节点。您还将学习如何将主题分配给集群中的特定代理。.

先决条件
  • 三台至少配备 4GB 内存和 2 个 CPU 的服务器
  • 一个已完全注册的域名,包含三个子域名,分别指向三个不同的域名服务器。.
  • Apache Kafka 已安装并配置在您的 Droplets 上。

第一步——配置 Kafka 节点

在此步骤中,您将配置之前创建的三个 Kafka 服务器,使它们成为同一个 KRaft 集群的一部分。借助 KRaft,节点本身可以组织和执行管理任务,而无需额外的开销和对 Apache ZooKeeper 的依赖。.

第一个节点配置

首先,您需要配置第一个节点。首先,运行以下命令停止第一个 Droplet 上的服务:

sudo systemctl stop kafka

以 Kafka 用户身份,导航到 Kafka 所在的目录,然后运行以下命令打开其配置文件进行编辑:

vi /config/kraft/server.properties

找出下列各行:

...
# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller
# The node id associated with this instance's roles
node.id=1
# The connect string for the controller quorum
controller.quorum.voters=1@localhost:9092
...

这三个参数将 Kafka 节点配置为同时充当代理和控制器,这意味着它既接收和消费数据(代理),也执行管理任务(控制器)。这种分离在大规模部署中非常有用,因为控制器可以独立运行,从而提高效率和冗余性。.

node.id 指定集群中的节点 ID。这是第一个节点,因此其 ID 应保持为 1。所有节点都必须具有唯一的节点 ID,因此第二个和第三个节点的 ID 将分别为 2 和 3。.

controller.quorum.voters 将节点 ID 映射到其对应的地址和端口以进行通信。您需要在此处指定所有集群节点的地址,以便每个节点都能感知到其他节点。将该行修改为如下所示:

...
[email protected]_domain:9093,[email protected]_domain:9093,[email protected]_domain:9093
...

在此处列出集群中的所有三个节点及其各自的 ID。请记住将 your_domain 替换为您在准备工作中设置的域名地址。.

接下来,在文件中找到以下几行:

...
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT
# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://localhost:9092
...

listeners 定义了 Kafka 节点监听的地址,而 advertised.listeners 则指定发送给客户端用于连接节点的地址。这样,您可以指定客户端应使用的实际地址的子集。.

将以下几行代码修改为如下所示,并将 your_domain 替换为您的实际域名:

...
listeners=PLAINTEXT://kafka1.your_domain:9092,CONTROLLER://kafka1.your_domain:9093
# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT
# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://kafka1.your_domain:9092
...

由于此节点将位于集群中,因此您已明确将地址指向它将运行的 droplet。.

然后,找到 num.partitions 设置:

...
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
...

正如注释中所述,这是为每个新线程配置的默认分区数。由于您有三个节点,请将其设置为 2 的倍数:

...
num.partitions=6
...

这里的值 6 确保每个节点默认维护两个主题分区。.

接下来,您需要配置内部线程的迭代因子,这些线程负责维护消费者偏移量和事务状态。请找到以下几行代码:

...
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
...

将它们设置为以下值:

...
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
...

在这里,您需要指定至少两个节点的内部元数据必须同步。完成后,保存并关闭文件。.

设置默认分区号后,需要重新配置日志存储。首先,运行以下命令删除现有日志文件:

rm -rf /home/kafka/kafka-logs/*

接下来,创建一个新的集群 ID 并将其存储在环境变量中:

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

在终端中显示:

echo $KAFKA_CLUSTER_ID

输出 ID 为:

OutputMjj4bch9Q3-B0TEXv8_zPg

记下这个值;配置第二个和第三个节点时需要用到它。.

最后,运行以下命令创建报表存储库:

./bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

输出结果将类似于这样:

Output...
Formatting /home/kafka/kafka-logs with metadata.version 3.7-IV4.
配置第二个和第三个节点

配置其他节点与配置第一个节点非常相似。请注意,您还需要更新 node.id:

...
node.id=node_number
...

第二个和第三个节点的适当值分别为 2 和 3,设置监听器和广播监听器的适当地址。.

重建日志存储时,重用第一个节点的集群 ID:

KAFKA_CLUSTER_ID="your_cluster_id"

完成后,在所有三个节点上运行以下命令启动 Kafka 服务:

sudo systemctl start kafka

至此,您已配置了三个 Kafka 节点,使其成为 KRaft 集群的一部分。您创建了一个主题,并在集群中生成和消费消息。.

步骤 2 – 连接到集群

在此步骤中,您将使用 Kafka 自带的 shell 脚本连接到 Kafka 集群。您还将创建一个主题,并尝试从集群中生成和消费数据。然后,您将关闭其中一个节点,并观察 Kafka 如何减少数据丢失。.

Kafka 提供了一个名为 kafka-metadata-quorum.sh 的脚本,用于显示集群及其成员的相关信息。运行以下命令即可运行该脚本:

./bin/kafka-metadata-quorum.sh --bootstrap-controller kafka1.your_domain:9093 describe --status

您将连接到端口 9093 上的某个节点,该端口是控制器(而非代理)的端点。请记住将 kafka1.your_domain 替换为指向您的某个 Kafka 节点的域名。.

输出结果将类似于这样:

OutputClusterId: G3TeIZoeTSCvG2YOWvPE2w
LeaderId: 3
LeaderEpoch: 2
HighWatermark: 383
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 55
CurrentVoters: [1,2,3]
CurrentObservers: []

该脚本列出了集群状态的初始信息。从显示的输出可以看出,节点 3 被选为领导者,并且所有三个节点 ([1,2,3]) 都在投票集中,并且都同意这一决定。.

创建一个名为“First Topic”的主题,运行以下命令:

./bin/kafka-topics.sh --create --topic first-topic --bootstrap-server kafka1.your_domain:9092 --replication-factor 2

输出结果为:

Created topic first-topic.

然后运行 kafka-topics.sh 脚本,查看节点上的分区是如何排列的:

./bin/kafka-topics.sh --describe --bootstrap-server kafka1.your_domain:9092 --topic first-topic

将重复因子设置为 2 可确保该主题至少在两个节点上可用。.

输出结果将类似于这样:

OutputTopic: first-topic TopicId: 4kVImoFNTQeyk3r2zQbdvw PartitionCount: 6 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: first-topic Partition: 3 Leader: 1 Replicas: 1,3 Isr: 1,3
Topic: first-topic Partition: 4 Leader: 3 Replicas: 3,2 Isr: 3,2
Topic: first-topic Partition: 5 Leader: 2 Replicas: 2,1 Isr: 2,1

您可以看到,每个分区都有一个主节点、两个副本节点和两个同步副本集 (ISR)。分区主节点是一个服务器节点,负责向客户端提供分区数据,而副本节点仅保存数据副本。默认情况下,如果一个副本节点在过去十秒内与主节点保持同步,则该副本节点被视为 ISR。此时间间隔可以针对每个主题进行配置。.

现在您已经创建了一个主题,接下来将使用 kafka-console-producer.sh 脚本来生成其消息。运行以下命令启动生产者:

./bin/kafka-console-producer.sh --topic first-topic --bootstrap-server kafka1.your_domain:9092

您会看到一条空白通知:

>

服务提供商正在等待您的短信。请输入测试内容并按回车键。通知将显示如下:

>Hello World!
>

生产者现在正在等待下一条消息,这意味着上一条消息已成功发送到 Kafka。您可以输入任意数量的消息进行测试。要退出生产者,请按 CTRL+C。.

你需要一个消费者来重新读取主题消息。Kafka 提供了一个简单的消费者 kafka-console-consumer.sh。运行以下命令即可启动它:

./bin/kafka-console-consumer.sh --topic first-topic --from-beginning --bootstrap-server kafka1.your_domain:9092

您将看到正在读取的主题消息:

OutputHello World!
...

节点故障模拟

在第三个 Kafka 节点上,运行以下命令停止服务:

sudo systemctl stop kafka

然后,通过实践讲解这个主题:

./bin/kafka-topics.sh --describe --bootstrap-server kafka1.your_domain:9092 --topic first-topic

输出结果将类似于这样:

OutputTopic: first-topic TopicId: 4kVImoFNTQeyk3r2zQbdvw PartitionCount: 6 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 1 Replicas: 3,1 Isr: 1
Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2
Topic: first-topic Partition: 3 Leader: 1 Replicas: 1,3 Isr: 1
Topic: first-topic Partition: 4 Leader: 2 Replicas: 3,2 Isr: 2
Topic: first-topic Partition: 5 Leader: 2 Replicas: 2,1 Isr: 2,1

尽管节点 3 被列为副本,但由于其不可用,因此不在 ISR 集合中。当它重新加入集群时,它会与其他节点同步并尝试恢复其先前的位置。.

请再次阅读第一个主题中的消息:

./bin/kafka-console-consumer.sh --topic first-topic --from-beginning --bootstrap-server kafka1.your_domain:9092

你会发现它们和往常一样可以访问:

OutputHello World!
...

由于存在副本,前两个节点接管控制权并为消费者提供服务。现在您可以在第三个服务器上启动 Kafka:

sudo systemctl start kafka

至此,您已经了解了 Kafka 如何缓解集群中节点不可用的情况。现在您将学习如何从集群中移除节点。.

步骤 3 – 在节点之间传输数据

在本步骤中,您将学习如何在 Kafka 集群中的节点之间移动主题。向现有集群添加节点时,Kafka 不会自动将任何分区移动到该节点,而您可能需要这样做。这对于删除节点也很有用,因为现有分区不会自动移动到剩余节点。.

Kafka 提供了一个名为 kafka-reassign-partitions.sh 的脚本,可以生成、执行和验证迁移计划。您将使用它来创建一个计划,将第一个主题的分区迁移到前两个节点。.

首先,您需要指定要移动哪些主题。该脚本接受一个包含主题定义的 JSON 文件,因此请创建并打开该文件进行编辑:

vi topics-to-move.json

添加以下几行:

{
"topics": [
{
"topic": "first-topic"
}
],
"version": 1
}

在“标题”下,定义一个引用第一个主题的对象。完成后,保存并关闭文件。.

运行以下命令生成迁移计划,将 kafka1.your_domain 替换为指向您的 Kafka 节点之一的域名:

./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --topics-to-move-json-file topics-to-move.json --broker-list "1,2" --generate

您将“1,2”传递给--broker-list,它代表目标经纪商的ID。.

输出结果将类似于这样:

OutputCurrent partition replica assignment
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[2,1],"log_dirs":["any","any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[1,2],"log_dirs":["any","any"]}]}

此脚本总共创建了两个方案,分别描述了当前分区布局和建议的分区布局。第一个方案用于日后需要撤销更改的情况。请注意第二个方案,您需要将其保存到名为 migration-plan.json 的单独文件中。创建并打开该文件进行编辑:

vi migration-plan.json

添加第二个可执行文件:

{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[1,2],"log_dirs":["any","any"]}]}

保存并关闭文件。然后运行以下命令来运行它:

./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --reassignment-json-file migration-plan.json --execute

输出结果为:

OutputCurrent partition replica assignment
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[2,1],"log_dirs":["any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for first-topic-0,first-topic-1,first-topic-2,first-topic-3,first-topic-4,first-topic-5

脚本显示迁移已开始。要查看迁移进度,请输入 --verify 参数:

./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --reassignment-json-file migration-plan.json --verify

过一段时间后,输出结果将如下所示:

OutputStatus of partition reassignment:
Reassignment of partition first-topic-0 is completed.
Reassignment of partition first-topic-1 is completed.
Reassignment of partition first-topic-2 is completed.
Reassignment of partition first-topic-3 is completed.
Reassignment of partition first-topic-4 is completed.
Reassignment of partition first-topic-5 is completed.
Clearing broker-level throttles on brokers 1,2,3
Clearing topic-level throttles on topic first-topic

现在你可以解释第一个问题,即检查 Broker 3 上是否存在分区:

./bin/kafka-topics.sh --describe --bootstrap-server kafka1.your_domain:9092 --topic first-topic

输出结果如下:

OutputTopic: first-topic TopicId: 4kVImoFNTQeyk3r2zQbdvw PartitionCount: 6 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 2 Replicas: 2,1 Isr: 1,2
Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: first-topic Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 4 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: first-topic Partition: 5 Leader: 1 Replicas: 1,2 Isr: 2,1

只有代理 1 和 2 作为副本和 ISR 存在,这意味着迁移成功了。.

至此,您已创建了一个迁移计划,将第一个主题从代理 3 迁移到其余主题,并学习了如何验证迁移是否顺利进行。.

结果

现在您拥有一个由三个节点组成的 Kafka 集群,这些节点使用 KRaft 协议进行通信。您还学习了如何检查集群和分区布局。您通过关闭一个节点并从主题中读取数据来测试集群冗余。最后,您学习了如何将主题切换到集群节点。.

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

您可能也喜欢