Kafka入門

0 株式
0
0
0
0

導入

Apache Kafka は、Java で記述されたオープンソースの分散イベントおよびストリーム処理プラットフォームで、リアルタイムデータフィードの処理を目的として構築されています。高いスケーラビリティ、高いスループット、そして高い可用性を備えています。Apache Software Foundation によって開発された Kafka は、その信頼性、使いやすさ、そしてフォールトトレランス性から広く採用されています。世界最大級の組織において、大量のデータを分散的かつ効率的に管理するために利用されています。.

このチュートリアルでは、Apache Kafkaをダウンロードしてセットアップします。トピックの作成と削除、そして付属のスクリプトを使ったイベントの送受信の方法を学びます。また、同じ目標を持つ類似プロジェクトや、Kafkaとの比較についても学びます。.

前提条件
  • 少なくとも4GBのRAMと2つのCPUを搭載したマシン。Ubuntu Serverの場合
  • Droplet またはローカル マシンに Java 8 以降がインストールされていること。.

ステップ1 – Apache Kafkaのダウンロードと設定

このセクションでは、Apache Kafka をマシンにダウンロードして解凍します。セキュリティを強化するため、自分のユーザーアカウントでセットアップします。その後、KRaft を使用して設定と実行を行います。.

まず、Kafka を実行するための別のユーザーを作成します。以下のコマンドを実行して、kafka という名前のユーザーを作成します。

sudo adduser kafka

アカウントのパスワードの入力を求められます。強力なパスワードを入力し、各フィールドでEnterキーを押して追加情報を入力しないでください。.

最後に、特定の Kafka ユーザーに切り替えます。

su kafka

次に、公式ダウンロードページからKafkaのリリースパッケージをダウンロードします。執筆時点での最新バージョンは3.7.0で、Scala 2.13用にビルドされています。macOSまたはLinuxをお使いの場合は、curlを使ってKafkaをダウンロードできます。.

このコマンドを使用して Kafka をダウンロードし、/tmp に配置します。

curl -o /tmp/kafka.tgz https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz

バージョンはメインディレクトリの ~/kafka に保存します。以下のコマンドを実行して作成します。

mkdir ~/kafka

次に、次のコマンドを実行して ~/kafka に抽出します。

tar -xzf /tmp/kafka.tgz -C ~/kafka --strip-components=1

ダウンロードしたアーカイブには Kafka バージョンと同じ名前のルート フォルダーが含まれているため、–strip-components=1 はそれをスキップして、その中のすべてのものを抽出します。.

本稿執筆時点では、Kafka 3 はメタデータ管理のための 2 つのシステム、Apache ZooKeeper と Kafka KRaft(Kafka Raft の略)をサポートする最後のメジャーリリースでした。ZooKeeper は、アプリケーションの分散データを調整するための標準的な方法を提供するオープンソースプロジェクトであり、Apache Software Foundation によって開発されています。.

しかし、Kafka 3.3以降ではKRaftのサポートが導入されました。KRaftはKafkaインスタンスの連携のみを目的とした専用システムであり、インストールプロセスを簡素化し、スケーラビリティを大幅に向上させます。KRaftを使用すると、管理メタデータを外部に保持するのではなく、Kafka自体がデータに対する全責任を負います。.

ZooKeeperのサポートはKafka 4以降ではまだ利用可能ですが、削除される予定です。このチュートリアルでは、KRaftを使用してKafkaをセットアップします。.

新しいKafkaクラスターには一意の識別子を作成する必要があります。現在、クラスターは1つのノードのみで構成されています。Kafkaが現在配置されているディレクトリに移動します。

cd ~/kafka

KRaft を使用した Kafka は設定を config/kraft/server.properties に保存しますが、ZooKeeper の設定ファイルは config/server.properties です。.

初めて実行する前に、いくつかのデフォルト設定を上書きする必要があります。以下のコマンドを実行して、ファイルを編集用に開きます。

nano config/kraft/server.properties

次の行を見つけます。

...
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
...

log.dirs 設定は、Kafka がログファイルを保存する場所を指定します。デフォルトでは、一時的ではあっても書き込み可能であることが保証されているため、/tmp/kafka-logs に保存されます。値を指定されたパスに置き換えてください。

...
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/home/kafka/kafka-logs
...

Kafka 用に別のユーザーを作成したので、ログディレクトリのパスをユーザーのホームディレクトリの下に配置します。存在しない場合は、Kafka によって作成されます。完了したら、ファイルを保存して閉じます。.

Kafka の設定が完了したら、次のコマンドを実行してランダムなクラスター ID を生成します。

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

次に、次のコマンドを実行して ID を入力し、ログ ファイル用のストレージ スペースを作成します。

bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

出力は次のようになります。

Output
Formatting /home/kafka/kafka-logs with metadata.version 3.7-IV4.

最後に、Kafka サーバーを初めて起動できます。

bin/kafka-server-start.sh config/kraft/server.properties

最終出力は次のようになります。

Output
...
[2024-02-26 10:38:26,889] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.DataPlaneAcceptor)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Finished waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Finished waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Transition from STARTING to STARTED (kafka.server.BrokerServer)
[2024-02-26 10:38:26,891] INFO Kafka version: 3.7.0 (org.apache.kafka.common.utils.AppInfoParser)
[2024-02-26 10:38:26,891] INFO Kafka commitId: 5e3c2b738d253ff5 (org.apache.kafka.common.utils.AppInfoParser)
[2024-02-26 10:38:26,891] INFO Kafka startTimeMs: 1708943906890 (org.apache.kafka.common.utils.AppInfoParser)
[2024-02-26 10:38:26,892] INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer)

出力は、Kafka が KRaft を使用して正常に初期化され、0.0.0.0:9092 で接続を受け入れていることを示しています。.

CTRL + C を押すとプロセスが終了します。セッションを開いたまま Kafka を実行することは好ましくないため、次の手順で、バックグラウンドで Kafka を実行するサービスを作成します。.

ステップ2 – Kafka用のsystemdサービスを作成する

このセクションでは、Kafka を常にバックグラウンドで実行するための systemd サービスを作成します。systemd サービスは継続的に起動、停止、再起動できます。.

サービス設定は、systemdがサービスを保存する/lib/systemd/systemディレクトリ内のcode-server.serviceというファイルに保存します。テキストエディタを使ってファイルを作成してください。

sudo nano /etc/systemd/system/kafka.service

次の行を追加します。

[Unit]
Description=kafka-server
[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/kraft/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target

ここではまずサービスの説明を指定します。次に、[Service] フィールドでサービスの種類(simple はコマンドが単純に実行されることを意味します)を定義し、実行するコマンドを指定します。また、実行ユーザーとして kafka を指定し、Kafka が終了した場合にサービスを自動的に再起動するように指定します。.

[Install] セクションは、サーバーにログインできる状態になったときにこのサービスを開始するようシステムに指示します。完了したら、ファイルを保存して閉じてください。.

次のコマンドを実行して、Kafka サービスを開始します。

sudo systemctl start kafka

ステータスを表示して、正しく起動したことを確認します。

sudo systemctl status kafka

次のような出力が表示されます。

Output
● kafka.service - kafka-server
Loaded: loaded (/etc/systemd/system/kafka.service; disabled; preset: enabled)
Active: active (running) since Mon 2024-02-26 11:17:30 UTC; 2min 40s ago
Main PID: 1061 (sh)
Tasks: 94 (limit: 4646)
Memory: 409.2M
CPU: 10.491s
CGroup: /system.slice/kafka.service
├─1061 /bin/sh -c "/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/kraft/server.properties > /home/kafka/kafka/kafka.log 2>&1"
└─1062 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true "-Xlog:gc*:file=/home/kafka/kafka/bin/../logs/kaf>
Feb 26 11:17:30 kafka-test1 systemd[1]: Started kafka.service - kafka-server.

サーバーの再起動後に Kafka を自動的に起動するには、次のコマンドを実行してサービスを有効にします。

sudo systemctl enable kafka

ここまでで、Kafka 用の systemd サービスを作成して有効化し、サーバーの起動時に起動できるようになりました。次に、Kafka でトピックを作成および削除する方法と、利用可能なスクリプトを使用してテキストメッセージを生成および受信する方法を学習します。.

ステップ3 – 話題のメッセージの作成と消費

Kafkaサーバーのセットアップが完了したので、トピックの概要と、付属のスクリプトを使用してトピックを管理する方法を学びます。また、トピックからメッセージを送受信する方法も学習します。.

イベントストリームの記事で説明したように、メッセージの公開と受信はトピックに関連付けられています。トピックは、メッセージが属するカテゴリに関連付けることができます。.

提供されているスクリプト kafka-topics.sh を使用すると、CLI 経由で Kafka のトピックを管理できます。first-topic という名前のトピックを作成するには、次のコマンドを実行します。

bin/kafka-topics.sh --create --topic first-topic --bootstrap-server localhost:9092

提供されるすべての Kafka スクリプトでは、--bootstrap-server を使用してサーバー アドレスを指定する必要があります。.

出力は次のようになります。

Output
Created topic first-topic.

既存のトピックをすべて一覧表示するには、–create を –list に置き換えます。

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

作成したトピックが表示されます。

Output
first-topic

--describe を渡すと、トピックに関する詳細な情報と統計を取得できます。

bin/kafka-topics.sh --describe --topic first-topic --bootstrap-server localhost:9092

出力は次のようになります。

Output
Topic: first-topic TopicId: VtjiMIUtRUulwzxJL5qVjg PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 1 Replicas: 1 Isr: 1

最初の行はトピック名、ID、および再帰係数を指定します。トピックは現在のマシンにのみ存在するため、再帰係数は1です。2行目は意図的にインデントされており、トピックの最初の(そして唯一の)パーティションに関する情報を示しています。Kafkaではトピックをパーティション分割できるため、トピックの異なる部分を複数のサーバーに分散してスケーラビリティを向上させることができます。ここでは、パーティションは1つだけです。.

トピックを作成したので、kafka-console-producer.sh スクリプトを使用してトピックにメッセージを生成します。以下のコマンドを実行してプロデューサーを起動します。

bin/kafka-console-producer.sh --topic first-topic --bootstrap-server localhost:9092

空白の通知が表示されます。

>

プロバイダーはあなたのSMSを待っています。テストを入力してEnterキーを押してください。通知は次のようになります。

>test
>

プロデューサーは現在、次のメッセージを待機しています。これは、前のメッセージがKafkaに正常に配信されたことを意味します。テスト用に任意の数のメッセージを入力できます。プロデューサーを終了するには、Ctrl+Cを押してください。.

トピックメッセージを取得するには、コンシューマーが必要です。Kafka は kafka-console-consumer.sh というシンプルなコンシューマーを提供しています。以下のコマンドで実行してください。

bin/kafka-console-consumer.sh --topic first-topic --bootstrap-server localhost:9092

ただし、出力は行われません。これは、コンシューマーがトピックからデータをストリーミングしており、現時点では何も生成または送信されていないためです。コンシューマーが起動する前に生成したメッセージをコンシュームするには、次のコマンドを実行してトピックを最初から読み取る必要があります。

bin/kafka-console-consumer.sh --topic first-topic --from-beginning --bootstrap-server localhost:9092

コンシューマーはすべてのトピック イベントを再生し、メッセージを取得します。

Outputtest
...

ビルダーと同様に、終了するには CTRL+C を押します。.

コンシューマーが実際にデータをストリーミングしていることを確認するには、別のターミナルセッションでコンシューマーを開きます。セカンダリSSHセッションを開き、コンシューマーをデフォルト設定で実行します。

bin/kafka-console-consumer.sh --topic first-topic --bootstrap-server localhost:9092

最初のセッションで、コンストラクターを実行します。

bin/kafka-console-producer.sh --topic first-topic --bootstrap-server localhost:9092

次に、希望するメッセージを入力します。

>second test
>third test
>

消費者が受け取ったメッセージはすぐに確認できます。

Output
second test
third test

テストが完了したら、プロデューサーとコンシューマーの両方を終了します。.

最初のトピックを削除するには、kafka-topics.sh に --delete を渡します。

bin/kafka-topics.sh --delete --topic first-topic --bootstrap-server localhost:9092

出力はありません。トピックを一覧表示して、実際に削除されたかどうかを確認できます。

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

出力は次のようになります。

Output
__consumer_offsets

__Consumer_Equivalent は、コンシューマーがトピックを読み取った時間を保存する Kafka の内部トピックです。.

ここまでで、Kafkaトピックを作成し、その中にメッセージを作成しました。その後、提供されたスクリプトを使用してメッセージをコンシュームし、最終的にリアルタイムで受信しました。次に、Kafkaを他のイベントブローカーや類似ソフトウェアと比較します。.

類似アーキテクチャとの比較

Apache Kafka は、イベントストリーミングのユースケースにおける事実上のソリューションと考えられています。しかし、Apache Pulsar と RabbitMQ も広く利用されており、アプローチは異なりますが、汎用性の高い選択肢として際立っています。.

メッセージキューとイベントストリームの主な違いは、前者の主な役割が、メッセージの順序に関わらず、可能な限り最速でコンシューマーにメッセージを配信することである点です。このようなシステムでは通常、メッセージはコンシューマーによって確認されるまでメモリに保存されます。コンシューマーは特定のカテゴリのデータに興味を示す可能性があるため、メッセージのフィルタリングとルーティングは重要な要素です。RabbitMQは、複数のコンシューマーがトピックをサブスクライブし、メッセージの複数のコピーを受信できる、従来型のメッセージングシステムの好例です。.

一方、イベントストリーミングは永続性を重視します。イベントはアーカイブされ、維持され、一度だけ処理されます。すべてのコンシューマーが同じ方法でイベントを処理するという考え方であるため、イベントを特定のコンシューマーにルーティングすることは重要ではありません。.

Apache Pulsarは、Apache Software Foundationによって開発された、イベントストリーミングをサポートするオープンソースのメッセージングシステムです。PulsarはKafkaをベースに構築されており、当初は従来のメッセージキューイングソリューションとして開発され、後にイベントストリーミング機能が追加されました。そのため、Pulsarは、別々のアプリケーションをデプロイすることなく、両方のアプローチを組み合わせたい場合に便利です。.

結果

これで、Apache Kafka がシステムサービスとして設定され、サーバーのバックグラウンドで安全に実行されるようになりました。また、コマンドラインからトピックを操作する方法や、メッセージの生成と消費の方法も学びました。しかし、Kafka の最大の魅力は、アプリケーションに統合するための多様なクライアントです。.

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

あなたも気に入るかもしれない