Wie man einen Kafka-Cluster mit mehreren Knoten mithilfe von KCraft einrichtet

0 Aktien
0
0
0
0

Einführung

Apache Kafka ist eine Open-Source-Plattform für verteilte Ereignis- und Streamverarbeitung, die in Java geschrieben und für die Verarbeitung von Echtzeit-Datenfeeds entwickelt wurde. Sie ist von Natur aus skalierbar und zeichnet sich durch hohen Durchsatz und hohe Verfügbarkeit aus. Ein Cluster kann Hunderte von Knoten unterstützen.

In diesem Tutorial erstellen Sie einen Kafka-Cluster, der das KRaft-Konsensprotokoll verwendet. Sie lernen, wie Sie Knoten für einen Cluster konfigurieren und wie Themenpartitionen verschiedenen Knoten zugewiesen werden. Außerdem erfahren Sie, wie Sie Themen bestimmten Brokern im Cluster zuordnen.

Voraussetzungen
  • Drei Geräte mit mindestens 4 GB RAM und 2 CPUs
  • Ein vollständig registrierter Domainname mit drei Subdomains, die auf die drei Drops verweisen.
  • Apache Kafka ist auf Ihren Droplets installiert und konfiguriert.

Schritt 1 – Kafka-Knoten konfigurieren

In diesem Schritt konfigurieren Sie die drei Kafka-Server, die Sie im Rahmen der Voraussetzungen erstellt haben, so, dass sie Teil desselben KRaft-Clusters sind. Mit KRaft können die Knoten ihre administrativen Aufgaben selbstständig organisieren und ausführen, ohne dass zusätzlicher Aufwand und die Abhängigkeit von Apache ZooKeeper entstehen.

Erste Knotenkonfiguration

Sie beginnen mit der Konfiguration des ersten Knotens. Stoppen Sie zunächst den Dienst auf dem ersten Droplet, indem Sie folgenden Befehl ausführen:

sudo systemctl stop kafka

Navigieren Sie als Kafka-Benutzer zu dem Verzeichnis, in dem sich Kafka befindet, und öffnen Sie dessen Konfigurationsdatei zur Bearbeitung, indem Sie Folgendes ausführen:

vi /config/kraft/server.properties

Finden Sie die folgenden Zeilen:

...
# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller
# The node id associated with this instance's roles
node.id=1
# The connect string for the controller quorum
controller.quorum.voters=1@localhost:9092
...

Diese drei Parameter konfigurieren den Kafka-Knoten so, dass er sowohl als Broker als auch als Controller fungiert. Das bedeutet, er empfängt und verarbeitet Daten (Broker) und führt administrative Aufgaben aus (Controller). Diese Trennung ist in großen Installationen sinnvoll, da die Controller dort aus Effizienz- und Redundanzgründen getrennt gehalten werden können.

node.id gibt die Knoten-ID im Cluster an. Dies ist der erste Knoten, daher sollte seine ID 1 bleiben. Alle Knoten müssen eine eindeutige Knoten-ID haben, daher erhalten der zweite und dritte Knoten die IDs 2 bzw. 3.

Die Datei `controller.quorum.voters` ordnet die Knoten-IDs den entsprechenden Adressen und Ports für die Kommunikation zu. Hier geben Sie die Adressen aller Clusterknoten an, damit jeder Knoten die anderen Knoten kennt. Ändern Sie die Zeile wie folgt:

...
[email protected]_domain:9093,[email protected]_domain:9093,[email protected]_domain:9093
...

Hier listen Sie alle drei Knoten des Clusters mit ihren jeweiligen IDs auf. Ersetzen Sie dabei „your_domain“ durch Ihre Domainadresse, die Sie in den Voraussetzungen festgelegt haben.

Suchen Sie als Nächstes die folgenden Zeilen in der Datei:

...
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT
# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://localhost:9092
...

`listeners` definiert die Adressen, an denen der Kafka-Knoten lauscht, während `advertised.listeners` die Adressen angibt, die an Clients gesendet werden, um eine Verbindung zum Knoten herzustellen. Dadurch können Sie eine Teilmenge der tatsächlichen Adressen festlegen, die Clients verwenden sollen.

Ändern Sie die Zeilen so, dass sie wie folgt aussehen, und ersetzen Sie dabei your_domain durch Ihren tatsächlichen Domainnamen:

...
listeners=PLAINTEXT://kafka1.your_domain:9092,CONTROLLER://kafka1.your_domain:9093
# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT
# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://kafka1.your_domain:9092
...

Da sich dieser Knoten in einem Cluster befinden wird, haben Sie die Adressen explizit auf das Droplet verwiesen, auf dem er ausgeführt werden soll.

Suchen Sie anschließend die Einstellung num.partitions:

...
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
...

Wie im Kommentar erwähnt, ist dies die standardmäßige Anzahl der Partitionen, die für jeden neuen Thread konfiguriert wird. Da Sie drei Knoten haben, setzen Sie den Wert auf ein Vielfaches von zwei:

...
num.partitions=6
...

Der Wert 6 stellt hier sicher, dass jeder Knoten standardmäßig zwei Themenpartitionen verwaltet.

Als Nächstes konfigurieren Sie den Iterationsfaktor für interne Threads, die Consumer-Offsets und den Transaktionsstatus verwalten. Suchen Sie die folgenden Zeilen:

...
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
...

Stellen Sie sie auf die folgenden Werte ein:

...
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
...

Hier legen Sie fest, dass mindestens zwei Knoten hinsichtlich ihrer internen Metadaten synchronisiert sein müssen. Speichern und schließen Sie die Datei anschließend.

Nachdem Sie die Standardpartitionsnummer festgelegt haben, müssen Sie die Protokollspeicherung neu konfigurieren. Löschen Sie zunächst die vorhandenen Protokolldateien mit folgendem Befehl:

rm -rf /home/kafka/kafka-logs/*

Als Nächstes erstellen Sie eine neue Cluster-ID und speichern diese in einer Umgebungsvariablen:

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

Zeige es im Terminal an:

echo $KAFKA_CLUSTER_ID

Die Ausgabe-ID lautet:

OutputMjj4bch9Q3-B0TEXv8_zPg

Notieren Sie sich diesen Wert; Sie benötigen ihn zur Konfiguration des zweiten und dritten Knotens.

Führen Sie abschließend den folgenden Befehl aus, um das Berichtsrepository zu erstellen:

./bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

Die Ausgabe wird in etwa so aussehen:

Output...
Formatting /home/kafka/kafka-logs with metadata.version 3.7-IV4.
Konfiguration des zweiten und dritten Knotens

Die Konfiguration der anderen Knoten ist der Konfiguration des ersten Knotens sehr ähnlich. Beachten Sie, dass Sie auch die node.id aktualisieren müssen:

...
node.id=node_number
...

Die entsprechenden Werte für den zweiten und dritten Knoten sind 2 bzw. 3, wodurch die entsprechenden Adressen für listeners und advertised.listeners festgelegt werden.

Beim Neuaufbau des Protokollspeichers sollte die Cluster-ID des ersten Knotens wiederverwendet werden:

KAFKA_CLUSTER_ID="your_cluster_id"

Wenn Sie fertig sind, starten Sie den Kafka-Dienst auf allen drei Knoten mit folgendem Befehl:

sudo systemctl start kafka

An diesem Punkt haben Sie drei Kafka-Knoten so konfiguriert, dass sie Teil eines KRaft-Clusters sind. Sie erstellen ein Thema und können Nachrichten in Ihrem Cluster produzieren und konsumieren.

Schritt 2 – Verbindung zum Cluster herstellen

In diesem Schritt stellen Sie mithilfe der mitgelieferten Shell-Skripte eine Verbindung zum Kafka-Cluster her. Anschließend erstellen Sie ein Topic und versuchen, Daten vom Cluster zu erzeugen und zu konsumieren. Danach schalten Sie einen der Knoten ab und beobachten, wie Kafka den Datenverlust reduziert.

Kafka stellt ein Skript namens kafka-metadata-quorum.sh bereit, das Informationen über den Cluster und seine Mitglieder anzeigt. Führen Sie folgenden Befehl aus, um es zu starten:

./bin/kafka-metadata-quorum.sh --bootstrap-controller kafka1.your_domain:9093 describe --status

Sie verbinden sich mit einem der Knoten an Port 9093. Dies ist der Endpunkt des Controllers (aber nicht des Brokers). Ersetzen Sie kafka1.your_domain durch die Domain, die auf einen Ihrer Kafka-Knoten verweist.

Die Ausgabe wird in etwa so aussehen:

OutputClusterId: G3TeIZoeTSCvG2YOWvPE2w
LeaderId: 3
LeaderEpoch: 2
HighWatermark: 383
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 55
CurrentVoters: [1,2,3]
CurrentObservers: []

Das Skript listet die anfänglichen Informationen zum Clusterstatus auf. In der angezeigten Ausgabe ist ersichtlich, dass Knoten 3 zum Leader gewählt wurde und alle drei Knoten ([1,2,3]) an der Abstimmung teilnehmen und dieser Entscheidung zustimmen.

Erstellen Sie ein Thema namens „Erstes Thema“, indem Sie Folgendes ausführen:

./bin/kafka-topics.sh --create --topic first-topic --bootstrap-server kafka1.your_domain:9092 --replication-factor 2

Die Ausgabe lautet:

Created topic first-topic.

Führen Sie anschließend das Skript kafka-topics.sh aus, um zu sehen, wie die Partitionen auf den Knoten angeordnet sind:

./bin/kafka-topics.sh --describe --bootstrap-server kafka1.your_domain:9092 --topic first-topic

Durch die Einstellung des Wiederholungsfaktors auf 2 wird sichergestellt, dass das Thema auf mindestens zwei Knoten verfügbar ist.

Die Ausgabe wird in etwa so aussehen:

OutputTopic: first-topic TopicId: 4kVImoFNTQeyk3r2zQbdvw PartitionCount: 6 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: first-topic Partition: 3 Leader: 1 Replicas: 1,3 Isr: 1,3
Topic: first-topic Partition: 4 Leader: 3 Replicas: 3,2 Isr: 3,2
Topic: first-topic Partition: 5 Leader: 2 Replicas: 2,1 Isr: 2,1

Sie sehen, dass jede Partition einen Leader, zwei Replikate und zwei synchronisierte Replikatsets (ISRs) besitzt. Der Partitionsleader ist ein Serverknoten, der die Daten der Partition an Clients ausliefert, während die Replikate lediglich Kopien enthalten. Ein Replikatknoten gilt standardmäßig als ISR, wenn er in den letzten zehn Sekunden mit dem Leader synchronisiert war. Dieses Intervall ist für jedes Thema individuell konfigurierbar.

Nachdem Sie ein Thema erstellt haben, erzeugen Sie dessen Nachrichten mithilfe des Skripts kafka-console-producer.sh. Führen Sie folgenden Befehl aus, um den Producer zu starten:

./bin/kafka-console-producer.sh --topic first-topic --bootstrap-server kafka1.your_domain:9092

Sie sehen eine leere Benachrichtigung:

>

Der Anbieter wartet auf Ihre SMS. Geben Sie den Test ein und drücken Sie die Eingabetaste. Die Benachrichtigung sieht folgendermaßen aus:

>Hello World!
>

Der Producer wartet nun auf die nächste Nachricht; die vorherige Nachricht wurde also erfolgreich an Kafka übermittelt. Sie können beliebig viele Nachrichten zum Testen eingeben. Um den Producer zu beenden, drücken Sie Strg+C.

Sie benötigen einen Consumer, um die Topic-Nachrichten erneut zu lesen. Kafka stellt einen einfachen Consumer namens kafka-console-consumer.sh bereit. Führen Sie ihn mit folgendem Befehl aus:

./bin/kafka-console-consumer.sh --topic first-topic --from-beginning --bootstrap-server kafka1.your_domain:9092

Sie sehen hier Nachrichten, die aus diesem Thema vorgelesen werden:

OutputHello World!
...

Knotenausfallsimulation

Auf dem dritten Kafka-Knoten stoppen Sie den Dienst durch Ausführen des folgenden Befehls:

sudo systemctl stop kafka

Erklären Sie das Thema anschließend anhand von Übungen:

./bin/kafka-topics.sh --describe --bootstrap-server kafka1.your_domain:9092 --topic first-topic

Die Ausgabe wird in etwa so aussehen:

OutputTopic: first-topic TopicId: 4kVImoFNTQeyk3r2zQbdvw PartitionCount: 6 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 1 Replicas: 3,1 Isr: 1
Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2
Topic: first-topic Partition: 3 Leader: 1 Replicas: 1,3 Isr: 1
Topic: first-topic Partition: 4 Leader: 2 Replicas: 3,2 Isr: 2
Topic: first-topic Partition: 5 Leader: 2 Replicas: 2,1 Isr: 2,1

Obwohl Knoten 3 als Replikat aufgeführt ist, befindet er sich nicht in den ISR-Sätzen, da er nicht verfügbar ist. Sobald er dem Cluster wieder beitritt, synchronisiert er sich mit den anderen Knoten und versucht, seine vorherige Position wiederzuerlangen.

Versuchen Sie, die Nachrichten im ersten Thema noch einmal zu lesen:

./bin/kafka-console-consumer.sh --topic first-topic --from-beginning --bootstrap-server kafka1.your_domain:9092

Sie werden sehen, dass sie wie gewohnt zugänglich sind:

OutputHello World!
...

Dank der vorhandenen Replikate übernehmen die ersten beiden Knoten die Kontrolle und bedienen den Konsumenten. Nun können Sie Kafka auf dem dritten Server starten:

sudo systemctl start kafka

Sie haben nun gesehen, wie Kafka die Nichtverfügbarkeit eines Knotens im Cluster kompensiert. Jetzt lernen Sie, wie Sie einen Knoten aus dem Cluster entfernen.

Schritt 3 – Datenübertragung zwischen den Knoten

In diesem Schritt lernen Sie, wie Sie Topics zwischen Knoten in einem Kafka-Cluster verschieben. Beim Hinzufügen eines Knotens zu einem bestehenden Cluster mit Topics verschiebt Kafka nicht automatisch Partitionen auf diesen Knoten, was unter Umständen gewünscht ist. Dies ist auch beim Entfernen von Knoten hilfreich, da vorhandene Partitionen nicht automatisch auf die verbleibenden Knoten verschoben werden.

Kafka stellt ein Skript namens kafka-reassign-partitions.sh bereit, das Migrationspläne generieren, ausführen und überprüfen kann. Sie werden es verwenden, um einen Plan zur Migration der Partitionen des ersten Topics auf die ersten beiden Knoten zu erstellen.

Zuerst müssen Sie festlegen, welche Themen verschoben werden sollen. Das Skript akzeptiert eine JSON-Datei mit der Themendefinition. Erstellen Sie diese daher und öffnen Sie sie zur Bearbeitung:

vi topics-to-move.json

Fügen Sie die folgenden Zeilen hinzu:

{
"topics": [
{
"topic": "first-topic"
}
],
"version": 1
}

Unter „Titel“ definieren Sie ein Objekt, das auf das erste Thema verweist. Speichern und schließen Sie die Datei anschließend.

Führen Sie den folgenden Befehl aus, um den Migrationsplan zu generieren, und ersetzen Sie dabei kafka1.your_domain durch die Domain, die auf einen Ihrer Kafka-Knoten verweist:

./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --topics-to-move-json-file topics-to-move.json --broker-list "1,2" --generate

Sie übergeben “1,2” an –broker-list, wobei „1,2“ die ID der Zielbroker darstellt.

Die Ausgabe wird in etwa so aussehen:

OutputCurrent partition replica assignment
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[2,1],"log_dirs":["any","any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[1,2],"log_dirs":["any","any"]}]}

Dieses Skript erstellte insgesamt zwei Pläne, die das aktuelle und das vorgeschlagene Partitionslayout beschreiben. Der erste Plan dient dazu, die Änderungen später rückgängig zu machen. Beachten Sie den zweiten Plan, den Sie in einer separaten Datei namens migration-plan.json speichern. Erstellen und öffnen Sie diese Datei zur Bearbeitung.

vi migration-plan.json

Fügen Sie die zweite ausführbare Datei hinzu:

{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[1,2],"log_dirs":["any","any"]}]}

Speichern und schließen Sie die Datei. Führen Sie anschließend folgenden Befehl aus, um sie zu starten:

./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --reassignment-json-file migration-plan.json --execute

Die Ausgabe lautet:

OutputCurrent partition replica assignment
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[2,1],"log_dirs":["any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for first-topic-0,first-topic-1,first-topic-2,first-topic-3,first-topic-4,first-topic-5

Das Skript zeigte an, dass die Migration gestartet wurde. Um den Fortschritt der Migration anzuzeigen, geben Sie stattdessen --verify ein:

./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --reassignment-json-file migration-plan.json --verify

Nach einiger Zeit sieht die Ausgabe folgendermaßen aus:

OutputStatus of partition reassignment:
Reassignment of partition first-topic-0 is completed.
Reassignment of partition first-topic-1 is completed.
Reassignment of partition first-topic-2 is completed.
Reassignment of partition first-topic-3 is completed.
Reassignment of partition first-topic-4 is completed.
Reassignment of partition first-topic-5 is completed.
Clearing broker-level throttles on brokers 1,2,3
Clearing topic-level throttles on topic first-topic

Nun können Sie das erste Problem erläutern, um zu überprüfen, ob auf Broker 3 keine Partitionen vorhanden sind:

./bin/kafka-topics.sh --describe --bootstrap-server kafka1.your_domain:9092 --topic first-topic

Die Ausgabe wird wie folgt aussehen:

OutputTopic: first-topic TopicId: 4kVImoFNTQeyk3r2zQbdvw PartitionCount: 6 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 2 Replicas: 2,1 Isr: 1,2
Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: first-topic Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 4 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: first-topic Partition: 5 Leader: 1 Replicas: 1,2 Isr: 2,1

Lediglich die Broker 1 und 2 sind als Replikate und ISRs vorhanden, was bedeutet, dass die Migration erfolgreich war.

An diesem Punkt haben Sie einen Migrationsplan erstellt, um das erste Thema von Broker 3 auf die übrigen Themen zu verschieben, und gelernt, wie Sie überprüfen können, ob die Migration reibungslos verlaufen ist.

Ergebnis

Sie verfügen nun über einen Kafka-Cluster mit drei Knoten, die über das KRaft-Protokoll kommunizieren. Sie haben außerdem gelernt, wie Sie den Cluster und dessen Partitionsstruktur untersuchen. Die Clusterredundanz haben Sie getestet, indem Sie einen Knoten heruntergefahren und Daten aus einem Topic gelesen haben. Schließlich haben Sie gelernt, wie Sie Topics auf Clusterknoten umleiten.

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert

Das könnte Ihnen auch gefallen