Kafka入門

0 株式
0
0
0
0

導入

Apache Kafka は、Java で記述されたオープンソースの分散イベントおよびストリーム処理プラットフォームで、リアルタイムデータフィードの処理を目的として構築されています。高いスケーラビリティ、高いスループット、そして高い可用性を備えています。Apache Software Foundation によって開発された Kafka は、その信頼性、使いやすさ、そしてフォールトトレランス性から広く採用されています。世界最大級の組織において、大量のデータを分散的かつ効率的に管理するために利用されています。.

このチュートリアルでは、Apache Kafkaをダウンロードしてセットアップします。トピックの作成と削除、そして付属のスクリプトを使ったイベントの送受信の方法を学びます。また、同じ目標を持つ類似プロジェクトや、Kafkaとの比較についても学びます。.

前提条件
  • 少なくとも 4 GB の RAM と 2 つの CPU を搭載したデバイス。.
  • Droplet またはローカル マシンに Java 8 以降がインストールされていること。.

ステップ1 – Apache Kafkaのダウンロードと設定

このセクションでは、Apache Kafka をマシンにダウンロードして解凍します。セキュリティを強化するため、自分のユーザーアカウントでセットアップします。その後、KRaft を使用して設定と実行を行います。.

まず、Kafkaを実行するための別のユーザーを作成します。以下のコマンドを実行すると、次のようなユーザーが作成されます。 カフカ 作成する:

sudo adduser kafka

アカウントのパスワードの入力を求められます。強力なパスワードを入力し、Enterキーを押してください。 入力 各フィールドでは、追加情報の入力を省略します。.

最後に、特定の Kafka ユーザーに切り替えます。

su kafka

次に、公式ダウンロードページからKafkaのリリースパッケージをダウンロードします。執筆時点での最新バージョンは3.6.1です。macOSまたはLinuxをご利用の場合は、curlコマンドでKafkaをダウンロードできます。.

このコマンドを使用してKafkaをダウンロードし、 /tmp 場所:

curl -o /tmp/kafka.tgz https://dlcdn.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz

以下のバージョンがあります ~/カフカはメインディレクトリに保存されます。次のコマンドを実行して作成します。

mkdir ~/kafka

そして、それを実行すると、 ~/カフカ 抽出する:

tar -xzf /tmp/kafka.tgz -C ~/kafka --strip-components=1

ダウンロードしたアーカイブには Kafka バージョンと同じ名前のルート フォルダーが含まれているため、–strip-components=1 はそれをスキップして、その中のすべてのものを抽出します。.

本稿執筆時点では、Kafka 3 はメタデータ管理のための 2 つのシステム(Apache ZooKeeper と Kafka KRaft(Kafka Raft の略))をサポートする最後のメジャーリリースでした。ZooKeeper は、アプリケーションの分散データを調整するための標準的な方法を提供するオープンソースプロジェクトであり、Apache Software Foundation によって開発されています。.

しかし、Kafka 3.3以降ではKRaftのサポートが導入されました。KRaftはKafkaインスタンスの連携のみを目的とした専用システムであり、インストールプロセスを簡素化し、スケーラビリティを大幅に向上させます。KRaftを使用すると、管理メタデータを外部に保持するのではなく、Kafka自体がデータに対する全責任を負います。.

ZooKeeperのサポートはKafka 4以降ではまだ利用可能ですが、削除される予定です。このチュートリアルでは、KRaftを使用してKafkaをセットアップします。.

新しいKafkaクラスターには、一意の識別子を作成する必要があります。現時点では、クラスターは1つのノードのみで構成されます。Kafkaが現在配置されているディレクトリに移動します。

cd ~/kafka

KRaftを使用したKafkaは、 config/kraft/server.properties 保存されますが、設定ファイルは ZooKeeper 構成/サーバー.プロパティ そうです。.

初めて実行する前に、いくつかのデフォルト設定を上書きする必要があります。以下のコマンドを実行して、ファイルを編集用に開きます。

nano config/kraft/server.properties
...
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
...

設定 ログディレクトリ Kafkaがログファイルを保存する場所を指定します。デフォルトでは、以下の場所に保存されます。 /tmp/kafka-logs 一時的ではありますが、書き込み可能であることが保証されているため、保存されます。値を指定されたパスに置き換えます。

...
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/home/kafka/kafka-logs
...

Kafka 用に別のユーザーを作成したので、ログディレクトリのパスをユーザーのホームディレクトリの下に配置します。存在しない場合は、Kafka によって作成されます。完了したら、ファイルを保存して閉じます。.

Kafka の設定が完了したら、次のコマンドを実行してランダムなクラスター ID を生成します。

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

次に、次のコマンドを実行して ID を入力し、ログ ファイル用のストレージ スペースを作成します。

bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

出力は次のようになります。

Output
Formatting /home/kafka/kafka-logs with metadata.version 3.6-IV2.

最後に、Kafka サーバーを初めて起動できます。

bin/kafka-server-start.sh config/kraft/server.properties

最終出力は次のようになります。

Output
...
[2024-02-26 10:38:26,889] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.DataPlaneAcceptor)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Finished waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Finished waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Transition from STARTING to STARTED (kafka.server.BrokerServer)
[2024-02-26 10:38:26,891] INFO Kafka version: 3.6.1 (org.apache.kafka.common.utils.AppInfoParser)
[2024-02-26 10:38:26,891] INFO Kafka commitId: 5e3c2b738d253ff5 (org.apache.kafka.common.utils.AppInfoParser)
[2024-02-26 10:38:26,891] INFO Kafka startTimeMs: 1708943906890 (org.apache.kafka.common.utils.AppInfoParser)
[2024-02-26 10:38:26,892] INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer)

出力は、KafkaがKRaftを使用して正常に初期化され、接続を作成していることを示しています。 0.0.0.0:9092 承諾します。.

いつ Ctrl + C を押すと、プロセスが終了します。セッションを開いたままKafkaを実行するのは好ましくないため、次の手順でKafkaをバックグラウンドで実行するサービスを作成します。.

ステップ2 – Kafka用のsystemdサービスを作成する

このセクションでは、Kafka を常にバックグラウンドで実行するための systemd サービスを作成します。systemd サービスは継続的に起動、停止、再起動できます。.

サービス設定を次の名前のファイルに記述します。 コードサーバーサービス リスト内 /lib/systemd/システム 節約できる場所 システムド サービスを保存します。テキストエディタを使って作成してください。

sudo nano /etc/systemd/system/kafka.service

次の行を追加します。

[Unit]
Description=kafka-server
[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/kraft/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target

ここでまずサービスの説明を指定します。次に[サービスサービスの種類(シンプルとはコマンドが単純に実行されることを意味します)を定義し、実行するコマンドを指定します。また、実行するユーザーも指定します。 カフカ Kafka が終了した場合、サービスは自動的に再起動されるはずです。.

セクション [インストール] は、サーバーにログインできる状態になったときにこのサービスを開始するようシステムに指示します。完了したら、ファイルを保存して閉じてください。.

次のコマンドを実行して、Kafka サービスを開始します。

sudo systemctl start kafka

ステータスを表示して、正しく起動したことを確認します。

sudo systemctl status kafka

次のような出力が表示されます。

Output
● kafka.service - kafka-server
Loaded: loaded (/etc/systemd/system/kafka.service; disabled; preset: enabled)
Active: active (running) since Mon 2024-02-26 11:17:30 UTC; 2min 40s ago
Main PID: 1061 (sh)
Tasks: 94 (limit: 4646)
Memory: 409.2M
CPU: 10.491s
CGroup: /system.slice/kafka.service
├─1061 /bin/sh -c "/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/kraft/server.properties > /home/kafka/kafka/kafka.log 2>&1"
└─1062 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true "-Xlog:gc*:file=/home/kafka/kafka/bin/../logs/kaf>
Feb 26 11:17:30 kafka-test1 systemd[1]: Started kafka.service - kafka-server.

サーバーの再起動後に Kafka を自動的に起動するには、次のコマンドを実行してサービスを有効にします。

sudo systemctl enable kafka

ここまでで、Kafka 用の systemd サービスを作成して有効化し、サーバーの起動時に起動できるようになりました。次に、Kafka でトピックを作成および削除する方法と、利用可能なスクリプトを使用してテキストメッセージを生成および受信する方法を学習します。.

ステップ3 – 話題のメッセージの作成と消費

Kafkaサーバーのセットアップが完了したので、トピックの概要と、付属のスクリプトを使用してトピックを管理する方法を学びます。また、トピックからメッセージを送受信する方法も学習します。イベントストリームの記事で説明したように、メッセージのパブリッシュと受信はトピックに関連しています。トピックは、メッセージが属するカテゴリに関連付けることができます。.

提供されたスクリプトから kafka-topics.sh Kafkaのトピックは次のように管理できます。 コマンドライン トピックを作成するために使用 最初のトピック 次のコマンドを実行します。

bin/kafka-topics.sh --create --topic first-topic --bootstrap-server localhost:9092

提供されているすべてのKafkaスクリプトでは、サーバーアドレスを次のように指定する必要があります。 --bootstrap-server 特定。.

出力は次のようになります。

Output
Created topic first-topic.

利用可能なすべてのトピックを一覧表示するには、 - 作成する- リスト 送信:

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

作成したトピックが表示されます。

Output
first-topic

このトピックに関する詳細な情報と統計は、以下をご覧ください。 - 説明する 得る:

bin/kafka-topics.sh --describe --topic first-topic --bootstrap-server localhost:9092

出力は次のようになります。

Output
Topic: first-topic TopicId: VtjiMIUtRUulwzxJL5qVjg PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 1 Replicas: 1 Isr: 1

最初の行はトピック名、ID、および再帰係数を指定します。トピックは現在のマシンにのみ存在するため、再帰係数は1です。2行目は意図的にインデントされており、トピックの最初の(そして唯一の)パーティションに関する情報を示しています。Kafkaではトピックをパーティション分割できるため、トピックの異なる部分を複数のサーバーに分散してスケーラビリティを向上させることができます。ここでは、パーティションは1つだけです。.

トピックを作成したので、kafka-console-producer.sh スクリプトを使用してトピックにメッセージを生成します。以下のコマンドを実行してプロデューサーを起動します。

bin/kafka-console-producer.sh --topic first-topic --bootstrap-server localhost:9092

空白の通知が表示されます。

>

プロデューサーはあなたのSMSを待っています。テストに参加して 入力 を押します。通知は次のようになります。

>test
>

プロデューサーは現在、次のメッセージを待機しています。これは、前のメッセージがKafkaに正常に配信されたことを意味します。テスト用に任意の数のメッセージを入力できます。プロデューサーを終了するには、 Ctrl+C プレス 。.

トピックからメッセージを取得するには、コンシューマーが必要です。Kafkaは次のようなシンプルなコンシューマーを提供しています。 kafka-console-consumer.sh 提供しています。以下を実行して実行してください。

bin/kafka-console-consumer.sh --topic first-topic --bootstrap-server localhost:9092

ただし、出力は行われません。これは、コンシューマーがトピックからデータをストリーミングしており、現時点では何も生成または送信されていないためです。コンシューマーが起動する前に生成したメッセージをコンシュームするには、次のコマンドを実行してトピックを最初から読み取る必要があります。

bin/kafka-console-consumer.sh --topic first-topic --from-beginning --bootstrap-server localhost:9092

コンシューマーはすべてのトピック イベントを再生し、メッセージを取得します。

Outputtest
...

ビルダーのように、退出するには、 Ctrl+C プレス 。.

コンシューマーが実際にデータをストリーミングしていることを確認するには、別のターミナルセッションでコンシューマーを開きます。セカンダリSSHセッションを開き、コンシューマーをデフォルト設定で実行します。

bin/kafka-console-consumer.sh --topic first-topic --bootstrap-server localhost:9092

最初のセッションで、コンストラクターを実行します。

bin/kafka-console-producer.sh --topic first-topic --bootstrap-server localhost:9092

次に、希望するメッセージを入力します。

>second test
>third test
>

消費者がそれを受け取ったことがすぐにわかります。

Output
second test
third test

テストが完了したら、プロデューサーとコンシューマーの両方を終了します。.

最初のトピックを削除するには、 - 消去kafka-topics.sh 移行:

bin/kafka-topics.sh --delete --topic first-topic --bootstrap-server localhost:9092

出力はありません。トピックを一覧表示して、実際に削除されたかどうかを確認できます。

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

出力は次のようになります。

Output
__consumer_offsets

__Consumer_Equivalent は、コンシューマーがトピックを読み取った時間を保存する Kafka 内部トピックです。ここまでで、Kafka トピックを作成し、そこにメッセージを作成しました。次に、提供されたスクリプトを使用してメッセージをコンシュームし、最終的にリアルタイムで受信しました。次のステップでは、Kafka を他のイベントブローカーや類似ソフトウェアと比較します。.

類似アーキテクチャとの比較

Apache Kafka は、イベントストリーミングのユースケースにおける真のソリューションと考えられています。しかし、Apache Pulsar と RabbitMQ も広く利用されており、アプローチには違いはあるものの、汎用性の高い選択肢として際立っています。メッセージキューとイベントストリームの主な違いは、前者の主な役割は、メッセージの順序に関わらず、可能な限り最速でクライアントにメッセージを配信することです。このようなシステムは通常、コンシューマーによって確認されるまでメッセージをメモリに保存します。コンシューマーは特定のカテゴリのデータに関心を示す可能性があるため、メッセージのフィルタリングとルーティングは重要な要素です。RabbitMQ は、複数のコンシューマーがトピックをサブスクライブし、メッセージの複数のコピーを受信できる、従来型のメッセージングシステムの好例です。一方、イベントストリーミングは永続性に重点を置いています。イベントはアーカイブされ、整理された状態で維持され、一度だけ処理されます。すべてのコンシューマーがイベントを同じ方法で処理するという考え方であるため、特定のコンシューマーへのルーティングは重要ではありません。Apache Pulsar は、Apache Software Foundation によって開発された、イベントストリーミングをサポートするオープンソースのメッセージングシステムです。 Pulsarは最初からKafkaをベースに構築されていますが、Kafkaとは異なり、従来のメッセージキューイングソリューションとしてスタートし、後にイベントストリーミング機能を追加しました。そのため、Pulsarは、別々のアプリケーションをデプロイすることなく、両方の手法を組み合わせる必要がある場合に便利です。.

結果

これで、Apache Kafka がシステムサービスとして設定され、サーバーのバックグラウンドで安全に実行されるようになりました。また、コマンドラインからトピックを操作する方法や、メッセージの生成と消費の方法も学びました。しかし、Kafka の最大の魅力は、アプリケーションに統合するための多様なクライアントです。.

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

あなたも気に入るかもしれない