مقدمه
آپاچی کافکا اسکریپت های پوسته ای را برای تولید و مصرف پیام های متنی اولیه به و از یک خوشه کافکا فراهم می کند. در حالی که آنها برای کاوش و آزمایش مفید هستند، برنامه های کاربردی دنیای واقعی به صورت برنامه نویسی به کافکا دسترسی دارند. برای این منظور، کافکا کتابخانه های مشتری زیادی را برای زبان ها و محیط های برنامه نویسی پرکاربرد ارائه می دهد. در این آموزش، یک برنامه جاوا ایجاد می کنید که داده ها را در یک موضوع کافکا تولید می کند. شما با استفاده از Apache Maven که ابزاری برای ساخت و بسته بندی پروژه های جاوا است، یک پروژه جاوا ایجاد می کنید و کتابخانه مشتری کافکا را به عنوان یک وابستگی اضافه می کنید. سپس، کلاسی را پیادهسازی میکنید که با تولید پیامها و بازیابی ابردادههای درون خوشهای درباره آنها، از مشتری کافکا استفاده میکند.
پیش نیازها
- دستگاهی با حداقل 4 گیگابایت رم و 2 سی پی یو
- کیت توسعه جاوا (JDK) 8 یا بالاتر روی Droplet یا دستگاه محلی شما نصب شده است
- آپاچی کافکا روی Droplet یا ماشین محلی شما نصب و پیکربندی شده است
- آشنایی با طرح دایرکتوری استاندارد پروژه های جاوا
مرحله 1 – ایجاد یک پروژه Maven
در این مرحله، Apache Maven را نصب میکنید و از آن برای ایجاد پروژهای استفاده میکنید که از آن برای ارتباط با کافکا استفاده میکنید. در اوبونتو، Maven به راحتی در مخازن رسمی موجود است.
ابتدا لیست بسته های موجود خود را با اجرای:
sudo apt updateبرای نصب آن دستور زیر را اجرا کنید:
sudo apt install mavenبا خواندن شماره نسخه آن، تأیید کنید که نصب شده است:
mvn --versionخروجی بسته به نسخه جاوا و پلتفرم شما مشابه موارد زیر خواهد بود:
OutputApache Maven 3.6.3
Maven home: /usr/share/maven
Java version: 11.0.22, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64
Default locale: en, platform encoding: UTF-8
OS name: "linux", version: "5.15.0-100-generic", arch: "amd64", family: "unix"در مرحله بعد، دایرکتوری ایجاد کنید که در آن پروژه های جاوا خود را برای کار با کافکا ذخیره کنید:
mkdir ~/kafka-projectsبه دایرکتوری تازه ایجاد شده بروید:
cd ~/kafka-projectsسپس با اجرای زیر یک پروژه جاوا خالی تولید کنید:
mvn archetype:generate \
-DgroupId=com.dokafka \
-DartifactId=dokafka \
-DarchetypeArtifactId=maven-archetype-quickstart \
-DarchetypeVersion=1.4 \
-DinteractiveMode=falseدر اینجا، به Maven دستور می دهید که پروژه جدیدی به نام dokafka با شناسه گروهی com.dokafka تولید کند. شناسه گروه به طور منحصر به فرد این پروژه را در سراسر اکوسیستم Maven شناسایی می کند. این پروژه بر اساس کهن الگوی maven-archetype-quickstart ایجاد خواهد شد، که Maven الگوها را به این شکل فراخوانی می کند.
خروجی های زیادی وجود خواهد داشت، به خصوص اگر این اولین باری باشد که Maven اجرا می شود. انتهای خروجی به شکل زیر خواهد بود:
Output...
INFO] Generating project in Batch mode
[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Archetype: maven-archetype-quickstart:1.4
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: com.dokafka
[INFO] Parameter: artifactId, Value: dokafka
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Parameter: package, Value: com.dokafka
[INFO] Parameter: packageInPathFormat, Value: com/dokafka
[INFO] Parameter: package, Value: com.dokafka
[INFO] Parameter: groupId, Value: com.dokafka
[INFO] Parameter: artifactId, Value: dokafka
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Project created from Archetype in dir: /root/kafka-projects/dokafka
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 3.537 s
[INFO] Finished at: 2024-03-10T10:43:07Z
[INFO] ------------------------------------------------------------------------Maven بسته های جاوا لازم را از مخزن مرکزی خود دانلود کرده و پروژه dokafka را با استفاده از الگوی maven-archetype-quickstart ایجاد کرده است.
با اجرای زیر به دایرکتوری پروژه بروید:
cd dokafkaساختار پروژه به شکل زیر است:
├── pom.xml
└── src
├── main
│ └── java
│ └── com
│ └── dokafka
│ └── App.java
└── test
└── java
└── com
└── dokafka
└── AppTest.javaبه عنوان بخشی از پیش نیازها، شما در مورد ساختار استاندارد پروژه Maven که در اینجا مشاهده می کنید، یاد گرفتید. دایرکتوری src/main/java کد منبع پروژه را نگه میدارد، src/test/java حاوی منابع آزمایشی است و pom.xml در ریشه پروژه، فایل پیکربندی اصلی Maven است.
این پروژه تنها حاوی یک فایل منبع، App.java است. محتویات آن را نشان دهید تا ببینید Maven چه چیزی تولید کرده است:
cat src/main/java/com/dokafka/App.javaخروجی خواهد بود:
package com.dokafka;
/**
* Hello world!
*
*/
public class App
{
public static void main( String[] args )
{
System.out.println( "Hello World!" );
}
}برای اجرای این کد، ابتدا باید پروژه را با اجرای زیر بسازید:
mvn packageMaven کد را کامپایل کرده و آن را در یک فایل JAR بسته بندی می کند تا اجرا شود. انتهای خروجی به این صورت خواهد بود که به معنای تکمیل آن است:
Output...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 9.830 s
[INFO] Finished at: 2024-03-10T10:41:24Z
[INFO] ------------------------------------------------------------------------Maven فایل JAR به دست آمده را زیر فهرست هدف قرار داد. برای اجرای کلاس App که به تازگی ساخته اید، دستور زیر را اجرا کنید و شناسه کامل کلاس را وارد کنید:
java -cp target/dokafka-1.0-SNAPSHOT.jar com.dokafka.Appخروجی خواهد بود:
OutputHello World!شما Maven را نصب کرده اید و یک پروژه جاوا خالی ایجاد کرده اید. در مرحله بعد وابستگی های لازم را برای کافکا اضافه خواهید کرد.
مرحله 2 – افزودن وابستگی های Maven
اکنون کلاینت جاوا کافکا و همچنین وابستگی های دیگر را برای لاگ به پروژه خود اضافه خواهید کرد. همچنین میتوانید Maven را طوری پیکربندی کنید که این وابستگیها را در طول بستهبندی لحاظ کند. ابتدا، وابستگی مشتریان کافکا را اضافه می کنید. به صفحه مخزن Maven برای مشتری جاوا در مرورگر خود بروید و آخرین نسخه موجود را انتخاب کنید، سپس قطعه XML ارائه شده را برای Maven کپی کنید. در زمان نگارش، آخرین نسخه کتابخانه کلاینت جاوا 3.7.0 بود.
وابستگی ها به pom.xml در ریشه پروژه شما اضافه می شوند. آن را برای ویرایش باز کنید:
nano pom.xmlبخش <dependencies> را پیدا کنید و تعریف وابستگی را اضافه کنید:
...
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.0</version>
</dependency>
...
</dependencies>این کار کتابخانه مشتری کافکا را در اختیار پروژه شما قرار می دهد. با این حال، خود کتابخانه به دو وابستگی دیگر نیاز دارد که باید به صورت دستی اضافه کنید. آنها از کتابخانه SLF4J که برای ثبت پیامها استفاده میکند، سرچشمه میگیرند، زیرا از بسیاری از کتابخانههای ورود به سیستم پشتیبانی میکند و به توسعهدهنده اجازه میدهد تا در مورد نحوه پردازش پیامهای گزارش انعطافپذیر باشد. دو وابستگی که باید اضافه کنید عبارتند از:
- slf4j-api که خود کتابخانه است
- slf4j-simple، که لاگ ها را پردازش کرده و به ترمینال خروجی می دهد
هنگامی که وابستگی ها را تعریف کردید، باید آنها را در کنار JAR نهایی ساخته شده در دسترس قرار دهید. بخش <build> pom.xml را پیدا کنید و خطوط برجسته را اضافه کنید:
...
<build>
<pluginManagement>
<plugins>
...
</plugins>
</pluginManagement>
<plugins>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
...در اینجا، افزونه maven-dependency-plugin را برای کپی کردن همه وابستگی ها در زمان بسته بندی پیکربندی می کنید. فایلهای JAR وابستگیها، در این پیکربندی پروژه، تحت target/lib قرار خواهند گرفت. توجه داشته باشید که نباید بخش <plugins> موجود را در <pluginManagement> تغییر دهید. وقتی کارتان تمام شد، فایل را ذخیره و ببندید.
پروژه را بسازید تا مطمئن شوید همه چیز به درستی پیکربندی شده است:
mvn packageانتهای خروجی باید مشابه این باشد:
Output...
[INFO] --- maven-jar-plugin:3.0.2:jar (default-jar) @ dokafka ---
[INFO] Building jar: /root/kafka-projects/dokafka/target/dokafka-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- maven-dependency-plugin:2.8:copy-dependencies (default) @ dokafka ---
[INFO] Copying junit-4.11.jar to /root/kafka-projects/dokafka/target/lib/junit-4.11.jar
[INFO] Copying slf4j-simple-2.0.12.jar to /root/kafka-projects/dokafka/target/lib/slf4j-simple-2.0.12.jar
[INFO] Copying snappy-java-1.1.10.5.jar to /root/kafka-projects/dokafka/target/lib/snappy-java-1.1.10.5.jar
[INFO] Copying zstd-jni-1.5.5-6.jar to /root/kafka-projects/dokafka/target/lib/zstd-jni-1.5.5-6.jar
[INFO] Copying hamcrest-core-1.3.jar to /root/kafka-projects/dokafka/target/lib/hamcrest-core-1.3.jar
[INFO] Copying lz4-java-1.8.0.jar to /root/kafka-projects/dokafka/target/lib/lz4-java-1.8.0.jar
[INFO] Copying slf4j-api-2.0.12.jar to /root/kafka-projects/dokafka/target/lib/slf4j-api-2.0.12.jar
[INFO] Copying kafka-clients-3.7.0.jar to /root/kafka-projects/dokafka/target/lib/kafka-clients-3.7.0.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 5.205 s
[INFO] Finished at: 2024-03-12T06:36:34Z
[INFO] ------------------------------------------------------------------------میتوانید فایلها را در زیر target/lib فهرست کنید تا مطمئن شوید که وابستگیها واقعاً کپی شدهاند:
Outputhamcrest-core-1.3.jar kafka-clients-3.7.0.jar slf4j-api-2.0.12.jar snappy-java-1.1.10.5.jar
junit-4.11.jar lz4-java-1.8.0.jar slf4j-simple-2.0.12.jar zstd-jni-1.5.5-6.jarشما وابستگی های لازم را به پروژه Maven خود اضافه کرده اید. اکنون می خواهید به کافکا متصل شوید و پیام ها را به صورت برنامه ریزی شده تولید کنید.
مرحله 3 – ایجاد یک تولید کننده کافکا در جاوا
در این مرحله، یک تولید کننده کافکا در جاوا راه اندازی می کنید و برای یک موضوع پیام می نویسید.
مطابق ساختار پروژه، کد منبع در زیر src/main/java/com/dokafka ذخیره میشود. از آنجایی که برای بقیه آموزش نیازی به App.java نخواهید داشت، با اجرای زیر آن را حذف کنید:
rm src/main/java/com/dokafka/App.javaشما کد سازنده را در کلاسی به نام ProducerDemo ذخیره خواهید کرد. فایل همراه را برای ویرایش ایجاد و باز کنید:
nano src/main/java/com/dokafka/ProducerDemo.javaخطوط زیر را اضافه کنید:
package com.dokafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ProducerDemo {
private static final Logger log = LoggerFactory.getLogger(ProducerDemo.class);
public static void main(String[] args) {
String bootstrapServers = "localhost:9092";
String topicName = "java_demo";
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> producerRecord =
new ProducerRecord<>(topicName, "Hello World!");
producer.send(producerRecord);
producer.flush();
producer.close();
}
}ابتدا کلاس ProducerDemo را تعریف می کنید، کلاس های استفاده شده را وارد می کنید و یک Logger ایجاد می کنید. در روش اصلی ابتدا آدرس خوشه کافکا (bootstrapServers) و نام موضوع تولید پیام (topicName) را اعلام می کنید.
سپس، یک شیء Properties را نمونهسازی میکنید، که شبیه به یک فرهنگ لغت کلید-مقدار است و پیکربندی را برای عملکرد تولیدکننده کافکا شما نگه میدارد. شما ویژگی BOOTSTRAP_SERVERS_CONFIG را با آدرس خوشه کافکا پر می کنید. همچنین ورودیهای KEY_SERIALIZER_CLASS_CONFIG و VALUE_SERIALIZER_CLASS_CONFIG را روی StringSerializer.class.getName () تنظیم کنید.
این ویژگی ها مشخص می کنند که کدام سریال سازها باید برای پردازش کلیدها و مقادیر پیام های تولید شده استفاده شوند. سریال سازها کلاس هایی هستند که ورودی را می پذیرند و آرایه ای از بایت ها را به عنوان خروجی پس می دهند و آماده انتقال از طریق شبکه هستند. Deserializers برعکس عمل می کنند و شی اصلی را از جریان بایت ها بازسازی می کنند. در اینجا، هم کلید و هم مقدار با استفاده از StringSerializer داخلی بهصورت رشتهها سریالسازی میشوند.
در مرحله بعد، شما یک KafkaProducer را اعلام و نمونهسازی میکنید:
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);سازنده کلیدها و مقادیری از نوع String را با خصوصیات همراه برای پیکربندی می پذیرد. برای ارسال پیام به یک موضوع، KafkaProducer یک ProducerRecord را می پذیرد که با نام موضوع و خود پیام که Hello World! است، نمونه برداری می کنید. توجه داشته باشید که خود سازنده به موضوع خاصی وابسته نیست.
پس از ارسال پیام، تولید کننده را فلاش کرده و می بندید. فراخوانی producer.send() ناهمزمان است، به این معنی که جریان کنترل به متد اصلی باز می گردد در حالی که پیام در رشته دیگری ارسال می شود. از آنجایی که این برنامه نمونه میخواهد بعد از آن خارج شود، شما تولیدکننده را مجبور میکنید تا با فلاشینگ، هر آنچه را که باقی مانده است، ارسال کند. سپس، آن را میبندید () و به کافکا نشان میدهد که سازنده در حال نابودی است.
در مرحله بعد، یک اسکریپت ایجاد خواهید کرد که ساختمان و اجرای ProducerDemo را مدیریت می کند. شما آن را در فایلی به نام run-producer.sh ذخیره خواهید کرد. آن را برای ویرایش ایجاد و باز کنید:
nano run-producer.shخطوط زیر را اضافه کنید:
#!/bin/bash
mvn clean
mvn package
java -cp target/dokafka-1.0-SNAPSHOT.jar:target/lib/* com.dokafka.ProducerDemoوقتی کارتان تمام شد، فایل را ذخیره و ببندید. قسمت برجسته مشخص می کند که وابستگی ها در کجا قرار دارند.
سپس آن را به عنوان اجرایی علامت بزنید:
chmod +x run-producer.shدر نهایت، سعی کنید یک Hello World را تولید کنید! با اجرای آن پیام دهید:
./run-producer.shخروجی طولانی خواهد بود و انتهای آن باید به این صورت باشد:
Output...
[main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector - initializing Kafka metrics collector
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Instantiated an idempotent producer.
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 2ae524ed625438c5
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1710176327832
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {java_demo=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: Z-4Gf_p6T2ygtb6461nKRA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 12 with epoch 0
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
[main] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.producer for producer-1 unregisteredKafkaProducer ثبت شد که با موفقیت ایجاد شد و بعداً ثبت نشد. اکنون پیام در موضوع java_demo نوشته شده است و می توانید آن را با استفاده از اسکریپت kafka-console-consumer.sh بازیابی کنید.
در یک پوسته جداگانه، به دایرکتوری نصب کافکا خود بروید و دستور زیر را برای خواندن موضوع اجرا کنید:
bin/kafka-console-consumer.sh --topic java_demo --from-beginning --bootstrap-server localhost:9092خروجی خواهد بود:
OutputHello World!برای خروج می توانید CTRL+C را فشار دهید.
در این مرحله، به صورت برنامهنویسی، پیامی را در موضوع java_demo تولید کردهاید و با استفاده از اسکریپت bash ارائهشده توسط کافکا، آن را دوباره بخوانید. اکنون می آموزید که چگونه از اطلاعاتی که کافکا پس از ارسال موفقیت آمیز پیام برمی گرداند، استفاده کنید.
مرحله 4 – بازیابی متادیتا با استفاده از Callbacks
متد send() KafkaProducer تماسهای برگشتی را میپذیرد، که به شما امکان میدهد بر اساس رویدادهایی که رخ میدهند، مانند زمانی که رکورد دریافت میشود، عمل کنید. این برای بازیابی اطلاعات در مورد نحوه مدیریت رکورد توسط خوشه مفید است.
برای گسترش تماس send() با یک callback، ابتدا ProducerDemo را برای ویرایش باز کنید:
nano src/main/java/com/dokafka/ProducerDemo.javaکد را به شکل زیر تغییر دهید:
...
producer.send(producerRecord, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
log.error("An error occurred!", e);
return;
}
log.info(String.format("Timestamp: %s, partition: %s; offset: %s",
recordMetadata.timestamp(),
recordMetadata.partition(),
recordMetadata.offset()));
}
});
...اکنون یک پیاده سازی از واسط Callback را به متد send() منتقل می کنید و onCompletion() را پیاده سازی می کنید که RecordMetadata و به صورت اختیاری یک Exception را دریافت می کند. سپس، اگر خطایی رخ داد، آن را ثبت کنید. در غیر این صورت، مُهر زمانی، شماره پارتیشن و افست رکورد را که اکنون در خوشه است ثبت میکنید. از آنجایی که ارسال پیام از این طریق ناهمزمان است، زمانی که خوشه رکورد را بپذیرد، کد شما فراخوانی میشود، بدون اینکه شما صریحاً منتظر بمانید تا این اتفاق بیفتد.
وقتی کارتان تمام شد، فایل را ذخیره و ببندید، سپس سازنده را اجرا کنید:
./run-producer.shبه یک پیام جدید در انتهای خروجی توجه کنید:
Output...
[kafka-producer-network-thread | producer-1] INFO com.dokafka.ProducerDemo - Timestamp: 1710181715303, partition: 0; offset: 2
...پیامی که به تازگی تولید شده بود توسط خوشه پذیرفته شد و در پارتیشن 0 ذخیره شد.
اگر دوباره آن را اجرا کنید، متوجه خواهید شد که افست یک بزرگتر است، که نشان دهنده مکان پیام در پارتیشن است:
Output[kafka-producer-network-thread | producer-1] INFO com.dokafka.ProducerDemo - Timestamp: 1710181831814, partition: 0; offset: 3نتیجه
در این مقاله، شما یک پروژه جاوا را با استفاده از Maven ایجاد کردهاید و آن را به وابستگیهایی برای ارتباط با کافکا مجهز کردهاید. سپس، کلاسی ایجاد کردهاید که پیامهایی را به خوشه کافکا شما تولید میکند و آن را برای بازیابی اطلاعات سوابق ارسالی گسترش میدهد.
 
					








