Bir veri kaynağı için CLI aracılığıyla Kafka üreticisi nasıl kurulur?

0 Hisse senetleri
0
0
0
0

giriiş

Apache Kafka, bir Kafka kümesine ve kümesinden temel metin mesajları üretmek ve almak için kabuk betikleri sağlar. Bunlar keşif ve deneme için kullanışlı olsa da, gerçek dünya uygulamaları Kafka'ya programatik olarak erişir. Bu amaçla Kafka, popüler programlama dilleri ve ortamları için birçok istemci kütüphanesi sunar. Bu eğitimde, bir Kafka konusuyla ilgili veri üreten bir Java uygulaması oluşturacaksınız. Java projelerini oluşturmak ve paketlemek için bir araç olan Apache Maven kullanarak bir Java projesi oluşturacak ve Kafka istemci kütüphanesini bağımlılık olarak ekleyeceksiniz. Ardından, mesajlar üreterek ve bunlar hakkında küme içi meta verileri alarak Kafka istemcisini kullanan bir sınıf uygulayacaksınız.

Ön koşullar
  • En az 4 GB RAM ve 2 CPU'ya sahip bir cihaz
  • Droplet veya yerel makinenize kurulu Java Geliştirme Kiti (JDK) 8 veya üzeri
  • Apache Kafka, Droplet veya yerel makinenize yüklendi ve yapılandırıldı
  • Java projeleri için standart dizin düzenine giriş

Adım 1 – Bir Maven Projesi Oluşturun

Bu adımda, Apache Maven'ı kuracak ve Kafka ile iletişim kurmak için kullanacağınız bir proje oluşturacaksınız. Ubuntu'da Maven, resmi depolarda kolayca bulunabilir.

Öncelikle şu komutu çalıştırarak kullanılabilir paketlerinizi listeleyin:

sudo apt update

Yüklemek için aşağıdaki komutu çalıştırın:

sudo apt install maven

Sürüm numarasını okuyarak kurulu olduğunu doğrulayın:

mvn --version

Çıktı, Java sürümünüze ve platformunuza bağlı olarak aşağıdakine benzer olacaktır:

OutputApache Maven 3.6.3
Maven home: /usr/share/maven
Java version: 11.0.22, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64
Default locale: en, platform encoding: UTF-8
OS name: "linux", version: "5.15.0-100-generic", arch: "amd64", family: "unix"

Daha sonra Kafka ile çalışmak için Java projelerinizi depolayacağınız bir dizin oluşturun:

mkdir ~/kafka-projects

Yeni oluşturulan dizine gidin:

cd ~/kafka-projects

Daha sonra aşağıdakileri çalıştırarak boş bir Java projesi oluşturun:

mvn archetype:generate \
-DgroupId=com.dokafka \
-DartifactId=dokafka \
-DarchetypeArtifactId=maven-archetype-quickstart \
-DarchetypeVersion=1.4 \
-DinteractiveMode=false

Burada, Maven'a şu adla yeni bir proje oluşturmasını söylersiniz: dokafka Grup kimliğiyle com.dokafka Grup kimliği bu projeyi ekosistem genelinde benzersiz bir şekilde tanımlar. Maven Bu proje arketipe dayanmaktadır maven-arketipi-hızlıbaşlangıç Maven'ın bu şekilde adlandıracağı şablonlar oluşturulacaktır.

Özellikle Maven ilk kez çalıştırılıyorsa, çok sayıda çıktı olacaktır. Çıktının sonu şöyle görünecektir:

Output...
INFO] Generating project in Batch mode
[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Archetype: maven-archetype-quickstart:1.4
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: com.dokafka
[INFO] Parameter: artifactId, Value: dokafka
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Parameter: package, Value: com.dokafka
[INFO] Parameter: packageInPathFormat, Value: com/dokafka
[INFO] Parameter: package, Value: com.dokafka
[INFO] Parameter: groupId, Value: com.dokafka
[INFO] Parameter: artifactId, Value: dokafka
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Project created from Archetype in dir: /root/kafka-projects/dokafka
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 3.537 s
[INFO] Finished at: 2024-03-10T10:43:07Z
[INFO] ------------------------------------------------------------------------

Maven, merkezi deposundan gerekli Java paketlerini indirir ve projeyi derler. dokafka deseni kullanarak maven-arketipi-hızlıbaşlangıç yaratmıştır.

Aşağıdaki komutu çalıştırarak proje dizinine gidin:

cd dokafka

Proje yapısı şu şekildedir:

├── pom.xml
└── src
├── main
│ └── java
│ └── com
│ └── dokafka
│ └── App.java
└── test
└── java
└── com
└── dokafka
└── AppTest.java

Ön koşulların bir parçası olarak, burada gördüğünüz standart Maven proje yapısını öğrendiniz. Dizin src/main/java Projenin kaynak kodunu tutar, src/test/java Deneysel kaynaklar içerir ve pom.xml Projenin temelinde ana Maven yapılandırma dosyası yer alır.

Bu proje yalnızca bir kaynak dosyası içeriyor, Uygulama.java Maven'ın ne ürettiğini görmek için içeriğini gösterin:

cat src/main/java/com/dokafka/App.java

Çıktı şu şekilde olacaktır:

package com.dokafka;
/**
* Hello world!
*
*/
public class App
{
public static void main( String[] args )
{
System.out.println( "Hello World!" );
}
}

Bu kodu çalıştırmak için öncelikle projeyi şu komutu çalıştırarak derlemeniz gerekir:

mvn package

Maven kodu derleyip çalıştırılmak üzere bir JAR dosyasına paketleyecektir. Çıktının sonu şöyle olacaktır, yani tamamlanmıştır:

Output...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 9.830 s
[INFO] Finished at: 2024-03-10T10:41:24Z
[INFO] ------------------------------------------------------------------------

Maven dosyası KAVANOZ Elde edilen sonuç hedef liste altına yerleştirildi. Sınıfı çalıştırmak için Uygulama Az önce oluşturduğunuz sınıfın tam kimliğini girmek için aşağıdaki komutu çalıştırın:

java -cp target/dokafka-1.0-SNAPSHOT.jar com.dokafka.App

Çıktı şu şekilde olacaktır:

OutputHello World!

Maven'ı kurdunuz ve boş bir Java projesi oluşturdunuz. Şimdi Kafka için gerekli bağımlılıkları ekleyeceksiniz.

Adım 2 – Maven Bağımlılıklarını Ekleme

Şimdi projenize Kafka Java istemcisini ve günlük kaydı için diğer bağımlılıkları ekleyeceksiniz. Maven'ı da paketleme sırasında bu bağımlılıkları içerecek şekilde yapılandırabilirsiniz. İlk olarak, Kafka istemci bağımlılıklarını ekleyeceksiniz. Tarayıcınızda Java istemcisi için Maven deposu sayfasına gidin ve mevcut en son sürümü seçin, ardından Maven için sağlanan XML kod parçacığını kopyalayın. Bu yazının yazıldığı tarihte, Java istemci kitaplığının en son sürümü şu şekildedir: 3.7.0 Oldu.

Bağımlılıklar pom.xml Projenizin köküne eklenecekler. Düzenlemek için açın:

nano pom.xml

Bölüm <dependencies&gt; ;'yi bulun ve bağımlılık tanımını ekleyin:

...
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.0</version>
</dependency>
...
</dependencies>

Bu, projenize Kafka istemci kütüphanesini sağlayacaktır. Ancak, kütüphanenin kendisi manuel olarak eklemeniz gereken iki bağımlılık daha gerektirir. Bunlar, mesajları kaydetmek için kullanılan SLF4J kütüphanesinden gelir; çünkü birçok günlük kütüphanesini destekler ve geliştiricinin günlük mesajlarının nasıl işleneceği konusunda esnek olmasını sağlar. Eklemeniz gereken iki bağımlılık şunlardır:

  • slf4j-api, kütüphanenin kendisidir
  • Günlükleri işleyen ve bunları terminale çıkaran slf4j-simple

Bağımlılıkları tanımladıktan sonra, bunları son oluşturulan JAR ile birlikte kullanılabilir hale getirmeniz gerekir. Bölüm ve<yapı> pom.xml Vurgulanan satırları bulup ekleyin:

...
<build>
<pluginManagement>
<plugins>
...
</plugins>
</pluginManagement>
<plugins>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
...

İşte eklenti maven-bağımlılık-eklentisi Tüm bağımlılıkları paketleme sırasında kopyalayacak şekilde yapılandırıyorsunuz. Bu proje yapılandırmasındaki bağımlılık JAR dosyaları, hedef/kütüphane & kısmını eklememeniz gerektiğini unutmayın.<eklentiler> Mevcut olanı <pluginManagement&gt; Değiştir. İşiniz bittiğinde dosyayı kaydedip kapatın.

Her şeyin doğru şekilde yapılandırıldığından emin olmak için projeyi derleyin:

mvn package

Çıktının sonu şu şekilde görünmelidir:

Output...
[INFO] --- maven-jar-plugin:3.0.2:jar (default-jar) @ dokafka ---
[INFO] Building jar: /root/kafka-projects/dokafka/target/dokafka-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- maven-dependency-plugin:2.8:copy-dependencies (default) @ dokafka ---
[INFO] Copying junit-4.11.jar to /root/kafka-projects/dokafka/target/lib/junit-4.11.jar
[INFO] Copying slf4j-simple-2.0.12.jar to /root/kafka-projects/dokafka/target/lib/slf4j-simple-2.0.12.jar
[INFO] Copying snappy-java-1.1.10.5.jar to /root/kafka-projects/dokafka/target/lib/snappy-java-1.1.10.5.jar
[INFO] Copying zstd-jni-1.5.5-6.jar to /root/kafka-projects/dokafka/target/lib/zstd-jni-1.5.5-6.jar
[INFO] Copying hamcrest-core-1.3.jar to /root/kafka-projects/dokafka/target/lib/hamcrest-core-1.3.jar
[INFO] Copying lz4-java-1.8.0.jar to /root/kafka-projects/dokafka/target/lib/lz4-java-1.8.0.jar
[INFO] Copying slf4j-api-2.0.12.jar to /root/kafka-projects/dokafka/target/lib/slf4j-api-2.0.12.jar
[INFO] Copying kafka-clients-3.7.0.jar to /root/kafka-projects/dokafka/target/lib/kafka-clients-3.7.0.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 5.205 s
[INFO] Finished at: 2024-03-12T06:36:34Z
[INFO] ------------------------------------------------------------------------

Aşağıdaki dosyaları indirebilirsiniz. hedef/kütüphane Bağımlılıkların gerçekten kopyalandığından emin olmak için liste:

Outputhamcrest-core-1.3.jar kafka-clients-3.7.0.jar slf4j-api-2.0.12.jar snappy-java-1.1.10.5.jar
junit-4.11.jar lz4-java-1.8.0.jar slf4j-simple-2.0.12.jar zstd-jni-1.5.5-6.jar

Maven projenize gerekli bağımlılıkları eklediniz. Şimdi Kafka'ya bağlanıp zamanlanmış mesajlar üretmek istiyorsunuz.

Adım 3 – Java'da bir Kafka Üreticisi Oluşturun

Bu adımda Java'da bir Kafka üreticisi kurup bir konuya mesajlar yazacaksınız.

Proje yapısına göre kaynak kodu aşağıdadır. src/main/java/com/dokafka kaydedilir. Eğitimin geri kalanında ihtiyaç duyulmayacağı için Uygulama.java Eğer yoksa, şunu çalıştırarak kaldırabilirsiniz:

rm src/main/java/com/dokafka/App.java

Üretici kodunu ProducerDemo adlı bir sınıfta saklayacaksınız. Eşlik eden dosyayı oluşturun ve düzenlemek için açın:

nano src/main/java/com/dokafka/ProducerDemo.java

Aşağıdaki satırları ekleyin:

package com.dokafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ProducerDemo {
private static final Logger log = LoggerFactory.getLogger(ProducerDemo.class);
public static void main(String[] args) {
String bootstrapServers = "localhost:9092";
String topicName = "java_demo";
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> producerRecord =
new ProducerRecord<>(topicName, "Hello World!");
producer.send(producerRecord);
producer.flush();
producer.close();
}
}

Birinci sınıf ÜreticiDemo Tanımlayın, kullanılan sınıfları içe aktarın ve bir tane oluşturun Kaydedici Ana yöntemde öncelikle Kafka kümesinin adresini (bootstrapServers) ve mesajın üretileceği konunun adını (topicName) bildiriyorsunuz.

Sonra bir nesne Özellikler Anahtar-değer sözlüğüne benzeyen ve Kafka üretici işlevinizin yapılandırmasını tutan bir örnek oluşturursunuz. Özelliği belirtirsiniz BOOTSTRAP_SERVERS_CONFIG Kafka küme adresini doldurun. Ayrıca, girişler KEY_SERİLEŞTİRİCİ_SINIF_YAPILANDIRMASI Ve DEĞER_SERİLEŞTİRİCİ_SINIF_YAPILANDIRMASI Açık StringSerializer.class.getName() Ayarlamak.

Bu özellikler, üretilen mesajların anahtar ve değerlerini işlemek için hangi serileştiricilerin kullanılacağını belirtir. Serileştiriciler, girdi kabul eden ve çıktı olarak ağ üzerinden iletilmeye hazır bir bayt dizisi döndüren sınıflardır. Serileştiriciler Tam tersini yaparlar ve orijinal nesneyi bayt akışından yeniden oluştururlar. Burada hem anahtar hem de değer kullanılır. Dize Serileştirici İç kısımlar dizeler halinde serileştirilir.

Daha sonra, bir tane olacak Kafka Yapımcısı Şunu beyan edip örneklendirin:

KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

Anahtar ve değer türlerinin oluşturucusu Sicim Yapılandırma için ilişkili özelliklerle birlikte aşağıdakileri kabul eder. Bir konuya mesaj göndermek için, Kafka Yapımcısı Konuya ve mesajın kendisine göre isimlendirilen bir ProducerRecord kabul eder. Selam Dünya! Örneklemleme yapıyorsunuz. Yaratıcının belirli bir konuya bağlı olmadığını unutmayın.

Mesajı gönderdikten sonra flaş yapıp üreticiyi kapatıyorsunuz. üretici.gönder() Eşzamansızdır, yani mesaj başka bir iş parçacığında gönderilirken kontrol akışı ana yönteme döner. Bu örnek program bundan sonra çıkmak istediğinden, üreticiyi kalan ne varsa onu göndermeye zorlarsınız. Ardından, üreticinin yok edildiğini Kafka'ya bildirmek için onu kapatırsınız (close()).

Ardından, ProducerDemo'nun derlenip çalıştırılmasını sağlayacak bir betik oluşturacaksınız. Bunu run-producer.sh adlı bir dosyaya kaydedeceksiniz. Oluşturun ve düzenlemek için açın:

nano run-producer.sh

Aşağıdaki satırları ekleyin:

#!/bin/bash
mvn clean
mvn package
java -cp target/dokafka-1.0-SNAPSHOT.jar:target/lib/* com.dokafka.ProducerDemo

İşiniz bittiğinde dosyayı kaydedip kapatın. Vurgulanan alan, bağımlılıkların nerede bulunduğunu gösterir.

Daha sonra çalıştırılabilir olarak işaretleyin:

chmod +x run-producer.sh

Son olarak, şunu çalıştırarak bir Merhaba Dünya! mesajı üretmeyi deneyin:

./run-producer.sh

Çıktı uzun olacak ve aşağıdaki gibi olmalı:

Output...
[main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector - initializing Kafka metrics collector
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Instantiated an idempotent producer.
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 2ae524ed625438c5
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1710176327832
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {java_demo=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: Z-4Gf_p6T2ygtb6461nKRA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 12 with epoch 0
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
[main] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.producer for producer-1 unregistered

Başarılı bir şekilde oluşturulan ve daha sonra kaydedilemeyen KafkaProducer kaydedildi. Mesaj artık java_demo konusuna yazıldı ve kafka-console-consumer.sh betiğini kullanarak alabilirsiniz.

Ayrı bir kabukta, Kafka kurulum dizininize gidin ve konuyu okumak için aşağıdaki komutu çalıştırın:

bin/kafka-console-consumer.sh --topic java_demo --from-beginning --bootstrap-server localhost:9092

Çıktı şu şekilde olacaktır:

OutputHello World!

Çıkabilirsiniz. CTRL+C Basmak .

Bu aşamada programlı olarak konu kısmına bir mesaj gönderin. java_demo Kafka'nın sağladığı bash betiğini kullanarak mesajı oluşturdunuz ve geri okudunuz. Şimdi, Kafka'nın bir mesaj başarıyla gönderildikten sonra döndürdüğü bilgileri nasıl kullanacağınızı öğreneceksiniz.

Adım 4 – Geri Aramaları Kullanarak Meta Verileri Alın

Yöntem gönder() KafkaProducer Bir kaydın alınması gibi, meydana gelen olaylar üzerinde işlem yapmanıza olanak tanıyan geri aramaları kabul eder. Bu, kümenin kaydı nasıl işlediği hakkında bilgi almak için kullanışlıdır.

İletişimi genişletmek için Göndermek() Geri aramayla, ilk önce ÜreticiDemo Düzenlemeye açık:

nano src/main/java/com/dokafka/ProducerDemo.java

Kodu aşağıdaki şekilde değiştirin:

...
producer.send(producerRecord, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
log.error("An error occurred!", e);
return;
}
log.info(String.format("Timestamp: %s, partition: %s; offset: %s",
recordMetadata.timestamp(),
recordMetadata.partition(),
recordMetadata.offset()));
}
});
...

Şimdi Callback arayüzünün bir uygulamasını metoda ekleyelim. Göndermek() Siz transfer edin ve onCompletion() Bunu uyguluyorsun KayıtMetaverileri ve isteğe bağlı olarak bir İstisna alır. Ardından, bir hata oluşursa, bunu günlüğe kaydedersiniz. Aksi takdirde, kümede bulunan kaydın zaman damgasını, bölüm numarasını ve ofsetini günlüğe kaydedersiniz. İletiyi bu şekilde göndermek eşzamansız olduğundan, küme kaydı kabul ettiğinde kodunuz çağrılır ve bunun gerçekleşmesini açıkça beklemenize gerek kalmaz.

İşiniz bittiğinde dosyayı kaydedip kapatın, ardından oluşturucuyu çalıştırın:

./run-producer.sh

Çıktının sonunda yeni bir mesaj göreceksiniz:

Output...
[kafka-producer-network-thread | producer-1] INFO com.dokafka.ProducerDemo - Timestamp: 1710181715303, partition: 0; offset: 2
...

Yeni oluşturulan mesaj küme tarafından kabul edildi ve 0. bölümde saklandı.

Tekrar çalıştırdığınızda, ofsetin bir büyük olduğunu ve mesajın bölümdeki yerini gösterdiğini fark edeceksiniz:

Output[kafka-producer-network-thread | producer-1] INFO com.dokafka.ProducerDemo - Timestamp: 1710181831814, partition: 0; offset: 3

Sonuç

Bu makalede, Maven kullanarak bir Java projesi oluşturdunuz ve Kafka ile iletişim kurmak için bağımlılıklarla donattınız. Ardından, Kafka kümenize mesaj üreten bir sınıf oluşturdunuz ve gönderilen kayıt bilgilerini almak için genişlettiniz.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir

Ayrıca Şunları da Beğenebilirsiniz