卡夫卡导论

0 股票
0
0
0
0

介绍

Apache Kafka 是一个开源的分布式事件和流处理平台,用 Java 编写,专为处理实时数据流而设计。它具有极佳的可扩展性、高吞吐量和高可用性。Kafka 由 Apache 软件基金会开发,因其可靠性、易用性和容错性而被广泛采用。全球最大的组织机构都在使用 Kafka 以分布式且高效的方式管理海量数据。.

在本教程中,您将下载并安装 Apache Kafka。您将学习如何创建和删除主题,以及如何使用提供的脚本发送和接收事件。您还将了解目标相同的类似项目,并比较 Kafka 与其他项目的优缺点。.

先决条件
  • 一台至少配备 4 GB 内存和 2 个 CPU 的机器。以 Ubuntu Server 为例。
  • 您的 Droplet 或本地计算机上已安装 Java 8 或更高版本。.

步骤 1 – 下载并配置 Apache Kafka

在本节中,您将下载 Apache Kafka 并将其解压到您的计算机上。为了提高安全性,您将使用自己的用户帐户进行安装。然后,您将使用 KRaft 配置并运行它。.

首先,您需要创建一个单独的用户,Kafka 将在该用户下运行。运行以下命令创建名为 kafka 的用户:

sudo adduser kafka

系统会要求您输入账户密码。请输入强密码,并在每个字段后按回车键跳过填写其他信息。.

最后,切换到特定的 Kafka 用户:

su kafka

接下来,您需要从官方下载页面下载 Kafka 发布包。撰写本文时,最新版本为 3.7.0,基于 Scala 2.13 构建。如果您使用的是 macOS 或 Linux 系统,可以使用 curl 命令下载 Kafka。.

使用以下命令下载 Kafka 并将其放置在 /tmp 目录中:

curl -o /tmp/kafka.tgz https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz

您将把版本信息存储在主目录下的 ~/kafka 目录中。运行以下命令创建该目录:

mkdir ~/kafka

然后运行以下命令将其提取到 ~/kafka:

tar -xzf /tmp/kafka.tgz -C ~/kafka --strip-components=1

由于您下载的归档文件包含一个与 Kafka 版本同名的根文件夹,因此 –strip-components=1 将跳过该文件夹并提取其中的所有内容。.

截至撰写本文时,Kafka 3 是最后一个同时支持两种元数据管理系统的主要版本:Apache ZooKeeper 和 Kafka KRaft(Kafka Raft 的简称)。ZooKeeper 是一个开源项目,它为应用程序提供了一种协调分布式数据的标准方法,同样由 Apache 软件基金会开发。.

然而,从 Kafka 3.3 开始,引入了对 KRaft 的支持。KRaft 是一个专门用于协调 Kafka 实例的系统,它简化了安装过程,并显著提高了可扩展性。借助 KRaft,Kafka 本身将承担数据的全部责任,而无需将管理元数据保存在外部。.

虽然 ZooKeeper 仍然可用,但预计 Kafka 4 及更高版本将不再支持它。在本教程中,您将使用 KRaft 设置 Kafka。.

您需要为新的 Kafka 集群创建一个唯一标识符。目前,它只有一个节点。请转到 Kafka 所在的目录:

cd ~/kafka

Kafka 与 KRaft 的配置存储在 config/kraft/server.properties 中,而 ZooKeeper 的配置文件是 config/server.properties。.

首次运行前,需要修改一些默认设置。运行以下命令打开文件进行编辑:

nano config/kraft/server.properties

找出下列各行:

...
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
...

log.dirs 设置指定 Kafka 保存日志文件的位置。默认情况下,日志文件存储在 /tmp/kafka-logs 目录下,因为该目录保证可写,尽管只是临时性的。请将该值替换为指定的路径:

...
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/home/kafka/kafka-logs
...

由于您为 Kafka 创建了一个单独的用户,因此您需要将日志目录路径放在该用户的家目录下。如果该目录不存在,Kafka 会自动创建它。完成后,保存并关闭文件。.

Kafka 配置完成后,运行以下命令生成随机集群 ID:

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

然后运行以下命令并输入 ID,为日志文件创建存储空间:

bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

输出结果为:

Output
Formatting /home/kafka/kafka-logs with metadata.version 3.7-IV4.

最后,您可以首次启动 Kafka 服务器:

bin/kafka-server-start.sh config/kraft/server.properties

最终输出结果将类似于这样:

Output
...
[2024-02-26 10:38:26,889] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.DataPlaneAcceptor)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Finished waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Finished waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Transition from STARTING to STARTED (kafka.server.BrokerServer)
[2024-02-26 10:38:26,891] INFO Kafka version: 3.7.0 (org.apache.kafka.common.utils.AppInfoParser)
[2024-02-26 10:38:26,891] INFO Kafka commitId: 5e3c2b738d253ff5 (org.apache.kafka.common.utils.AppInfoParser)
[2024-02-26 10:38:26,891] INFO Kafka startTimeMs: 1708943906890 (org.apache.kafka.common.utils.AppInfoParser)
[2024-02-26 10:38:26,892] INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer)

输出结果显示 Kafka 已使用 KRaft 成功初始化,并且正在 0.0.0.0:9092 上接受连接。.

按下 CTRL + C 即可退出该进程。由于不建议在会话打开的情况下运行 Kafka,因此下一步将创建一个服务以在后台运行 Kafka。.

步骤 2 – 为 Kafka 创建 systemd 服务

在本节中,您将创建一个 systemd 服务,使 Kafka 始终在后台运行。systemd 服务可以持续地启动、停止和重启。.

您需要将服务配置存储在名为 code-server.service 的文件中,该文件位于 /lib/systemd/system 目录下,systemd 会将服务存储在该目录中。请使用文本编辑器创建该文件:

sudo nano /etc/systemd/system/kafka.service

添加以下几行:

[Unit]
Description=kafka-server
[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/kraft/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target

首先,您需要在此处指定服务描述。然后在 [Service] 字段中定义服务类型(简单表示命令将以简单方式执行),并提供要执行的命令。您还需要指定运行该服务的用户为 kafka,并且如果 Kafka 退出,则该服务应自动重启。.

[安装] 部分指示系统在您能够登录服务器时启动此服务。完成后,保存并关闭文件。.

运行以下命令启动 Kafka 服务:

sudo systemctl start kafka

查看程序状态,确认其是否已正确启动:

sudo systemctl status kafka

您将看到类似以下内容的输出:

Output
● kafka.service - kafka-server
Loaded: loaded (/etc/systemd/system/kafka.service; disabled; preset: enabled)
Active: active (running) since Mon 2024-02-26 11:17:30 UTC; 2min 40s ago
Main PID: 1061 (sh)
Tasks: 94 (limit: 4646)
Memory: 409.2M
CPU: 10.491s
CGroup: /system.slice/kafka.service
├─1061 /bin/sh -c "/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/kraft/server.properties > /home/kafka/kafka/kafka.log 2>&1"
└─1062 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true "-Xlog:gc*:file=/home/kafka/kafka/bin/../logs/kaf>
Feb 26 11:17:30 kafka-test1 systemd[1]: Started kafka.service - kafka-server.

要在服务器重启后自动启动 Kafka,请运行以下命令启用其服务:

sudo systemctl enable kafka

至此,您已创建并启用了 Kafka 的 systemd 服务,使其在每次服务器启动时自动运行。接下来,您将学习如何在 Kafka 中创建和删除主题,以及如何使用现有脚本生成和消费文本消息。.

步骤三——生产和消费热点信息

现在您已经搭建好了 Kafka 服务器,接下来我们将向您介绍主题以及如何使用提供的脚本来管理主题。您还将学习如何从主题发送和接收消息。.

如事件流文章中所述,消息的发布和接收与主题相关联。主题可以与消息所属的类别相关联。.

可以使用提供的脚本 kafka-topics.sh 通过命令行界面 (CLI) 管理 Kafka 中的主题。要创建一个名为 first-topic 的主题,请运行以下命令:

bin/kafka-topics.sh --create --topic first-topic --bootstrap-server localhost:9092

所有提供的 Kafka 脚本都需要您使用 --bootstrap-server 指定服务器地址。.

输出结果为:

Output
Created topic first-topic.

要列出所有现有主题,请将 --create 替换为 --list:

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

您可以看到您创建的主题:

Output
first-topic

您可以通过传递 --describe 参数来获取有关该主题的详细信息和统计数据:

bin/kafka-topics.sh --describe --topic first-topic --bootstrap-server localhost:9092

输出结果如下所示:

Output
Topic: first-topic TopicId: VtjiMIUtRUulwzxJL5qVjg PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 1 Replicas: 1 Isr: 1

第一行指定了主题名称、ID 和重复因子,重复因子为 1,因为该主题仅存在于当前机器上。第二行特意缩进,显示了该主题的第一个(也是唯一一个)分区的信息。Kafka 允许您对主题进行分区,这意味着主题的不同部分可以分布在不同的服务器上,从而提高可扩展性。这里只有一个分区。.

现在您已经创建了一个主题,接下来将使用 kafka-console-producer.sh 脚本为其生成消息。运行以下命令启动生产者:

bin/kafka-console-producer.sh --topic first-topic --bootstrap-server localhost:9092

您会看到一条空白通知:

>

服务提供商正在等待您的短信。请输入测试内容并按回车键。通知将显示如下:

>test
>

生产者现在正在等待下一条消息,这意味着上一条消息已成功发送到 Kafka。您可以输入任意数量的消息进行测试。要退出生产者,请按 CTRL+C。.

要检索主题消息,您需要一个消费者。Kafka 提供了一个简单的消费者,即 kafka-console-consumer.sh。运行以下命令即可启动它:

bin/kafka-console-consumer.sh --topic first-topic --bootstrap-server localhost:9092

但是,不会有任何输出。这是因为消费者正在从主题中读取数据流,目前没有任何内容被生成或发送。要消费在消费者启动之前生成的消息,您需要从头开始读取主题,运行以下命令:

bin/kafka-console-consumer.sh --topic first-topic --from-beginning --bootstrap-server localhost:9092

消费者重放所有主题事件并获取消息:

Outputtest
...

与编辑器一样,按 CTRL+C 退出。.

要验证消费者是否确实在传输数据,请在单独的终端会话中打开它。打开一个辅助 SSH 会话,并以默认配置运行消费者:

bin/kafka-console-consumer.sh --topic first-topic --bootstrap-server localhost:9092

在初始会话中,运行构造函数:

bin/kafka-console-producer.sh --topic first-topic --bootstrap-server localhost:9092

然后输入您想要发送的消息:

>second test
>third test
>

您将立即看到消费者收到这些产品:

Output
second test
third test

测试完成后,终止生产者和消费者。.

要删除第一个主题,请将 --delete 参数传递给 kafka-topics.sh:

bin/kafka-topics.sh --delete --topic first-topic --bootstrap-server localhost:9092

不会有任何输出。您可以列出主题以验证它们是否确实已被删除:

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

输出结果为:

Output
__consumer_offsets

__Consumer_Equivalent 是 Kafka 的一个内部主题,用于存储消费者读取某个主题的时间。.

至此,您已创建了一个 Kafka 主题并在其中创建了消息。然后,您使用提供的脚本消费了这些消息,并最终实时接收到了它们。接下来,您将了解 Kafka 与其他事件代理和类似软件的比较。.

与类似架构的比较

Apache Kafka 被认为是事件流应用场景的事实标准解决方案。然而,Apache Pulsar 和 RabbitMQ 也得到了广泛应用,并以其多功能性脱颖而出,尽管它们在实现方式上有所不同。.

消息队列和事件流的主要区别在于,前者的主要任务是以最快的速度将消息传递给消费者,而无需考虑消息的顺序。这类系统通常会将消息存储在内存中,直到被消费者确认。消息过滤和路由是重要的方面,因为消费者可能对特定类别的数据感兴趣。RabbitMQ 就是一个典型的传统消息系统,其中多个消费者可以订阅同一个主题并接收同一条消息的多个副本。.

另一方面,事件流侧重于持久化。事件应该只被归档、维护和处理一次。将它们路由到特定的消费者并不重要,因为其理念是所有消费者都以相同的方式处理事件。.

Apache Pulsar 是由 Apache 软件基金会开发的开源消息系统,支持事件流处理。与从一开始就基于 Kafka 构建的 Pulsar 不同,Pulsar 最初是一个传统的消息队列解决方案,后来才具备了事件流处理能力。因此,当需要结合两种方法,而无需部署单独的应用程序时,Pulsar 非常有用。.

结果

现在,您的服务器后台已安全运行 Apache Kafka,并将其配置为系统服务。您也学习了如何通过命令行操作主题,以及如何生成和消费消息。然而,Kafka 的主要吸引力在于其丰富的客户端,方便您将其集成到应用程序中。.

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

您可能也喜欢