介绍
Apache Kafka 是一个开源的分布式事件和流处理平台,用 Java 编写,专为处理实时数据流而设计。它具有极佳的可扩展性、高吞吐量和高可用性。Kafka 由 Apache 软件基金会开发,因其可靠性、易用性和容错性而被广泛采用。全球最大的组织机构都在使用 Kafka 以分布式且高效的方式管理海量数据。.
在本教程中,您将下载并安装 Apache Kafka。您将学习如何创建和删除主题,以及如何使用提供的脚本发送和接收事件。您还将了解目标相同的类似项目,并比较 Kafka 与其他项目的优缺点。.
先决条件
- 至少配备 4 GB 内存和 2 个 CPU 的设备。.
- 您的 Droplet 或本地计算机上已安装 Java 8 或更高版本。.
步骤 1 – 下载并配置 Apache Kafka
在本节中,您将下载 Apache Kafka 并将其解压到您的计算机上。为了提高安全性,您将使用自己的用户帐户进行安装。然后,您将使用 KRaft 配置并运行它。.
首先,您需要创建一个单独的用户来运行 Kafka。运行以下命令,即可创建一个名为“用户”的用户。 卡夫卡 创造:
sudo adduser kafka系统会要求您输入账户密码。请输入强密码并按回车键。 进入 对于每个字段,请跳过填写其他信息。.
最后,切换到特定的 Kafka 用户:
su kafka接下来,您需要从官方下载页面下载 Kafka 发布包。截至撰写本文时,最新版本为 3.6.1。如果您使用的是 macOS 或 Linux 系统,可以使用 curl 命令下载 Kafka。.
使用此命令下载 Kafka 并将其安装到 /tmp 地方:
curl -o /tmp/kafka.tgz https://dlcdn.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz您拥有以下版本 ~/卡夫卡你会把它保存在主目录中。运行以下命令创建它:
mkdir ~/kafka然后,通过运行它, ~/卡夫卡 提炼:
tar -xzf /tmp/kafka.tgz -C ~/kafka --strip-components=1由于您下载的归档文件包含一个与 Kafka 版本同名的根文件夹,因此 –strip-components=1 将跳过该文件夹并提取其中的所有内容。.
截至撰写本文时,Kafka 3 是最后一个同时支持两种元数据管理系统的主要版本:Apache ZooKeeper 和 Kafka KRaft(Kafka Raft 的简称)。ZooKeeper 是一个开源项目,它为应用程序提供了一种协调分布式数据的标准方法,同样由 Apache 软件基金会开发。.
然而,从 Kafka 3.3 开始,引入了对 KRaft 的支持。KRaft 是一个专门用于协调 Kafka 实例的系统,它简化了安装过程,并显著提高了可扩展性。借助 KRaft,Kafka 本身将承担数据的全部责任,而无需将管理元数据保存在外部。.
虽然 ZooKeeper 仍然可用,但预计 Kafka 4 及更高版本将不再支持它。在本教程中,您将使用 KRaft 设置 Kafka。.
您需要为新的 Kafka 集群创建一个唯一标识符。目前,它只包含一个节点。请转到 Kafka 当前所在的目录:
cd ~/kafkaKafka 与 KRaft 配合配置自身 config/kraft/server.properties 保存,同时配置文件 ZooKeeper 配置/服务器属性 这是。.
首次运行前,需要修改一些默认设置。运行以下命令打开文件进行编辑:
nano config/kraft/server.properties...
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
...设置 日志目录 指定 Kafka 保存日志文件的位置。默认情况下,它将它们存储在…… /tmp/kafka-logs 保存这些文件,因为它们保证可写,尽管是暂时的。将值替换为指定的路径:
...
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/home/kafka/kafka-logs
...由于您为 Kafka 创建了一个单独的用户,因此您需要将日志目录路径放在该用户的家目录下。如果该目录不存在,Kafka 会自动创建它。完成后,保存并关闭文件。.
Kafka 配置完成后,运行以下命令生成随机集群 ID:
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"然后运行以下命令并输入 ID,为日志文件创建存储空间:
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties输出结果为:
Output
Formatting /home/kafka/kafka-logs with metadata.version 3.6-IV2.最后,您可以首次启动 Kafka 服务器:
bin/kafka-server-start.sh config/kraft/server.properties最终输出结果将类似于这样:
Output
...
[2024-02-26 10:38:26,889] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.DataPlaneAcceptor)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Finished waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Finished waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Transition from STARTING to STARTED (kafka.server.BrokerServer)
[2024-02-26 10:38:26,891] INFO Kafka version: 3.6.1 (org.apache.kafka.common.utils.AppInfoParser)
[2024-02-26 10:38:26,891] INFO Kafka commitId: 5e3c2b738d253ff5 (org.apache.kafka.common.utils.AppInfoParser)
[2024-02-26 10:38:26,891] INFO Kafka startTimeMs: 1708943906890 (org.apache.kafka.common.utils.AppInfoParser)
[2024-02-26 10:38:26,892] INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer)输出结果显示 Kafka 已使用 KRaft 成功初始化,并且正在创建连接。 0.0.0.0:9092 接受。.
什么时候 Ctrl + C 按下此键,进程将退出。由于不建议在会话打开的情况下运行 Kafka,下一步您将创建一个服务来在后台运行 Kafka。.
步骤 2 – 为 Kafka 创建 systemd 服务
在本节中,您将创建一个 systemd 服务,使 Kafka 始终在后台运行。systemd 服务可以持续地启动、停止和重启。.
将服务配置放在一个名为 `<filename>` 的文件中。 代码服务器服务 在列表中 /lib/systemd/system 你省钱的地方 systemd 它用于存储您的服务。请使用文本编辑器创建它:
sudo nano /etc/systemd/system/kafka.service添加以下几行:
[Unit]
Description=kafka-server
[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/kraft/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target首先,您需要在此处指定服务描述。然后在[服务您需要定义服务类型(简单服务表示命令将以简单方式运行),并提供要运行的命令。您还需要指定运行该命令的用户。 卡夫卡 是的,如果 Kafka 退出,服务应该自动重启。.
部分 [安装] 指示系统在可以登录服务器时启动此服务。完成后,保存并关闭文件。.
运行以下命令启动 Kafka 服务:
sudo systemctl start kafka查看程序状态,确认其是否已正确启动:
sudo systemctl status kafka您将看到类似以下内容的输出:
Output
● kafka.service - kafka-server
Loaded: loaded (/etc/systemd/system/kafka.service; disabled; preset: enabled)
Active: active (running) since Mon 2024-02-26 11:17:30 UTC; 2min 40s ago
Main PID: 1061 (sh)
Tasks: 94 (limit: 4646)
Memory: 409.2M
CPU: 10.491s
CGroup: /system.slice/kafka.service
├─1061 /bin/sh -c "/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/kraft/server.properties > /home/kafka/kafka/kafka.log 2>&1"
└─1062 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true "-Xlog:gc*:file=/home/kafka/kafka/bin/../logs/kaf>
Feb 26 11:17:30 kafka-test1 systemd[1]: Started kafka.service - kafka-server.要在服务器重启后自动启动 Kafka,请运行以下命令启用其服务:
sudo systemctl enable kafka至此,您已创建并启用了 Kafka 的 systemd 服务,使其在每次服务器启动时自动运行。接下来,您将学习如何在 Kafka 中创建和删除主题,以及如何使用现有脚本生成和消费文本消息。.
步骤三——生产和消费热点信息
现在您已经搭建好了 Kafka 服务器,接下来我们将向您介绍主题以及如何使用提供的脚本来管理主题。您还将学习如何从主题发送和接收消息。正如事件流文章中所述,消息的发布和接收都与主题相关。一个主题可以关联到一个类别,消息就属于该类别。.
根据提供的脚本 kafka-topics.sh 您可以通过以下方式在 Kafka 中管理主题 命令行界面 用于创建名为“主题” 第一主题 运行以下命令:
bin/kafka-topics.sh --create --topic first-topic --bootstrap-server localhost:9092所有提供的 Kafka 脚本都要求指定服务器地址。 --bootstrap-server 指定。.
输出结果为:
Output
Created topic first-topic.要列出所有可用主题,而不是 - 创造 在 - 列表 发送:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092您可以看到您创建的主题:
Output first-topic
您可以访问以下网址查找有关该主题的详细信息和统计数据: - 描述 得到:
bin/kafka-topics.sh --describe --topic first-topic --bootstrap-server localhost:9092输出结果如下所示:
Output
Topic: first-topic TopicId: VtjiMIUtRUulwzxJL5qVjg PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 1 Replicas: 1 Isr: 1
第一行指定了主题名称、ID 和重复因子,重复因子为 1,因为该主题仅存在于当前机器上。第二行特意缩进,显示了该主题的第一个(也是唯一一个)分区的信息。Kafka 允许您对主题进行分区,这意味着主题的不同部分可以分布在不同的服务器上,从而提高可扩展性。这里只有一个分区。.
现在您已经创建了一个主题,接下来将使用 kafka-console-producer.sh 脚本为其生成消息。运行以下命令启动生产者:
bin/kafka-console-producer.sh --topic first-topic --bootstrap-server localhost:9092您会看到一条空白通知:
>生产者正在等待您的短信。请参与测试。 进入 按下按钮。通知将显示如下:
>test
>生产者现在正在等待下一条消息,这意味着上一条消息已成功发送到 Kafka。您可以输入任意数量的消息进行测试。要退出生产者, CTRL+C 按 。.
要从某个主题中检索消息,您需要一个消费者。Kafka 提供了一个简单的消费者,形式如下: kafka-console-consumer.sh 它提供了。运行它的方法如下:
bin/kafka-console-consumer.sh --topic first-topic --bootstrap-server localhost:9092但是,不会有任何输出。这是因为消费者正在从主题中读取数据流,目前没有任何内容被生成或发送。要消费在消费者启动之前生成的消息,您需要从头开始读取主题,运行以下命令:
bin/kafka-console-consumer.sh --topic first-topic --from-beginning --bootstrap-server localhost:9092消费者重放所有主题事件并获取消息:
Outputtest
...就像建造者一样,为了退出, CTRL+C 按 。.
要验证消费者是否确实在传输数据,请在单独的终端会话中打开它。打开一个辅助 SSH 会话,并以默认配置运行消费者:
bin/kafka-console-consumer.sh --topic first-topic --bootstrap-server localhost:9092在初始会话中,运行构造函数:
bin/kafka-console-producer.sh --topic first-topic --bootstrap-server localhost:9092然后输入您想要发送的消息:
>second test
>third test
>您会立即看到它们被消费者收到了:
Output
second test
third test
测试完成后,终止生产者和消费者。.
要删除第一个主题, - 删除 到 kafka-topics.sh 转移:
bin/kafka-topics.sh --delete --topic first-topic --bootstrap-server localhost:9092不会有任何输出。您可以列出主题以验证它们是否确实已被删除:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092输出结果为:
Output
__consumer_offsets
__Consumer_Equivalent 是 Kafka 内部的一个主题,用于存储消费者读取主题的次数。至此,您已创建了一个 Kafka 主题并在其中创建了消息。然后,您使用提供的脚本消费了这些消息,并最终实时接收到了它们。下一步,您将了解 Kafka 与其他事件代理和类似软件的比较。.
与类似架构的比较
Apache Kafka 被认为是事件流应用场景的理想解决方案。然而,Apache Pulsar 和 RabbitMQ 也同样被广泛使用,并以其多功能性脱颖而出,尽管它们在实现方式上有所不同。消息队列和事件流的主要区别在于,前者的主要任务是以最快的速度将消息传递给客户端,而无需考虑消息的顺序。这类系统通常会将消息存储在内存中,直到被消费者确认。消息过滤和路由是重要的方面,因为消费者可能对特定类别的数据感兴趣。RabbitMQ 是传统消息系统的典型例子,其中多个消费者可以订阅同一个主题并接收同一消息的多个副本。另一方面,事件流则侧重于持久化。事件应该被归档、以整洁的方式维护,并且只处理一次。它们被路由到特定的消费者并不重要,因为其理念是所有消费者都以相同的方式处理事件。Apache Pulsar 是由 Apache 软件基金会开发的开源消息系统,支持事件流。与从一开始就基于 Kafka 构建的 Pulsar 不同,Pulsar 最初是一个传统的消息队列解决方案,后来才添加了事件流处理功能。因此,当需要结合这两种方法,而无需部署单独的应用程序时,Pulsar 非常有用。.
结果
现在,您的服务器后台已安全运行 Apache Kafka,并将其配置为系统服务。您也学习了如何通过命令行操作主题,以及如何生成和消费消息。然而,Kafka 的主要吸引力在于其丰富的客户端,方便您将其集成到应用程序中。.









