Einführung
Apache Kafka stellt Shell-Skripte zum Senden und Empfangen einfacher Textnachrichten an und von einem Kafka-Cluster bereit. Diese eignen sich zwar gut für Erkundungs- und Experimentierzwecke, doch in der Praxis greifen Anwendungen programmatisch auf Kafka zu. Daher bietet Kafka zahlreiche Client-Bibliotheken für gängige Programmiersprachen und -umgebungen. In diesem Tutorial erstellen Sie eine Java-Anwendung, die Daten in einem Kafka-Topic erzeugt. Sie erstellen ein Java-Projekt mit Apache Maven, einem Tool zum Erstellen und Verpacken von Java-Projekten, und fügen die Kafka-Client-Bibliothek als Abhängigkeit hinzu. Anschließend implementieren Sie eine Klasse, die den Kafka-Client nutzt, um Nachrichten zu erzeugen und die zugehörigen Metadaten im Cluster abzurufen.
Voraussetzungen
- Ein Gerät mit mindestens 4 GB RAM und 2 CPUs
- Java Development Kit (JDK) 8 oder höher ist auf Ihrem Droplet oder lokalen Rechner installiert.
- Apache Kafka ist auf Ihrem Droplet oder lokalen Rechner installiert und konfiguriert.
- Einführung in die Standardverzeichnisstruktur für Java-Projekte
Schritt 1 – Ein Maven-Projekt erstellen
In diesem Schritt installieren Sie Apache Maven und erstellen damit ein Projekt, das Sie zur Kommunikation mit Kafka verwenden. Unter Ubuntu ist Maven in den offiziellen Repositories verfügbar.
Zuerst listen Sie Ihre verfügbaren Pakete auf, indem Sie folgenden Befehl ausführen:
sudo apt updateUm es zu installieren, führen Sie folgenden Befehl aus:
sudo apt install mavenÜberprüfen Sie anhand der Versionsnummer, ob es installiert ist:
mvn --versionDie Ausgabe wird je nach Java-Version und Plattform in etwa wie folgt aussehen:
OutputApache Maven 3.6.3
Maven home: /usr/share/maven
Java version: 11.0.22, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64
Default locale: en, platform encoding: UTF-8
OS name: "linux", version: "5.15.0-100-generic", arch: "amd64", family: "unix"Erstellen Sie als Nächstes ein Verzeichnis, in dem Sie Ihre Java-Projekte für die Arbeit mit Kafka speichern:
mkdir ~/kafka-projectsWechseln Sie in das neu erstellte Verzeichnis:
cd ~/kafka-projectsAnschließend erstellen Sie ein leeres Java-Projekt, indem Sie Folgendes ausführen:
mvn archetype:generate \
-DgroupId=com.dokafka \
-DartifactId=dokafka \
-DarchetypeArtifactId=maven-archetype-quickstart \
-DarchetypeVersion=1.4 \
-DinteractiveMode=falseHier weisen Sie Maven an, ein neues Projekt mit dem Namen zu erstellen Dokafka Mit Gruppen-ID com.dokafka Die Gruppen-ID identifiziert dieses Projekt innerhalb des gesamten Ökosystems eindeutig. Maven Dieses Projekt basiert auf dem Archetyp maven-archetype-quickstart wird erstellt, und Maven wird die Vorlagen auf diese Weise aufrufen.
Es wird eine umfangreiche Ausgabe geben, insbesondere wenn Maven zum ersten Mal ausgeführt wird. Die Ausgabe endet etwa so:
Output...
INFO] Generating project in Batch mode
[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Archetype: maven-archetype-quickstart:1.4
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: com.dokafka
[INFO] Parameter: artifactId, Value: dokafka
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Parameter: package, Value: com.dokafka
[INFO] Parameter: packageInPathFormat, Value: com/dokafka
[INFO] Parameter: package, Value: com.dokafka
[INFO] Parameter: groupId, Value: com.dokafka
[INFO] Parameter: artifactId, Value: dokafka
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Project created from Archetype in dir: /root/kafka-projects/dokafka
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 3.537 s
[INFO] Finished at: 2024-03-10T10:43:07Z
[INFO] ------------------------------------------------------------------------Maven lädt die benötigten Java-Pakete aus seinem zentralen Repository herunter und erstellt das Projekt. Dokafka Verwendung des Musters maven-archetype-quickstart hat erstellt.
Wechseln Sie mit folgendem Befehl in das Projektverzeichnis:
cd dokafkaDie Projektstruktur ist wie folgt:
├── pom.xml
└── src
├── main
│ └── java
│ └── com
│ └── dokafka
│ └── App.java
└── test
└── java
└── com
└── dokafka
└── AppTest.javaAls Teil der Voraussetzungen haben Sie die Standard-Maven-Projektstruktur kennengelernt, die Sie hier sehen. Verzeichnis src/main/java Enthält den Quellcode des Projekts, src/test/java Enthält experimentelle Ressourcen und pom.xml Im Stammverzeichnis des Projekts befindet sich die Hauptkonfigurationsdatei von Maven.
Dieses Projekt enthält nur eine Quelldatei. App.java Zeige seinen Inhalt an, um zu sehen, was Maven erzeugt hat:
cat src/main/java/com/dokafka/App.javaDie Ausgabe lautet:
package com.dokafka;
/**
* Hello world!
*
*/
public class App
{
public static void main( String[] args )
{
System.out.println( "Hello World!" );
}
}Um diesen Code auszuführen, müssen Sie zuerst das Projekt erstellen, indem Sie Folgendes ausführen:
mvn packageMaven kompiliert den Code und packt ihn in eine ausführbare JAR-Datei. Die Ausgabe endet etwa so, was bedeutet, dass der Vorgang abgeschlossen ist:
Output...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 9.830 s
[INFO] Finished at: 2024-03-10T10:41:24Z
[INFO] ------------------------------------------------------------------------Maven-Datei KRUG Das erzielte Ergebnis wurde in die Zielliste aufgenommen. App Führen Sie den folgenden Befehl aus, den Sie gerade erstellt haben, und geben Sie die vollständige Klassen-ID ein:
java -cp target/dokafka-1.0-SNAPSHOT.jar com.dokafka.AppDie Ausgabe lautet:
OutputHello World!Sie haben Maven installiert und ein leeres Java-Projekt erstellt. Als Nächstes fügen Sie die notwendigen Abhängigkeiten für Kafka hinzu.
Schritt 2 – Hinzufügen von Maven-Abhängigkeiten
Nun fügen Sie den Kafka-Java-Client sowie weitere Abhängigkeiten für die Protokollierung zu Ihrem Projekt hinzu. Sie können Maven auch so konfigurieren, dass diese Abhängigkeiten beim Packen automatisch eingebunden werden. Zuerst fügen Sie die Kafka-Client-Abhängigkeiten hinzu. Rufen Sie dazu die Maven-Repository-Seite des Java-Clients in Ihrem Browser auf, wählen Sie die neueste verfügbare Version aus und kopieren Sie den bereitgestellten XML-Codeausschnitt für Maven. Zum Zeitpunkt der Erstellung dieses Dokuments ist die neueste Version der Java-Client-Bibliothek [Version einfügen]. 3.7.0 Es war.
Abhängigkeiten zu pom.xml Sie werden dem Stammverzeichnis Ihres Projekts hinzugefügt. Öffnen Sie es zur Bearbeitung:
nano pom.xmlAbschnitt <dependencies> Suchen Sie nach ; und fügen Sie die Abhängigkeitsdefinition hinzu:
...
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.0</version>
</dependency>
...
</dependencies>Dadurch wird die Kafka-Clientbibliothek in Ihr Projekt eingebunden. Die Bibliothek selbst benötigt jedoch zwei weitere Abhängigkeiten, die Sie manuell hinzufügen müssen. Diese stammen aus der SLF4J-Bibliothek, die zum Protokollieren von Meldungen verwendet wird. SLF4J unterstützt viele Protokollierungsbibliotheken und ermöglicht dem Entwickler Flexibilität bei der Verarbeitung von Protokollmeldungen. Die beiden hinzuzufügenden Abhängigkeiten sind:
- slf4j-api, die Bibliothek selbst.
- slf4j-simple, das Protokolle verarbeitet und Ausgaben im Terminal ausgibt.
Sobald Sie die Abhängigkeiten definiert haben, müssen Sie diese zusammen mit der endgültigen JAR-Datei, die erstellt wird, verfügbar machen. Abschnitt &<build> pom.xml Suchen und fügen Sie die markierten Zeilen hinzu:
...
<build>
<pluginManagement>
<plugins>
...
</plugins>
</pluginManagement>
<plugins>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
...Hier das Plugin maven-dependency-plugin Sie konfigurieren das Kopieren aller Abhängigkeiten beim Erstellen des Pakets. Die Abhängigkeits-JAR-Dateien befinden sich in dieser Projektkonfiguration unter Ziel/Bibliothek Beachten Sie, dass Sie den &-Teil nicht einbeziehen sollten.<plugins> Das bestehende in <pluginManagement> Ändern Sie die Datei. Wenn Sie fertig sind, speichern und schließen Sie die Datei.
Erstellen Sie das Projekt, um sicherzustellen, dass alles korrekt konfiguriert ist:
mvn packageDas Ende der Ausgabe sollte wie folgt aussehen:
Output...
[INFO] --- maven-jar-plugin:3.0.2:jar (default-jar) @ dokafka ---
[INFO] Building jar: /root/kafka-projects/dokafka/target/dokafka-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- maven-dependency-plugin:2.8:copy-dependencies (default) @ dokafka ---
[INFO] Copying junit-4.11.jar to /root/kafka-projects/dokafka/target/lib/junit-4.11.jar
[INFO] Copying slf4j-simple-2.0.12.jar to /root/kafka-projects/dokafka/target/lib/slf4j-simple-2.0.12.jar
[INFO] Copying snappy-java-1.1.10.5.jar to /root/kafka-projects/dokafka/target/lib/snappy-java-1.1.10.5.jar
[INFO] Copying zstd-jni-1.5.5-6.jar to /root/kafka-projects/dokafka/target/lib/zstd-jni-1.5.5-6.jar
[INFO] Copying hamcrest-core-1.3.jar to /root/kafka-projects/dokafka/target/lib/hamcrest-core-1.3.jar
[INFO] Copying lz4-java-1.8.0.jar to /root/kafka-projects/dokafka/target/lib/lz4-java-1.8.0.jar
[INFO] Copying slf4j-api-2.0.12.jar to /root/kafka-projects/dokafka/target/lib/slf4j-api-2.0.12.jar
[INFO] Copying kafka-clients-3.7.0.jar to /root/kafka-projects/dokafka/target/lib/kafka-clients-3.7.0.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 5.205 s
[INFO] Finished at: 2024-03-12T06:36:34Z
[INFO] ------------------------------------------------------------------------Sie können die Dateien unten herunterladen. Ziel/Bibliothek Liste, um sicherzustellen, dass die Abhängigkeiten tatsächlich kopiert werden:
Outputhamcrest-core-1.3.jar kafka-clients-3.7.0.jar slf4j-api-2.0.12.jar snappy-java-1.1.10.5.jar
junit-4.11.jar lz4-java-1.8.0.jar slf4j-simple-2.0.12.jar zstd-jni-1.5.5-6.jarSie haben die notwendigen Abhängigkeiten zu Ihrem Maven-Projekt hinzugefügt. Nun möchten Sie eine Verbindung zu Kafka herstellen und regelmäßig Nachrichten erzeugen.
Schritt 3 – Erstellen eines Kafka-Producers in Java
In diesem Schritt richten Sie einen Kafka-Producer in Java ein und schreiben Nachrichten in ein Topic.
Gemäß der Projektstruktur befindet sich der Quellcode unten. src/main/java/com/dokafka wird gespeichert. Da es für den Rest des Trainings nicht benötigt wird. App.java Falls Sie es nicht haben, entfernen Sie es durch Ausführen von:
rm src/main/java/com/dokafka/App.javaSie speichern den Produzentencode in einer Klasse namens ProducerDemo. Erstellen und öffnen Sie die zugehörige Datei zur Bearbeitung:
nano src/main/java/com/dokafka/ProducerDemo.javaFügen Sie die folgenden Zeilen hinzu:
package com.dokafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ProducerDemo {
private static final Logger log = LoggerFactory.getLogger(ProducerDemo.class);
public static void main(String[] args) {
String bootstrapServers = "localhost:9092";
String topicName = "java_demo";
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> producerRecord =
new ProducerRecord<>(topicName, "Hello World!");
producer.send(producerRecord);
producer.flush();
producer.close();
}
}Erste Klasse Produzentendemo Sie definieren und importieren die verwendeten Klassen und erstellen ein Logger In der Hauptmethode deklarieren Sie zunächst die Adresse des Kafka-Clusters (bootstrapServers) und den Namen des Themas, zu dem die Nachricht generiert werden soll (topicName).
Dann ein Objekt Eigenschaften Sie instanziieren ein Objekt, das einem Schlüssel-Wert-Wörterbuch ähnelt und die Konfiguration für Ihre Kafka-Producer-Funktion enthält. Sie geben die Eigenschaft an. BOOTSTRAP_SERVERS_CONFIG Sie geben die Adresse des Kafka-Clusters ein. Außerdem die Einträge KEY_SERIALIZER_CLASS_CONFIG Und VALUE_SERIALIZER_CLASS_CONFIG An StringSerializer.class.getName() Satz.
Diese Eigenschaften legen fest, welche Serialisierer zur Verarbeitung der Schlüssel und Werte der generierten Nachrichten verwendet werden sollen. Serialisierer sind Klassen, die Eingaben entgegennehmen und ein Byte-Array als Ausgabe zurückgeben, das zur Übertragung über das Netzwerk bereit ist. Deserialisierer Sie machen das Gegenteil und rekonstruieren das ursprüngliche Objekt aus dem Bytestrom. Hierbei werden sowohl Schlüssel als auch Wert verwendet. StringSerializer Interne Daten werden als Zeichenketten serialisiert.
Als nächstes werden Sie ein KafkaProducer Sie deklarieren und instanziieren:
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);Konstruktor für Schlüssel und Werte vom Typ Zeichenkette Akzeptiert Folgendes mit den zugehörigen Eigenschaften zur Konfiguration. So senden Sie eine Nachricht an ein Thema: KafkaProducer Es akzeptiert einen ProducerRecord, der nach dem Betreff und der Nachricht selbst benannt ist. Hallo Welt! Sie nehmen eine Stichprobe vor. Beachten Sie, dass der Urheber selbst nicht an ein bestimmtes Thema gebunden ist.
Nach dem Senden der Nachricht blinkt der Produzent und schließt sich. Anruf producer.send() Die Verarbeitung ist asynchron, d. h. der Programmablauf kehrt zur Hauptmethode zurück, während die Nachricht in einem anderen Thread gesendet wird. Da dieses Beispielprogramm anschließend beendet werden soll, wird der Producer durch Leeren des Datenspeichers gezwungen, die restlichen Daten zu senden. Anschließend wird der Producer mit `close()` geschlossen, um Kafka zu signalisieren, dass er beendet wird.
Als Nächstes erstellen Sie ein Skript, das die Erstellung und Ausführung von ProducerDemo übernimmt. Speichern Sie es in einer Datei namens run-producer.sh. Erstellen und öffnen Sie die Datei zur Bearbeitung:
nano run-producer.shFügen Sie die folgenden Zeilen hinzu:
#!/bin/bash
mvn clean
mvn package
java -cp target/dokafka-1.0-SNAPSHOT.jar:target/lib/* com.dokafka.ProducerDemoWenn Sie fertig sind, speichern und schließen Sie die Datei. Der markierte Bereich zeigt an, wo sich die Abhängigkeiten befinden.
Markieren Sie es anschließend als ausführbar:
chmod +x run-producer.shVersuchen Sie abschließend, eine „Hallo Welt!“-Nachricht zu generieren, indem Sie folgenden Befehl ausführen:
./run-producer.shDie Ausgabe wird lang sein und sollte am Ende etwa so aussehen:
Output...
[main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector - initializing Kafka metrics collector
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Instantiated an idempotent producer.
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 2ae524ed625438c5
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1710176327832
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {java_demo=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: Z-4Gf_p6T2ygtb6461nKRA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 12 with epoch 0
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
[main] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.producer for producer-1 unregisteredEin KafkaProducer wurde registriert und erfolgreich erstellt, konnte aber später nicht mehr registriert werden. Die Nachricht wird nun in das Topic „java_demo“ geschrieben und kann mit dem Skript „kafka-console-consumer.sh“ abgerufen werden.
Öffnen Sie eine separate Shell, wechseln Sie in Ihr Kafka-Installationsverzeichnis und führen Sie den folgenden Befehl aus, um das Topic zu lesen:
bin/kafka-console-consumer.sh --topic java_demo --from-beginning --bootstrap-server localhost:9092Die Ausgabe lautet:
OutputHello World!Sie können das Programm beenden. STRG+C Drücken.
Senden Sie in diesem Schritt programmatisch eine Nachricht im Betreff. java_demo Sie haben die Nachricht mithilfe des von Kafka bereitgestellten Bash-Skripts generiert und wieder ausgelesen. Nun lernen Sie, wie Sie die Informationen nutzen, die Kafka nach erfolgreichem Senden einer Nachricht zurückgibt.
Schritt 4 – Metadaten mithilfe von Rückruffunktionen abrufen
Verfahren send() KafkaProducer Es akzeptiert Rückruffunktionen, mit denen Sie auf Ereignisse reagieren können, beispielsweise auf den Empfang eines Datensatzes. Dies ist nützlich, um Informationen darüber abzurufen, wie der Cluster den Datensatz verarbeitet.
Um den Kontakt zu erweitern schicken() Mit einem Rückruf, zuerst Produzentendemo Zur Bearbeitung geöffnet:
nano src/main/java/com/dokafka/ProducerDemo.javaÄndern Sie den Code wie folgt:
...
producer.send(producerRecord, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
log.error("An error occurred!", e);
return;
}
log.info(String.format("Timestamp: %s, partition: %s; offset: %s",
recordMetadata.timestamp(),
recordMetadata.partition(),
recordMetadata.offset()));
}
});
...Fügen Sie nun eine Implementierung der Callback-Schnittstelle zur Methode hinzu. schicken() Sie überweisen und onCompletion() Sie setzen das um Datensatzmetadaten und optional ein Ausnahme Sie empfangen die Nachricht. Tritt ein Fehler auf, protokollieren Sie diesen. Andernfalls protokollieren Sie Zeitstempel, Partitionsnummer und Offset des aktuell im Cluster befindlichen Datensatzes. Da das Senden der Nachricht auf diese Weise asynchron erfolgt, wird Ihr Code aufgerufen, sobald der Cluster den Datensatz akzeptiert, ohne dass Sie explizit darauf warten müssen.
Wenn Sie fertig sind, speichern und schließen Sie die Datei und führen Sie dann den Builder aus:
./run-producer.shBeachten Sie die neue Meldung am Ende der Ausgabe:
Output...
[kafka-producer-network-thread | producer-1] INFO com.dokafka.ProducerDemo - Timestamp: 1710181715303, partition: 0; offset: 2
...Die neu generierte Nachricht wurde vom Cluster akzeptiert und in Partition 0 gespeichert.
Wenn Sie es erneut ausführen, werden Sie feststellen, dass der Offset um eins größer ist, was die Position der Nachricht in der Partition angibt:
Output[kafka-producer-network-thread | producer-1] INFO com.dokafka.ProducerDemo - Timestamp: 1710181831814, partition: 0; offset: 3Ergebnis
In diesem Artikel haben Sie ein Java-Projekt mit Maven erstellt und es mit Abhängigkeiten für die Kommunikation mit Kafka ausgestattet. Anschließend haben Sie eine Klasse erstellt, die Nachrichten an Ihren Kafka-Cluster sendet, und diese erweitert, um die gesendeten Datensatzinformationen abzurufen.









