Как настроить производитель Kafka для источника данных через CLI

0 Акции
0
0
0
0

Введение

Apache Kafka предоставляет скрипты оболочки для создания и получения простых текстовых сообщений в кластере Kafka. Хотя они полезны для исследований и экспериментов, реальные приложения обращаются к Kafka программно. Для этого Kafka предоставляет множество клиентских библиотек для популярных языков программирования и сред. В этом руководстве вы создадите приложение Java, которое генерирует данные в теме Kafka. Вы создадите проект Java с помощью Apache Maven, инструмента для сборки и упаковки проектов Java, и добавите клиентскую библиотеку Kafka в качестве зависимости. Затем вы реализуете класс, который использует клиент Kafka, создавая сообщения и извлекая метаданные о них из кластера.

Предпосылки
  • Устройство с не менее 4 ГБ оперативной памяти и 2 процессорами
  • Java Development Kit (JDK) 8 или выше, установленный на вашем Droplet или локальном компьютере
  • Apache Kafka установлен и настроен на вашем Droplet или локальном компьютере
  • Введение в стандартную структуру каталогов проектов Java

Шаг 1 — Создание проекта Maven

На этом этапе вы установите Apache Maven и создадите с его помощью проект, который будет использоваться для взаимодействия с Kafka. В Ubuntu Maven доступен в официальных репозиториях.

Сначала выведите список доступных пакетов, выполнив команду:

sudo apt update

Чтобы установить его, выполните следующую команду:

sudo apt install maven

Убедитесь, что он установлен, прочитав номер его версии:

mvn --version

Вывод будет похож на следующий, в зависимости от версии Java и платформы:

OutputApache Maven 3.6.3
Maven home: /usr/share/maven
Java version: 11.0.22, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64
Default locale: en, platform encoding: UTF-8
OS name: "linux", version: "5.15.0-100-generic", arch: "amd64", family: "unix"

Затем создайте каталог, в котором вы будете хранить свои проекты Java для работы с Kafka:

mkdir ~/kafka-projects

Перейдите в только что созданный каталог:

cd ~/kafka-projects

Затем создайте пустой проект Java, выполнив следующее:

mvn archetype:generate \
-DgroupId=com.dokafka \
-DartifactId=dokafka \
-DarchetypeArtifactId=maven-archetype-quickstart \
-DarchetypeVersion=1.4 \
-DinteractiveMode=false

Здесь вы указываете Maven создать новый проект под названием докафка С идентификатором группы com.dokafka Идентификатор группы однозначно идентифицирует этот проект в экосистеме. Maven Этот проект основан на архетипе maven-archetype-quickstart будут созданы, которые Maven будет называть шаблонами таким образом.

Вывод будет обширным, особенно если Maven запускается впервые. Конечный вывод будет выглядеть так:

Output...
INFO] Generating project in Batch mode
[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Archetype: maven-archetype-quickstart:1.4
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: com.dokafka
[INFO] Parameter: artifactId, Value: dokafka
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Parameter: package, Value: com.dokafka
[INFO] Parameter: packageInPathFormat, Value: com/dokafka
[INFO] Parameter: package, Value: com.dokafka
[INFO] Parameter: groupId, Value: com.dokafka
[INFO] Parameter: artifactId, Value: dokafka
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Project created from Archetype in dir: /root/kafka-projects/dokafka
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 3.537 s
[INFO] Finished at: 2024-03-10T10:43:07Z
[INFO] ------------------------------------------------------------------------

Maven загружает необходимые пакеты Java из своего центрального репозитория и собирает проект. докафка Используя шаблон maven-archetype-quickstart создал.

Перейдите в каталог проекта, выполнив:

cd dokafka

Структура проекта следующая:

├── pom.xml
└── src
├── main
│ └── java
│ └── com
│ └── dokafka
│ └── App.java
└── test
└── java
└── com
└── dokafka
└── AppTest.java

В рамках подготовки вы изучили стандартную структуру проекта Maven, которую вы видите здесь. Каталог src/main/java Содержит исходный код проекта, src/test/java Содержит экспериментальные ресурсы и pom.xml В корне проекта находится основной файл конфигурации Maven.

Этот проект содержит только один исходный файл, App.java Покажите его содержимое, чтобы увидеть, что создал Maven:

cat src/main/java/com/dokafka/App.java

Результат будет следующим:

package com.dokafka;
/**
* Hello world!
*
*/
public class App
{
public static void main( String[] args )
{
System.out.println( "Hello World!" );
}
}

Чтобы запустить этот код, необходимо сначала собрать проект, выполнив:

mvn package

Maven скомпилирует код и упакует его в JAR-файл для запуска. Результат будет выглядеть следующим образом, что означает, что всё готово:

Output...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 9.830 s
[INFO] Finished at: 2024-03-10T10:41:24Z
[INFO] ------------------------------------------------------------------------

Файл Maven БАНКА Полученный результат был помещен в целевой список. Для запуска класса Приложение который вы только что создали, выполните следующую команду и введите полный идентификатор класса:

java -cp target/dokafka-1.0-SNAPSHOT.jar com.dokafka.App

Результат будет следующим:

OutputHello World!

Вы установили Maven и создали пустой проект Java. Теперь вам нужно добавить необходимые зависимости для Kafka.

Шаг 2 — Добавление зависимостей Maven

Теперь вам нужно добавить в свой проект клиент Kafka Java, а также другие зависимости для ведения журнала. Вы также можете настроить Maven для включения этих зависимостей во время упаковки. Сначала добавьте зависимости клиента Kafka. Перейдите на страницу репозитория Maven для клиента Java в браузере и выберите последнюю доступную версию, затем скопируйте предоставленный фрагмент XML для Maven. На момент написания статьи последняя версия клиентской библиотеки Java: 3.7.0 Это было.

Зависимости от pom.xml Они будут добавлены в корень вашего проекта. Откройте его для редактирования:

nano pom.xml

Раздел <dependencies&gt; Найдите ; и добавьте определение зависимости:

...
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.0</version>
</dependency>
...
</dependencies>

Это обеспечит ваш проект клиентской библиотекой Kafka. Однако для самой библиотеки требуются две другие зависимости, которые необходимо добавить вручную. Они относятся к библиотеке SLF4J, используемой для ведения журналов сообщений, поскольку она поддерживает множество библиотек ведения журналов и позволяет разработчику гибко подходить к обработке журналов. Необходимо добавить две зависимости:

  • slf4j-api, которая является самой библиотекой
  • slf4j-simple, который обрабатывает логи и выводит данные на терминал

После определения зависимостей их необходимо сделать доступными вместе с финальным JAR-файлом, который будет собран. Раздел &<сборка> pom.xml Найдите и добавьте выделенные строки:

...
<build>
<pluginManagement>
<plugins>
...
</plugins>
</pluginManagement>
<plugins>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
...

Вот, плагин maven-dependency-plugin Вы настраиваете копирование всех зависимостей во время упаковки. JAR-файлы зависимостей в этой конфигурации проекта находятся в разделе цель/библиотека Обратите внимание, что не следует включать часть &.<плагины> Существующий в <pluginManagement&gt; Изменить. После завершения сохраните и закройте файл.

Соберите проект, чтобы убедиться, что все настроено правильно:

mvn package

Конечный результат должен выглядеть так:

Output...
[INFO] --- maven-jar-plugin:3.0.2:jar (default-jar) @ dokafka ---
[INFO] Building jar: /root/kafka-projects/dokafka/target/dokafka-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- maven-dependency-plugin:2.8:copy-dependencies (default) @ dokafka ---
[INFO] Copying junit-4.11.jar to /root/kafka-projects/dokafka/target/lib/junit-4.11.jar
[INFO] Copying slf4j-simple-2.0.12.jar to /root/kafka-projects/dokafka/target/lib/slf4j-simple-2.0.12.jar
[INFO] Copying snappy-java-1.1.10.5.jar to /root/kafka-projects/dokafka/target/lib/snappy-java-1.1.10.5.jar
[INFO] Copying zstd-jni-1.5.5-6.jar to /root/kafka-projects/dokafka/target/lib/zstd-jni-1.5.5-6.jar
[INFO] Copying hamcrest-core-1.3.jar to /root/kafka-projects/dokafka/target/lib/hamcrest-core-1.3.jar
[INFO] Copying lz4-java-1.8.0.jar to /root/kafka-projects/dokafka/target/lib/lz4-java-1.8.0.jar
[INFO] Copying slf4j-api-2.0.12.jar to /root/kafka-projects/dokafka/target/lib/slf4j-api-2.0.12.jar
[INFO] Copying kafka-clients-3.7.0.jar to /root/kafka-projects/dokafka/target/lib/kafka-clients-3.7.0.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 5.205 s
[INFO] Finished at: 2024-03-12T06:36:34Z
[INFO] ------------------------------------------------------------------------

Вы можете скачать файлы ниже. цель/библиотека Список, чтобы убедиться, что зависимости действительно скопированы:

Outputhamcrest-core-1.3.jar kafka-clients-3.7.0.jar slf4j-api-2.0.12.jar snappy-java-1.1.10.5.jar
junit-4.11.jar lz4-java-1.8.0.jar slf4j-simple-2.0.12.jar zstd-jni-1.5.5-6.jar

Вы добавили необходимые зависимости в свой проект Maven. Теперь вам нужно подключиться к Kafka и отправлять сообщения по расписанию.

Шаг 3 — Создание производителя Kafka на Java

На этом этапе вы настраиваете производитель Kafka на Java и пишете сообщения в тему.

Исходный код проекта, согласно его структуре, представлен ниже. src/main/java/com/dokafka сохраняется. Поскольку он не понадобится для оставшейся части обучения App.java Если у вас его нет, удалите его, выполнив:

rm src/main/java/com/dokafka/App.java

Код производителя будет храниться в классе ProducerDemo. Создайте и откройте для редактирования соответствующий файл:

nano src/main/java/com/dokafka/ProducerDemo.java

Добавьте следующие строки:

package com.dokafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ProducerDemo {
private static final Logger log = LoggerFactory.getLogger(ProducerDemo.class);
public static void main(String[] args) {
String bootstrapServers = "localhost:9092";
String topicName = "java_demo";
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> producerRecord =
new ProducerRecord<>(topicName, "Hello World!");
producer.send(producerRecord);
producer.flush();
producer.close();
}
}

Первый класс ProducerDemo Вы определяете, импортируете используемые классы и создаете Регистратор В методе main сначала объявляется адрес кластера Kafka (bootstrapServers) и имя темы для генерации сообщения (topicName).

Затем объект Характеристики Вы создаёте экземпляр , который похож на словарь "ключ-значение" и содержит конфигурацию вашей функции-производителя Kafka. Вы указываете свойство BOOTSTRAP_SERVERS_CONFIG Вы заполняете адрес кластера Kafka. Также записи KEY_SERIALIZER_CLASS_CONFIG и VALUE_SERIALIZER_CLASS_CONFIG на StringSerializer.class.getName() Набор.

Эти свойства определяют, какие сериализаторы следует использовать для обработки ключей и значений генерируемых сообщений. Сериализаторы — это классы, которые принимают входные данные и возвращают массив байтов в качестве выходных данных, готовый к передаче по сети. Десериализаторы Они делают наоборот и восстанавливают исходный объект из потока байтов. Здесь используются и ключ, и значение. StringSerializer Внутренние данные сериализуются в виде строк.

Далее у вас будет KafkaProducer Вы объявляете и создаете экземпляр:

KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

Конструктор ключей и значений типа Нить Принимает следующие параметры конфигурации вместе с соответствующими свойствами. Чтобы отправить сообщение в тему, KafkaProducer Он принимает ProducerRecord, названный по теме и самому сообщению. Привет, мир! Вы делаете выборку. Обратите внимание, что сам создатель не привязан к конкретной теме.

После отправки сообщения, вы прошиваете и закрываете производителя. Вызов производитель.отправить() Он асинхронный, то есть управление возвращается к основному методу, пока сообщение отправляется в другом потоке. Поскольку после этого программа-пример должна завершиться, вы заставляете производителя отправить всё, что осталось, сбрасывая данные. Затем вы закрываете его методом close(), сигнализируя Kafka о том, что производитель уничтожается.

Далее вам нужно создать скрипт, который будет управлять сборкой и запуском ProducerDemo. Сохраните его в файле run-producer.sh. Создайте его и откройте для редактирования:

nano run-producer.sh

Добавьте следующие строки:

#!/bin/bash
mvn clean
mvn package
java -cp target/dokafka-1.0-SNAPSHOT.jar:target/lib/* com.dokafka.ProducerDemo

После завершения сохраните и закройте файл. Выделенная область указывает расположение зависимостей.

Затем отметьте его как исполняемый:

chmod +x run-producer.sh

Наконец, попробуйте сгенерировать сообщение Hello World!, запустив его:

./run-producer.sh

Вывод будет длинным и должен выглядеть примерно так:

Output...
[main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector - initializing Kafka metrics collector
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Instantiated an idempotent producer.
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 2ae524ed625438c5
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1710176327832
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {java_demo=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: Z-4Gf_p6T2ygtb6461nKRA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 12 with epoch 0
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
[main] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.producer for producer-1 unregistered

KafkaProducer был зарегистрирован, но впоследствии не смог зарегистрироваться. Теперь сообщение записано в тему java_demo, и его можно получить с помощью скрипта kafka-console-consumer.sh.

В отдельной оболочке перейдите в каталог установки Kafka и выполните следующую команду, чтобы прочитать тему:

bin/kafka-console-consumer.sh --topic java_demo --from-beginning --bootstrap-server localhost:9092

Результат будет следующим:

OutputHello World!

Вы можете выйти. CTRL+C Нажимать .

На этом этапе программно отправляется сообщение в теме. java_demo Вы сгенерировали и прочитали его, используя bash-скрипт, предоставленный Kafka. Теперь вы узнаете, как использовать информацию, возвращаемую Kafka после успешной отправки сообщения.

Шаг 4 — Извлечение метаданных с помощью обратных вызовов

Метод send() KafkaProducer Он принимает обратные вызовы, позволяющие реагировать на происходящие события, например, при получении записи. Это полезно для получения информации о том, как кластер обрабатывает запись.

Чтобы расширить контакт отправлять() С обратным вызовом, сначала ProducerDemo Открыто для редактирования:

nano src/main/java/com/dokafka/ProducerDemo.java

Измените код на следующий:

...
producer.send(producerRecord, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
log.error("An error occurred!", e);
return;
}
log.info(String.format("Timestamp: %s, partition: %s; offset: %s",
recordMetadata.timestamp(),
recordMetadata.partition(),
recordMetadata.offset()));
}
});
...

Теперь добавьте к методу реализацию интерфейса Callback. отправлять() Вы переводите и onCompletion() Вы это реализуете RecordMetadata и по желанию Исключение получает. Затем, если возникает ошибка, запишите её в журнал. В противном случае вы регистрируете временную метку, номер раздела и смещение записи, которая в данный момент находится в кластере. Поскольку отправка сообщения таким образом асинхронна, ваш код вызывается, когда кластер принимает запись, без необходимости явного ожидания этого события.

Когда закончите, сохраните и закройте файл, а затем запустите конструктор:

./run-producer.sh

Обратите внимание на новое сообщение в конце вывода:

Output...
[kafka-producer-network-thread | producer-1] INFO com.dokafka.ProducerDemo - Timestamp: 1710181715303, partition: 0; offset: 2
...

Вновь сгенерированное сообщение было принято кластером и сохранено в разделе 0.

Если вы запустите его снова, вы заметите, что смещение стало на единицу больше, что указывает на местоположение сообщения в разделе:

Output[kafka-producer-network-thread | producer-1] INFO com.dokafka.ProducerDemo - Timestamp: 1710181831814, partition: 0; offset: 3

Результат

В этой статье вы создали проект Java с помощью Maven и снабдили его зависимостями для взаимодействия с Kafka. Затем вы создали класс, генерирующий сообщения для кластера Kafka, и расширили его для получения информации об отправленных записях.

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *

Вам также может понравиться