CLI 経由でデータソースの Kafka プロデューサーを設定する方法

0 株式
0
0
0
0

導入

Apache Kafka は、Kafka クラスターとの間で基本的なテキストメッセージの生成と使用を行うためのシェルスクリプトを提供しています。これらのスクリプトは探索や実験には便利ですが、実際のアプリケーションではプログラムによって Kafka にアクセスします。そのため、Kafka は一般的なプログラミング言語と環境向けに多くのクライアントライブラリを提供しています。このチュートリアルでは、Kafka トピックにデータを生成する Java アプリケーションを作成します。Java プロジェクトのビルドとパッケージ化ツールである Apache Maven を使用して Java プロジェクトを作成し、Kafka クライアントライブラリを依存関係として追加します。次に、Kafka クライアントを使用してメッセージを生成し、それらに関するクラスター内メタデータを取得するクラスを実装します。.

前提条件
  • 少なくとも4GBのRAMと2つのCPUを搭載したデバイス
  • Droplet またはローカルマシンに Java Development Kit (JDK) 8 以降がインストールされている
  • Droplet またはローカルマシンに Apache Kafka がインストールおよび設定されている
  • Javaプロジェクトの標準ディレクトリレイアウトの紹介

ステップ1 – Mavenプロジェクトを作成する

このステップでは、Apache Mavenをインストールし、Kafkaとの通信に使用するプロジェクトを作成します。Ubuntuでは、Mavenは公式リポジトリからすぐに利用できます。.

まず、次のコマンドを実行して利用可能なパッケージを一覧表示します。

sudo apt update

インストールするには、次のコマンドを実行します。

sudo apt install maven

バージョン番号を読んでインストールされていることを確認します。

mvn --version

出力は、Java のバージョンとプラットフォームに応じて次のようになります。

OutputApache Maven 3.6.3
Maven home: /usr/share/maven
Java version: 11.0.22, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64
Default locale: en, platform encoding: UTF-8
OS name: "linux", version: "5.15.0-100-generic", arch: "amd64", family: "unix"

次に、Kafka を操作するための Java プロジェクトを保存するディレクトリを作成します。

mkdir ~/kafka-projects

新しく作成されたディレクトリに移動します。

cd ~/kafka-projects

次に、次のコマンドを実行して空の Java プロジェクトを生成します。

mvn archetype:generate \
-DgroupId=com.dokafka \
-DartifactId=dokafka \
-DarchetypeArtifactId=maven-archetype-quickstart \
-DarchetypeVersion=1.4 \
-DinteractiveMode=false

ここで、Mavenに新しいプロジェクトを作成するように指示します。 ドカフカ グループID付き com.dokafka グループ ID は、エコシステム全体でこのプロジェクトを一意に識別します。 メイヴン このプロジェクトは、原型に基づいています Maven-アーキタイプ-クイックスタート が作成され、Maven はこのようにテンプレートを呼び出します。.

特にMavenを初めて実行する場合は、大量の出力が表示されます。出力の最後は以下のようになります。

Output...
INFO] Generating project in Batch mode
[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Archetype: maven-archetype-quickstart:1.4
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: com.dokafka
[INFO] Parameter: artifactId, Value: dokafka
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Parameter: package, Value: com.dokafka
[INFO] Parameter: packageInPathFormat, Value: com/dokafka
[INFO] Parameter: package, Value: com.dokafka
[INFO] Parameter: groupId, Value: com.dokafka
[INFO] Parameter: artifactId, Value: dokafka
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Project created from Archetype in dir: /root/kafka-projects/dokafka
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 3.537 s
[INFO] Finished at: 2024-03-10T10:43:07Z
[INFO] ------------------------------------------------------------------------

Maven は中央リポジトリから必要な Java パッケージをダウンロードし、プロジェクトをビルドします。 ドカフカ パターンを使用して Maven-アーキタイプ-クイックスタート 作成しました。.

次のコマンドを実行してプロジェクト ディレクトリに移動します。

cd dokafka

プロジェクト構造は次のとおりです。

├── pom.xml
└── src
├── main
│ └── java
│ └── com
│ └── dokafka
│ └── App.java
└── test
└── java
└── com
└── dokafka
└── AppTest.java

前提条件の一部として、ここで示す標準的なMavenプロジェクト構造について学習しました。ディレクトリ src/main/java プロジェクトのソースコードを保持し、 ソース/テスト/java 実験リソースと pom.xml プロジェクトのルートには、メインの Maven 構成ファイルがあります。.

このプロジェクトにはソースファイルが1つだけ含まれており、 アプリ.java その内容を表示して、Maven が何を生成したかを確認します。

cat src/main/java/com/dokafka/App.java

出力は次のようになります。

package com.dokafka;
/**
* Hello world!
*
*/
public class App
{
public static void main( String[] args )
{
System.out.println( "Hello World!" );
}
}

このコードを実行するには、まず次のコマンドを実行してプロジェクトをビルドする必要があります。

mvn package

Mavenはコードをコンパイルし、JARファイルにパッケージ化して実行します。出力の最後は以下のようになり、完了したことを意味します。

Output...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 9.830 s
[INFO] Finished at: 2024-03-10T10:41:24Z
[INFO] ------------------------------------------------------------------------

Mavenファイル ジャー 得られた結果はターゲットリストの下に置かれました。クラスを実行するには アプリ 作成したクラスに対して、次のコマンドを実行し、完全なクラス ID を入力します。

java -cp target/dokafka-1.0-SNAPSHOT.jar com.dokafka.App

出力は次のようになります。

OutputHello World!

Mavenをインストールし、空のJavaプロジェクトを作成しました。次に、Kafkaに必要な依存関係を追加します。.

ステップ2 – Maven依存関係の追加

次に、Kafka Javaクライアントと、ロギングのためのその他の依存関係をプロジェクトに追加します。パッケージ化時にこれらの依存関係を含めるようにMavenを設定することもできます。まず、Kafkaクライアントの依存関係を追加します。ブラウザでJavaクライアントのMavenリポジトリページにアクセスし、利用可能な最新バージョンを選択して、提供されているMaven用のXMLスニペットをコピーします。執筆時点でのJavaクライアントライブラリの最新バージョンは 3.7.0 そうだった。.

依存関係 pom.xml プロジェクトのルートに追加されます。編集用に開きます。

nano pom.xml

セクション <dependencies&gt; を見つけて依存関係の定義を追加します。

...
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.0</version>
</dependency>
...
</dependencies>

これにより、Kafkaクライアントライブラリがプロジェクトに提供されます。ただし、ライブラリ自体には他に2つの依存関係があり、手動で追加する必要があります。これらは、メッセージのログ記録に使用されるSLF4Jライブラリから取得されます。SLF4Jは多くのログ記録ライブラリをサポートしており、開発者がログメッセージの処理方法を柔軟に決定できるようにします。追加する必要がある2つの依存関係は次のとおりです。

  • ライブラリそのものであるslf4j-api
  • ログを処理してターミナルに出力するslf4j-simple

依存関係を定義したら、ビルドされた最終JARと一緒に利用できるようにする必要があります。セクション&<ビルド> pom.xml 強調表示された行を見つけて追加します。

...
<build>
<pluginManagement>
<plugins>
...
</plugins>
</pluginManagement>
<plugins>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
...

ここで、プラグイン Maven 依存プラグイン パッケージ作成時にすべての依存関係をコピーするように設定しています。このプロジェクト設定では、依存関係のJARファイルは以下にあります。 ターゲット/ライブラリ & 部分を含めないでください。<プラグイン> 既存の <pluginManagement&gt; 変更します。完了したら、ファイルを保存して閉じます。.

プロジェクトをビルドして、すべてが正しく構成されていることを確認します。

mvn package

出力の最後は次のようになります。

Output...
[INFO] --- maven-jar-plugin:3.0.2:jar (default-jar) @ dokafka ---
[INFO] Building jar: /root/kafka-projects/dokafka/target/dokafka-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- maven-dependency-plugin:2.8:copy-dependencies (default) @ dokafka ---
[INFO] Copying junit-4.11.jar to /root/kafka-projects/dokafka/target/lib/junit-4.11.jar
[INFO] Copying slf4j-simple-2.0.12.jar to /root/kafka-projects/dokafka/target/lib/slf4j-simple-2.0.12.jar
[INFO] Copying snappy-java-1.1.10.5.jar to /root/kafka-projects/dokafka/target/lib/snappy-java-1.1.10.5.jar
[INFO] Copying zstd-jni-1.5.5-6.jar to /root/kafka-projects/dokafka/target/lib/zstd-jni-1.5.5-6.jar
[INFO] Copying hamcrest-core-1.3.jar to /root/kafka-projects/dokafka/target/lib/hamcrest-core-1.3.jar
[INFO] Copying lz4-java-1.8.0.jar to /root/kafka-projects/dokafka/target/lib/lz4-java-1.8.0.jar
[INFO] Copying slf4j-api-2.0.12.jar to /root/kafka-projects/dokafka/target/lib/slf4j-api-2.0.12.jar
[INFO] Copying kafka-clients-3.7.0.jar to /root/kafka-projects/dokafka/target/lib/kafka-clients-3.7.0.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 5.205 s
[INFO] Finished at: 2024-03-12T06:36:34Z
[INFO] ------------------------------------------------------------------------

以下のファイルをダウンロードできます。 ターゲット/ライブラリ 依存関係が実際にコピーされていることを確認するためのリスト:

Outputhamcrest-core-1.3.jar kafka-clients-3.7.0.jar slf4j-api-2.0.12.jar snappy-java-1.1.10.5.jar
junit-4.11.jar lz4-java-1.8.0.jar slf4j-simple-2.0.12.jar zstd-jni-1.5.5-6.jar

Mavenプロジェクトに必要な依存関係を追加しました。次に、Kafkaに接続し、スケジュールに従ってメッセージを生成します。.

ステップ3 – JavaでKafkaプロデューサーを作成する

このステップでは、Java で Kafka プロデューサーを設定し、トピックにメッセージを書き込みます。.

プロジェクト構造に従って、ソースコードは以下のようになります。 src/main/java/com/dokafka 保存されます。残りのトレーニングには必要ないので アプリ.java ない場合は、次のコマンドを実行して削除します。

rm src/main/java/com/dokafka/App.java

プロデューサーコードはProducerDemoというクラスに保存します。以下のファイルを作成して開き、編集します。

nano src/main/java/com/dokafka/ProducerDemo.java

次の行を追加します。

package com.dokafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ProducerDemo {
private static final Logger log = LoggerFactory.getLogger(ProducerDemo.class);
public static void main(String[] args) {
String bootstrapServers = "localhost:9092";
String topicName = "java_demo";
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> producerRecord =
new ProducerRecord<>(topicName, "Hello World!");
producer.send(producerRecord);
producer.flush();
producer.close();
}
}

ファーストクラス プロデューサーデモ 使用するクラスを定義し、インポートし、 ロガー メイン メソッドでは、まず Kafka クラスターのアドレス (bootstrapServers) と、メッセージを生成するトピックの名前 (topicName) を宣言します。.

そして、物体 プロパティ をインスタンス化します。これはキーと値の辞書に似ており、Kafkaプロデューサー関数の設定を保持します。プロパティを指定します。 ブートストラップサーバー構成 Kafkaクラスタのアドレスを入力します。また、エントリは KEY_SERIALIZER_CLASS_CONFIG そして VALUE_SERIALIZER_CLASS_CONFIG の上 StringSerializer.class.getName() セット。.

これらのプロパティは、生成されたメッセージのキーと値を処理するために使用するシリアライザーを指定します。シリアライザーは、入力を受け取り、ネットワーク経由で送信可能なバイト配列を出力として返すクラスです。. デシリアライザー これらは逆のことを行い、バイトストリームから元のオブジェクトを再構築します。ここではキーと値の両方が使用されます。 文字列シリアライザー 内部は文字列としてシリアル化されます。.

次に、 Kafkaプロデューサー 以下を宣言してインスタンス化します。

KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

型のキーと値のコンストラクタ 設定には以下のプロパティを使用します。トピックにメッセージを送信するには、 Kafkaプロデューサー 件名とメッセージ自体にちなんで名付けられた ProducerRecord を受け入れます。 「こんにちは世界」 サンプリングです。作成者自身は特定のトピックに縛られていないことに注意してください。.

メッセージを送信したら、プロデューサーをフラッシュして閉じます。 プロデューサー.送信() これは非同期です。つまり、メッセージが別のスレッドで送信されている間も、制御フローはメインメソッドに戻ります。このサンプルプログラムはその後終了する必要があるため、フラッシュによってプロデューサーに残っているものを送信するよう強制します。その後、プロデューサーを close() して、Kafka にプロデューサーが破棄されることを通知します。.

次に、ProducerDemoのビルドと実行を処理するスクリプトを作成します。このスクリプトはrun-producer.shというファイルに保存します。ファイルを作成し、編集用に開きます。

nano run-producer.sh

次の行を追加します。

#!/bin/bash
mvn clean
mvn package
java -cp target/dokafka-1.0-SNAPSHOT.jar:target/lib/* com.dokafka.ProducerDemo

完了したら、ファイルを保存して閉じます。ハイライト表示された領域は、依存関係が配置されている場所を示しています。.

次に、実行可能としてマークします。

chmod +x run-producer.sh

最後に、次のコマンドを実行して Hello World! メッセージを生成してみます。

./run-producer.sh

出力は長くなり、最終的には次のようになります。

Output...
[main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector - initializing Kafka metrics collector
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Instantiated an idempotent producer.
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 2ae524ed625438c5
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1710176327832
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {java_demo=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: Z-4Gf_p6T2ygtb6461nKRA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 12 with epoch 0
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
[main] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.producer for producer-1 unregistered

KafkaProducer が登録されました。これは正常に作成されましたが、その後登録に失敗しました。メッセージが java_demo トピックに書き込まれ、kafka-console-consumer.sh スクリプトを使用して取得できます。.

別のシェルで、Kafka インストール ディレクトリに移動し、次のコマンドを実行してトピックを読み取ります。

bin/kafka-console-consumer.sh --topic java_demo --from-beginning --bootstrap-server localhost:9092

出力は次のようになります。

OutputHello World!

退出できます。 Ctrl+C プレス 。.

この段階では、プログラムによって件名にメッセージを送信します。 java_デモ Kafka が提供する bash スクリプトを使用して、メッセージを生成し、読み戻すことができました。次は、メッセージが正常に送信された後に Kafka から返される情報の使い方を学びます。.

ステップ4 – コールバックを使用してメタデータを取得する

方法 send() Kafkaプロデューサー コールバックを受け入れることで、レコードの受信時など、発生したイベントに応じてアクションを実行できます。これは、クラスターがレコードをどのように処理しているかに関する情報を取得するのに役立ちます。.

接触を拡大するには 送信() コールバックでは、まず プロデューサーデモ 編集用に開く:

nano src/main/java/com/dokafka/ProducerDemo.java

コードを次のように変更します。

...
producer.send(producerRecord, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
log.error("An error occurred!", e);
return;
}
log.info(String.format("Timestamp: %s, partition: %s; offset: %s",
recordMetadata.timestamp(),
recordMetadata.partition(),
recordMetadata.offset()));
}
});
...

次に、メソッドに Callback インターフェイスの実装を追加します。 送信() 転送して onCompletion() あなたはそれを実装しています レコードメタデータ そしてオプションで 例外 受信します。エラーが発生した場合はログに記録します。それ以外の場合は、現在クラスター内にあるレコードのタイムスタンプ、パーティション番号、オフセットを記録します。この方法でメッセージを送信するのは非同期なので、クラスターがレコードを受け入れるとコードが呼び出され、明示的に待機する必要はありません。.

完了したら、ファイルを保存して閉じ、ビルダーを実行します。

./run-producer.sh

出力の最後に新しいメッセージがあることに注意してください。

Output...
[kafka-producer-network-thread | producer-1] INFO com.dokafka.ProducerDemo - Timestamp: 1710181715303, partition: 0; offset: 2
...

新しく生成されたメッセージはクラスターによって受け入れられ、パーティション 0 に保存されました。.

もう一度実行すると、オフセットが 1 つ大きくなり、パーティション内のメッセージの位置が示されることがわかります。

Output[kafka-producer-network-thread | producer-1] INFO com.dokafka.ProducerDemo - Timestamp: 1710181831814, partition: 0; offset: 3

結果

この記事では、Mavenを使用してJavaプロジェクトを作成し、Kafkaと通信するための依存関係を実装しました。次に、Kafkaクラスターへのメッセージを生成するクラスを作成し、送信されたレコード情報を取得できるように拡張しました。.

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

あなたも気に入るかもしれない