介绍
Apache Kafka 提供 shell 脚本,用于向 Kafka 集群发送和接收基本的文本消息。虽然这些脚本对于探索和实验很有用,但实际应用通常以编程方式访问 Kafka。为此,Kafka 为常用的编程语言和环境提供了许多客户端库。在本教程中,您将创建一个 Java 应用程序,用于向 Kafka 主题发送数据。您将使用 Apache Maven(一个用于构建和打包 Java 项目的工具)创建一个 Java 项目,并将 Kafka 客户端库添加为依赖项。然后,您将实现一个使用 Kafka 客户端的类,该类能够发送消息并检索集群内消息的元数据。.
先决条件
- 至少配备 4 GB 内存和 2 个 CPU 的设备
- 您的 Droplet 或本地计算机上已安装 Java 开发工具包 (JDK) 8 或更高版本。
- Apache Kafka 已安装并配置在您的 Droplet 或本地计算机上。
- Java项目标准目录布局简介
步骤 1 – 创建 Maven 项目
在此步骤中,您将安装 Apache Maven 并使用它来创建一个与 Kafka 通信的项目。在 Ubuntu 系统上,Maven 已在官方软件仓库中提供。.
首先,运行以下命令列出可用的软件包:
sudo apt update要安装它,请运行以下命令:
sudo apt install maven通过查看版本号来确认是否已安装:
mvn --version根据您的 Java 版本和平台,输出结果将类似于以下内容:
OutputApache Maven 3.6.3
Maven home: /usr/share/maven
Java version: 11.0.22, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64
Default locale: en, platform encoding: UTF-8
OS name: "linux", version: "5.15.0-100-generic", arch: "amd64", family: "unix"接下来,创建一个目录,用于存储您用于处理 Kafka 的 Java 项目:
mkdir ~/kafka-projects进入新创建的目录:
cd ~/kafka-projects然后运行以下命令生成一个空的 Java 项目:
mvn archetype:generate \
-DgroupId=com.dokafka \
-DartifactId=dokafka \
-DarchetypeArtifactId=maven-archetype-quickstart \
-DarchetypeVersion=1.4 \
-DinteractiveMode=false在这里,您指示 Maven 创建一个名为“新项目”的项目。 多卡夫卡 使用组 ID com.dokafka 群组 ID 用于在整个生态系统中唯一标识此项目。 Maven 这个项目是基于原型设计的。 maven-archetype-quickstart 将会创建这些模板,Maven 将以这种方式调用这些模板。.
会输出大量信息,尤其是在第一次运行 Maven 时。输出的结尾部分将如下所示:
Output...
INFO] Generating project in Batch mode
[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Archetype: maven-archetype-quickstart:1.4
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: com.dokafka
[INFO] Parameter: artifactId, Value: dokafka
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Parameter: package, Value: com.dokafka
[INFO] Parameter: packageInPathFormat, Value: com/dokafka
[INFO] Parameter: package, Value: com.dokafka
[INFO] Parameter: groupId, Value: com.dokafka
[INFO] Parameter: artifactId, Value: dokafka
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Project created from Archetype in dir: /root/kafka-projects/dokafka
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 3.537 s
[INFO] Finished at: 2024-03-10T10:43:07Z
[INFO] ------------------------------------------------------------------------Maven 从其中央存储库下载必要的 Java 包并构建项目。 多卡夫卡 使用这种模式 maven-archetype-quickstart 已创建。.
运行以下命令进入项目目录:
cd dokafka项目结构如下:
├── pom.xml
└── src
├── main
│ └── java
│ └── com
│ └── dokafka
│ └── App.java
└── test
└── java
└── com
└── dokafka
└── AppTest.java作为先决条件的一部分,您学习了标准的 Maven 项目结构,如下所示。目录 src/main/java 包含项目源代码, src/test/java 包含实验资源和 pom.xml 项目根目录下是主要的 Maven 配置文件。.
该项目仅包含一个源文件, App.java 显示其内容,查看 Maven 生成了什么:
cat src/main/java/com/dokafka/App.java输出结果为:
package com.dokafka;
/**
* Hello world!
*
*/
public class App
{
public static void main( String[] args )
{
System.out.println( "Hello World!" );
}
}要运行这段代码,您必须先运行以下命令来构建项目:
mvn packageMaven 会编译代码并将其打包成 JAR 文件以供运行。输出结果的结尾会像这样,表示编译完成:
Output...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 9.830 s
[INFO] Finished at: 2024-03-10T10:41:24Z
[INFO] ------------------------------------------------------------------------Maven 文件 罐 所得结果已放入目标列表。要运行该类 应用程序 在你刚刚创建的类中,运行以下命令并输入完整的类 ID:
java -cp target/dokafka-1.0-SNAPSHOT.jar com.dokafka.App输出结果为:
OutputHello World!您已安装 Maven 并创建了一个空的 Java 项目。接下来,您将添加 Kafka 所需的依赖项。.
步骤 2 – 添加 Maven 依赖项
现在,您需要将 Kafka Java 客户端以及其他日志记录依赖项添加到您的项目中。您还可以配置 Maven,使其在打包过程中包含这些依赖项。首先,您需要添加 Kafka 客户端依赖项。在浏览器中打开 Java 客户端的 Maven 仓库页面,选择最新版本,然后复制提供的 Maven XML 代码片段。截至撰写本文时,Java 客户端库的最新版本为 [版本号]。 3.7.0 确实如此。.
依赖关系 pom.xml 它们将被添加到项目根目录。打开它进行编辑:
nano pom.xml部分 <dependencies> 找到并添加依赖关系定义:
...
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.0</version>
</dependency>
...
</dependencies>这将为您的项目提供 Kafka 客户端库。但是,该库本身还需要两个其他依赖项,您需要手动添加。这两个依赖项来自用于记录消息的 SLF4J 库,因为它支持多种日志库,并允许开发人员灵活地处理日志消息。您需要添加的两个依赖项是:
- slf4j-api 就是这个库本身。
- slf4j-simple 处理日志并将输出输出到终端。
定义好依赖项后,需要将它们与最终构建的 JAR 文件一起提供。<构建> pom.xml 找到并添加高亮显示的行:
...
<build>
<pluginManagement>
<plugins>
...
</plugins>
</pluginManagement>
<plugins>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
...这里是插件 maven依赖插件 您已配置为在打包时复制所有依赖项。在此项目配置中,依赖项 JAR 文件位于 目标/库 请注意,您不应该包含 & 部分。<插件> 现有的那个 <pluginManagement> 修改完成后,保存并关闭文件。.
构建项目以确保所有配置均正确:
mvn package输出结果的结尾应该如下所示:
Output...
[INFO] --- maven-jar-plugin:3.0.2:jar (default-jar) @ dokafka ---
[INFO] Building jar: /root/kafka-projects/dokafka/target/dokafka-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- maven-dependency-plugin:2.8:copy-dependencies (default) @ dokafka ---
[INFO] Copying junit-4.11.jar to /root/kafka-projects/dokafka/target/lib/junit-4.11.jar
[INFO] Copying slf4j-simple-2.0.12.jar to /root/kafka-projects/dokafka/target/lib/slf4j-simple-2.0.12.jar
[INFO] Copying snappy-java-1.1.10.5.jar to /root/kafka-projects/dokafka/target/lib/snappy-java-1.1.10.5.jar
[INFO] Copying zstd-jni-1.5.5-6.jar to /root/kafka-projects/dokafka/target/lib/zstd-jni-1.5.5-6.jar
[INFO] Copying hamcrest-core-1.3.jar to /root/kafka-projects/dokafka/target/lib/hamcrest-core-1.3.jar
[INFO] Copying lz4-java-1.8.0.jar to /root/kafka-projects/dokafka/target/lib/lz4-java-1.8.0.jar
[INFO] Copying slf4j-api-2.0.12.jar to /root/kafka-projects/dokafka/target/lib/slf4j-api-2.0.12.jar
[INFO] Copying kafka-clients-3.7.0.jar to /root/kafka-projects/dokafka/target/lib/kafka-clients-3.7.0.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 5.205 s
[INFO] Finished at: 2024-03-12T06:36:34Z
[INFO] ------------------------------------------------------------------------您可以从下方下载文件。 目标/库 列出所有依赖项,确保它们已被实际复制:
Outputhamcrest-core-1.3.jar kafka-clients-3.7.0.jar slf4j-api-2.0.12.jar snappy-java-1.1.10.5.jar
junit-4.11.jar lz4-java-1.8.0.jar slf4j-simple-2.0.12.jar zstd-jni-1.5.5-6.jar您已将必要的依赖项添加到 Maven 项目中。现在,您希望连接到 Kafka 并按计划生成消息。.
步骤 3 – 在 Java 中创建 Kafka 生产者
在此步骤中,您将在 Java 中设置 Kafka 生产者,并将消息写入主题。.
根据项目结构,源代码如下。 src/main/java/com/dokafka 已保存。因为其余训练中不需要它。 App.java 如果没有,请运行以下命令将其删除:
rm src/main/java/com/dokafka/App.java您需要将生产者代码存储在名为 ProducerDemo 的类中。创建并打开随附的文件进行编辑:
nano src/main/java/com/dokafka/ProducerDemo.java添加以下几行:
package com.dokafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ProducerDemo {
private static final Logger log = LoggerFactory.getLogger(ProducerDemo.class);
public static void main(String[] args) {
String bootstrapServers = "localhost:9092";
String topicName = "java_demo";
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> producerRecord =
new ProducerRecord<>(topicName, "Hello World!");
producer.send(producerRecord);
producer.flush();
producer.close();
}
}一等 制作人演示 您需要定义、导入所使用的类,并创建一个 日志记录器 在主方法中,首先声明 Kafka 集群的地址(bootstrapServers)和用于生成消息的主题名称(topicName)。.
然后,一个物体 特性 您实例化一个类似于键值字典的对象,它保存着 Kafka 生产者函数的配置。您指定该属性。 引导服务器配置 您需要填写 Kafka 集群地址。此外,还需要填写条目。 KEY_SERIALIZER_CLASS_CONFIG 和 VALUE_SERIALIZER_CLASS_CONFIG 在 StringSerializer.class.getName() 放。.
这些属性指定应使用哪些序列化器来处理生成的消息的键和值。序列化器是接受输入并返回字节数组作为输出的类,该数组可以直接通过网络传输。. 反序列化器 它们的作用恰恰相反,从字节流中重建原始对象。这里,键和值都会被使用。 字符串序列化器 内部数据被序列化为字符串。.
接下来,你将拥有一个 KafkaProducer 您声明并实例化:
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);构造函数,用于构造类型的键和值 细绳 接受以下参数及其相关配置属性。要向某个主题发送消息, KafkaProducer 它接受一个以主题和消息本身命名的 ProducerRecord。 你好世界! 您正在进行采样。请注意,创作者本人并不局限于特定主题。.
发送消息后,您闪断并关闭生产者。调用 生产者.发送() 它是异步的,这意味着在消息通过另一个线程发送的同时,控制流会返回到主方法。由于此示例程序在发送消息后需要退出,因此您需要通过刷新操作强制生产者发送剩余的消息。然后,您调用 `close()` 方法,向 Kafka 发出生产者正在被销毁的信号。.
接下来,您将创建一个脚本来处理 ProducerDemo 的构建和运行。您需要将其保存为名为 run-producer.sh 的文件。创建并打开该文件进行编辑:
nano run-producer.sh添加以下几行:
#!/bin/bash
mvn clean
mvn package
java -cp target/dokafka-1.0-SNAPSHOT.jar:target/lib/* com.dokafka.ProducerDemo完成后,保存并关闭文件。高亮区域指示依赖项的位置。.
然后将其标记为可执行文件:
chmod +x run-producer.sh最后,尝试运行以下代码生成“Hello World!”消息:
./run-producer.sh输出结果会很长,最终应该如下所示:
Output...
[main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector - initializing Kafka metrics collector
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Instantiated an idempotent producer.
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 2ae524ed625438c5
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1710176327832
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {java_demo=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: Z-4Gf_p6T2ygtb6461nKRA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 12 with epoch 0
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
[main] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.producer for producer-1 unregisteredKafkaProducer 已成功注册,但之后注册失败。现在消息已写入 java_demo 主题,您可以使用 kafka-console-consumer.sh 脚本检索该消息。.
在另一个 shell 中,进入 Kafka 安装目录,并运行以下命令来读取主题:
bin/kafka-console-consumer.sh --topic java_demo --from-beginning --bootstrap-server localhost:9092输出结果为:
OutputHello World!您可以退出。 CTRL+C 按 。.
在此阶段,通过程序自动发送主题消息。 java_demo 您已经使用 Kafka 提供的 bash 脚本生成并读取了消息。现在您将学习如何使用 Kafka 在消息成功发送后返回的信息。.
步骤 4 – 使用回调函数检索元数据
方法 发送() KafkaProducer 它接受回调函数,允许您对发生的事件(例如收到记录时)执行操作。这对于检索有关集群如何处理记录的信息非常有用。.
扩大联系 发送() 使用回调函数,首先 制作人演示 可编辑:
nano src/main/java/com/dokafka/ProducerDemo.java将代码更改为以下内容:
...
producer.send(producerRecord, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
log.error("An error occurred!", e);
return;
}
log.info(String.format("Timestamp: %s, partition: %s; offset: %s",
recordMetadata.timestamp(),
recordMetadata.partition(),
recordMetadata.offset()));
}
});
...现在向该方法添加 Callback 接口的实现。 发送() 你转移和 onCompletion() 你正在实施这一点 记录元数据 以及可选的 例外 接收到数据后,如果发生错误,则记录该错误。否则,记录当前集群中记录的时间戳、分区号和偏移量。由于这种消息发送方式是异步的,因此当集群接受记录时,您的代码就会被调用,而无需您显式地等待集群接受记录。.
完成后,保存并关闭文件,然后运行构建器:
./run-producer.sh请注意输出末尾的新消息:
Output...
[kafka-producer-network-thread | producer-1] INFO com.dokafka.ProducerDemo - Timestamp: 1710181715303, partition: 0; offset: 2
...新生成的消息被集群接受并存储在分区 0 中。.
如果再次运行,你会注意到偏移量增加了 1,这表示消息在分区中的位置:
Output[kafka-producer-network-thread | producer-1] INFO com.dokafka.ProducerDemo - Timestamp: 1710181831814, partition: 0; offset: 3结果
本文中,您使用 Maven 创建了一个 Java 项目,并为其配备了与 Kafka 通信所需的依赖项。然后,您创建了一个用于向 Kafka 集群生成消息的类,并对其进行扩展以检索已发送的记录信息。.









