KCraftを使用してマルチノードKafkaクラスターを設定する方法

0 株式
0
0
0
0

導入

Apache Kafka は、Java で記述されたオープンソースの分散イベントおよびストリーム処理プラットフォームであり、リアルタイムデータフィードの処理を目的として構築されています。高いスループットと可用性を備え、本質的にスケーラブルです。クラスターあたり数百ノードをサポートするように設計されています。.

このチュートリアルでは、KRaftコンセンサスプロトコルを使用するKafkaクラスターを作成します。クラスターを構成するノードの設定方法と、トピックパーティションが各ノードにどのように割り当てられるかを確認します。また、クラスター内の特定のブローカーにトピックを割り当てる方法も学習します。.

前提条件
  • 少なくとも 4 GB の RAM と 2 つの CPU を備えた 3 つのドロップ
  • 3 つのドロップを指す 3 つのサブドメインを持つ、完全に登録されたドメイン名。.
  • Apache Kafka が Droplet にインストールされ、構成されています。

ステップ1 – Kafkaノードを構成する

このステップでは、前提条件として作成した3つのKafkaサーバーを同じKRaftクラスターの一部となるように設定します。KRaftを使用すると、Apache ZooKeeperへの追加のオーバーヘッドや依存なしに、ノード自体が管理タスクを整理して実行できます。.

最初のノードの構成

まず最初のノードの設定から始めます。まず、次のコマンドを実行して、最初のドロップレットのサービスを停止します。

sudo systemctl stop kafka

Kafka ユーザーとして、Kafka が存在するディレクトリに移動し、次のコマンドを実行して編集用に構成ファイルを開きます。

vi /config/kraft/server.properties

次の行を見つけます。

...
# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller
# The node id associated with this instance's roles
node.id=1
# The connect string for the controller quorum
controller.quorum.voters=1@localhost:9092
...

これら3つのパラメータは、Kafkaノードをブローカーとコントローラーの両方の役割を果たすように設定します。つまり、データの受信と利用(ブローカー)と管理タスクの実行(コントローラー)を行います。この分離は、コントローラーを分離して効率性と冗長性を高めることができる大規模なデプロイメントで役立ちます。.

node.id はクラスター内のノードIDを指定します。これは最初のノードなので、1のままにしておきます。すべてのノードは一意のノードIDを持つ必要があるため、2番目と3番目のノードのIDはそれぞれ2と3になります。.

controller.quorum.voters は、ノードIDを対応するアドレスとポートにマッピングし、通信に利用します。ここで、各ノードが他のノードを認識できるように、すべてのクラスターノードのアドレスを指定します。この行を次のように変更します。

...
[email protected]_domain:9093,[email protected]_domain:9093,[email protected]_domain:9093
...

ここでは、クラスター内の3つのノードすべてをそれぞれのIDとともにリストします。your_domain を、前提条件の設定で設定したドメインアドレスに置き換えてください。.

次に、ファイル内で次の行を見つけます。

...
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT
# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://localhost:9092
...

listeners は Kafka ノードがリッスンするアドレスを定義し、advertised.listeners はノードに接続するためにクライアントに送信されるアドレスを指定します。これにより、クライアントが実際に使用するアドレスのサブセットを指定できます。.

次のように行を変更します。your_domain を実際のドメイン名に置き換えます。

...
listeners=PLAINTEXT://kafka1.your_domain:9092,CONTROLLER://kafka1.your_domain:9093
# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT
# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://kafka1.your_domain:9092
...

このノードはクラスター内にあるため、ノードが実行されるドロップレットのアドレスを明示的に指定しています。.

次に、num.partitions 設定を見つけます。

...
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
...

コメントにあるように、これは新しいスレッドごとに設定されるデフォルトのパーティション数です。ノードが3つあるので、2の倍数に設定してください。

...
num.partitions=6
...

ここでの 6 という値は、各ノードがデフォルトで 2 つのトピック パーティションを維持することを保証します。.

次に、コンシューマーのオフセットとトランザクションステータスを管理する内部スレッドの反復係数を設定します。次の行を見つけます。

...
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
...

次の値に設定します。

...
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
...

ここでは、内部メタデータに関して少なくとも2つのノードが同期している必要があることを指定します。完了したら、ファイルを保存して閉じます。.

デフォルトのパーティション番号を設定したら、ログストレージを再設定する必要があります。まず、次のコマンドを実行して既存のログファイルを削除します。

rm -rf /home/kafka/kafka-logs/*

次に、新しいクラスター ID を作成し、それを環境変数に保存します。

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

ターミナルに表示します:

echo $KAFKA_CLUSTER_ID

出力 ID は次のようになります。

OutputMjj4bch9Q3-B0TEXv8_zPg

この値をメモしておいてください。2 番目と 3 番目のノードを構成するときに必要になります。.

最後に、次のコマンドを実行してレポート リポジトリを作成します。

./bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

出力は次のようになります。

Output...
Formatting /home/kafka/kafka-logs with metadata.version 3.7-IV4.
2番目と3番目のノードの構成

他のノードの設定は、最初のノードとほぼ同じです。node.idも更新する必要があることに注意してください。

...
node.id=node_number
...

2 番目と 3 番目のノードの適切な値はそれぞれ 2 と 3 であり、リスナーと advertised.listeners に適切なアドレスを設定します。.

ログ ストレージを再構築するときは、最初のノードのクラスター ID を再利用します。

KAFKA_CLUSTER_ID="your_cluster_id"

完了したら、次のコマンドを実行して、3 つのノードすべてで Kafka サービスを開始します。

sudo systemctl start kafka

ここまでで、3つのKafkaノードをKRaftクラスターの一部として構成できました。トピックを作成し、クラスター内でメッセージを生成および消費します。.

ステップ2 – クラスターへの接続

このステップでは、Kafkaに付属のシェルスクリプトを使用してKafkaクラスターに接続します。また、トピックを作成し、クラスターからデータを生成・消費してみます。その後、ノードの1つを停止し、Kafkaがデータ損失をどのように削減するかを確認します。.

Kafka には、クラスターとそのメンバーに関する情報を表示する kafka-metadata-quorum.sh というスクリプトが用意されています。実行するには、以下のコマンドを実行してください。

./bin/kafka-metadata-quorum.sh --bootstrap-controller kafka1.your_domain:9093 describe --status

コントローラーのエンドポイント(ブローカーのエンドポイントではありません)であるポート9093で、いずれかのノードに接続します。kafka1.your_domainを、Kafkaノードのドメインに置き換えてください。.

出力は次のようになります。

OutputClusterId: G3TeIZoeTSCvG2YOWvPE2w
LeaderId: 3
LeaderEpoch: 2
HighWatermark: 383
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 55
CurrentVoters: [1,2,3]
CurrentObservers: []

スクリプトはクラスタ状態の初期情報をリストします。出力を見ると、ノード3がリーダーに選出され、3つのノードすべて([1,2,3])が投票セットに含まれており、その決定に同意していることがわかります。.

次のコマンドを実行して、「First Topic」というトピックを作成します。

./bin/kafka-topics.sh --create --topic first-topic --bootstrap-server kafka1.your_domain:9092 --replication-factor 2

出力は次のようになります。

Created topic first-topic.

次に、kafka-topics.sh スクリプトを実行して、ノード上でパーティションがどのように配置されているかを確認します。

./bin/kafka-topics.sh --describe --bootstrap-server kafka1.your_domain:9092 --topic first-topic

繰り返し係数を 2 に設定すると、トピックが少なくとも 2 つのノードで使用できるようになります。.

出力は次のようになります。

OutputTopic: first-topic TopicId: 4kVImoFNTQeyk3r2zQbdvw PartitionCount: 6 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: first-topic Partition: 3 Leader: 1 Replicas: 1,3 Isr: 1,3
Topic: first-topic Partition: 4 Leader: 3 Replicas: 3,2 Isr: 3,2
Topic: first-topic Partition: 5 Leader: 2 Replicas: 2,1 Isr: 2,1

各パーティションには、リーダーノード、レプリカノード、そして同期中のレプリカセット(ISR)が2つずつあることがわかります。パーティションリーダーは、パーティションのデータをクライアントに提供するサーバーノードであり、レプリカノードはコピーのみを保持します。レプリカノードは、デフォルトでは過去10秒間リーダーノードと同期している場合、ISRとみなされます。この間隔はトピックごとに設定可能です。.

トピックを作成したので、kafka-console-producer.sh スクリプトを使用してメッセージを生成します。以下のコマンドを実行してプロデューサーを起動します。

./bin/kafka-console-producer.sh --topic first-topic --bootstrap-server kafka1.your_domain:9092

空白の通知が表示されます。

>

プロバイダーはあなたのSMSを待っています。テストを入力してEnterキーを押してください。通知は次のようになります。

>Hello World!
>

プロデューサーは現在、次のメッセージを待機しています。これは、前のメッセージがKafkaに正常に配信されたことを意味します。テスト用に任意の数のメッセージを入力できます。プロデューサーを終了するには、Ctrl+Cを押してください。.

トピックメッセージを再読み込みするには、コンシューマーが必要です。Kafka は kafka-console-consumer.sh というシンプルなコンシューマーを提供しています。以下のコマンドを実行して実行してください。

./bin/kafka-console-consumer.sh --topic first-topic --from-beginning --bootstrap-server kafka1.your_domain:9092

トピックから読み取られるメッセージが表示されます。

OutputHello World!
...

ノード障害シミュレーション

3 番目の Kafka ノードで、次のコマンドを実行してサービスを停止します。

sudo systemctl stop kafka

次に、練習しながらトピックを説明します。

./bin/kafka-topics.sh --describe --bootstrap-server kafka1.your_domain:9092 --topic first-topic

出力は次のようになります。

OutputTopic: first-topic TopicId: 4kVImoFNTQeyk3r2zQbdvw PartitionCount: 6 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 1 Replicas: 3,1 Isr: 1
Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2
Topic: first-topic Partition: 3 Leader: 1 Replicas: 1,3 Isr: 1
Topic: first-topic Partition: 4 Leader: 2 Replicas: 3,2 Isr: 2
Topic: first-topic Partition: 5 Leader: 2 Replicas: 2,1 Isr: 2,1

ノード3はレプリカとしてリストされていますが、利用できないためISRセットには含まれていません。ノード3がクラスターに再参加すると、他のノードと同期し、以前の状態に戻ろうとします。.

最初のトピックのメッセージをもう一度読んでみてください。

./bin/kafka-console-consumer.sh --topic first-topic --from-beginning --bootstrap-server kafka1.your_domain:9092

通常どおりアクセスできることがわかります。

OutputHello World!
...

レプリカの存在により、最初の2つのノードが制御権を取得し、コンシューマーにサービスを提供します。これで、3つ目のサーバーでKafkaを起動できます。

sudo systemctl start kafka

ここまで、Kafka がクラスター内のノードの利用不可状態をどのように緩和するかを見てきました。次は、クラスターからノードを削除する方法を学びます。.

ステップ3 – ノード間でデータを転送する

このステップでは、Kafka クラスター内のノード間でトピックを移動する方法を学習します。トピックを含む既存のクラスターにノードを追加する場合、Kafka はパーティションを自動的に移動しませんが、これはユーザーが望むような移動ではありません。これは、既存のパーティションが残りのノードに自動的に移動されないため、ノードを削除する場合にも役立ちます。.

Kafka には、移行計画を生成、実行、検証できる kafka-reassign-partitions.sh というスクリプトが用意されています。このスクリプトを使用して、最初のトピックのパーティションを最初の 2 つのノードに移行する計画を作成します。.

まず、移動するトピックを指定する必要があります。スクリプトはトピック定義を含むJSONファイルを受け付けるので、ファイルを作成して開き、編集します。

vi topics-to-move.json

次の行を追加します。

{
"topics": [
{
"topic": "first-topic"
}
],
"version": 1
}

「タイトル」の下に、最初のトピックを参照するオブジェクトを定義します。完了したら、ファイルを保存して閉じます。.

次のコマンドを実行して移行計画を生成します。kafka1.your_domain を、Kafka ノードの 1 つを指すドメインに置き換えます。

./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --topics-to-move-json-file topics-to-move.json --broker-list "1,2" --generate

対象ブローカーの ID を表す –broker-list に「1,2」を渡します。.

出力は次のようになります。

OutputCurrent partition replica assignment
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[2,1],"log_dirs":["any","any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[1,2],"log_dirs":["any","any"]}]}

このスクリプトは、現在のパーティションレイアウトと提案されたパーティションレイアウトをそれぞれ記述した合計2つのプランを作成しました。最初のプランは、後で変更を元に戻したい場合に備えて用意されています。2つ目のプランに注目してください。これは、migration-plan.jsonという別のファイルに保存します。このファイルを作成し、編集用に開きます。

vi migration-plan.json

2 番目の実行可能ファイルを追加します。

{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[1,2],"log_dirs":["any","any"]}]}

ファイルを保存して閉じます。その後、次のコマンドを実行して実行します。

./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --reassignment-json-file migration-plan.json --execute

出力は次のようになります。

OutputCurrent partition replica assignment
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[2,1],"log_dirs":["any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for first-topic-0,first-topic-1,first-topic-2,first-topic-3,first-topic-4,first-topic-5

スクリプトは移行が開始されたことを示しました。移行の進行状況を確認するには、代わりに --verify と入力してください。

./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --reassignment-json-file migration-plan.json --verify

しばらくすると、出力は次のようになります。

OutputStatus of partition reassignment:
Reassignment of partition first-topic-0 is completed.
Reassignment of partition first-topic-1 is completed.
Reassignment of partition first-topic-2 is completed.
Reassignment of partition first-topic-3 is completed.
Reassignment of partition first-topic-4 is completed.
Reassignment of partition first-topic-5 is completed.
Clearing broker-level throttles on brokers 1,2,3
Clearing topic-level throttles on topic first-topic

ここで、最初の問題を説明し、ブローカー 3 にパーティションがないことを確認します。

./bin/kafka-topics.sh --describe --bootstrap-server kafka1.your_domain:9092 --topic first-topic

出力は次のようになります。

OutputTopic: first-topic TopicId: 4kVImoFNTQeyk3r2zQbdvw PartitionCount: 6 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 2 Replicas: 2,1 Isr: 1,2
Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: first-topic Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 4 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: first-topic Partition: 5 Leader: 1 Replicas: 1,2 Isr: 2,1

ブローカー 1 と 2 のみがレプリカおよび ISR として存在し、移行が成功したことを意味します。.

この時点で、最初のトピックをブローカー 3 から残りのトピックに移動する移行計画を作成し、移行がスムーズに行われたことを確認する方法を学習しました。.

結果

KRaftプロトコルを使用して通信する3つのノードで構成されるKafkaクラスターを作成しました。また、クラスターとパーティションレイアウトを検査する方法も学習しました。ノードを1つ停止してトピックから読み取りを行うことで、クラスターの冗長性をテストしました。最後に、トピックをクラスターノードに切り替える方法を学習しました。.

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

あなたも気に入るかもしれない