Introducción
Apache Kafka proporciona scripts de shell para generar y recibir mensajes de texto básicos desde y hacia un clúster de Kafka. Si bien son útiles para la exploración y la experimentación, las aplicaciones reales acceden a Kafka mediante programación. Para ello, Kafka proporciona numerosas bibliotecas cliente para lenguajes y entornos de programación populares. En este tutorial, creará una aplicación Java que genera datos en un tema de Kafka. Creará un proyecto Java con Apache Maven, una herramienta para compilar y empaquetar proyectos Java, y añadirá la biblioteca cliente de Kafka como dependencia. A continuación, implementará una clase que utiliza el cliente de Kafka generando mensajes y recuperando metadatos sobre ellos dentro del clúster.
Requisitos previos
- Un dispositivo con al menos 4 GB de RAM y 2 CPU
- Java Development Kit (JDK) 8 o superior instalado en su Droplet o máquina local
- Apache Kafka instalado y configurado en su Droplet o máquina local
- Introducción al diseño de directorio estándar para proyectos Java
Paso 1: Crear un proyecto Maven
En este paso, instalará Apache Maven y lo usará para crear un proyecto que le permitirá comunicarse con Kafka. En Ubuntu, Maven está disponible en los repositorios oficiales.
Primero, enumera los paquetes disponibles ejecutando:
sudo apt updatePara instalarlo, ejecute el siguiente comando:
sudo apt install mavenVerifique que esté instalado leyendo su número de versión:
mvn --versionEl resultado será similar al siguiente, dependiendo de la versión y plataforma de Java:
OutputApache Maven 3.6.3
Maven home: /usr/share/maven
Java version: 11.0.22, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64
Default locale: en, platform encoding: UTF-8
OS name: "linux", version: "5.15.0-100-generic", arch: "amd64", family: "unix"A continuación, crea un directorio donde almacenarás tus proyectos Java para trabajar con Kafka:
mkdir ~/kafka-projectsVaya al directorio recién creado:
cd ~/kafka-projectsLuego genere un proyecto Java vacío ejecutando lo siguiente:
mvn archetype:generate \
-DgroupId=com.dokafka \
-DartifactId=dokafka \
-DarchetypeArtifactId=maven-archetype-quickstart \
-DarchetypeVersion=1.4 \
-DinteractiveMode=falseAquí, le indica a Maven que cree un nuevo proyecto llamado dokafka Con identificación de grupo com.dokafka El ID del grupo identifica de forma única este proyecto en todo el ecosistema. Experto Este proyecto se basa en el arquetipo Inicio rápido de arquetipos de Maven Se crearán, que Maven llamará las plantillas de esta manera.
Habrá mucha salida, especialmente si es la primera vez que se ejecuta Maven. El resultado final será similar a esto:
Output...
INFO] Generating project in Batch mode
[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Archetype: maven-archetype-quickstart:1.4
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: com.dokafka
[INFO] Parameter: artifactId, Value: dokafka
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Parameter: package, Value: com.dokafka
[INFO] Parameter: packageInPathFormat, Value: com/dokafka
[INFO] Parameter: package, Value: com.dokafka
[INFO] Parameter: groupId, Value: com.dokafka
[INFO] Parameter: artifactId, Value: dokafka
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Project created from Archetype in dir: /root/kafka-projects/dokafka
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 3.537 s
[INFO] Finished at: 2024-03-10T10:43:07Z
[INFO] ------------------------------------------------------------------------Maven descarga los paquetes Java necesarios desde su repositorio central y compila el proyecto. dokafka Usando el patrón Inicio rápido de arquetipos de Maven ha creado.
Vaya al directorio del proyecto ejecutando:
cd dokafkaLa estructura del proyecto es la siguiente:
├── pom.xml
└── src
├── main
│ └── java
│ └── com
│ └── dokafka
│ └── App.java
└── test
└── java
└── com
└── dokafka
└── AppTest.javaComo parte de los prerrequisitos, aprendiste sobre la estructura estándar del proyecto Maven, que puedes ver aquí. Directorio origen/principal/java Contiene el código fuente del proyecto, origen/prueba/java Contiene recursos experimentales y pom.xml En la raíz del proyecto se encuentra el archivo de configuración principal de Maven.
Este proyecto contiene solo un archivo fuente, Aplicación.java Muestra su contenido para ver lo que produjo Maven:
cat src/main/java/com/dokafka/App.javaLa salida será:
package com.dokafka;
/**
* Hello world!
*
*/
public class App
{
public static void main( String[] args )
{
System.out.println( "Hello World!" );
}
}Para ejecutar este código, primero debes compilar el proyecto ejecutando:
mvn packageMaven compilará el código y lo empaquetará en un archivo JAR para su ejecución. El resultado final será similar a esto, lo que significa que está completo:
Output...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 9.830 s
[INFO] Finished at: 2024-03-10T10:41:24Z
[INFO] ------------------------------------------------------------------------Archivo Maven FRASCO El resultado obtenido se colocó en la lista de objetivos. Para ejecutar la clase Aplicación que acaba de crear, ejecute el siguiente comando e ingrese el ID de clase completo:
java -cp target/dokafka-1.0-SNAPSHOT.jar com.dokafka.AppLa salida será:
OutputHello World!Has instalado Maven y creado un proyecto Java vacío. A continuación, agregarás las dependencias necesarias para Kafka.
Paso 2: Agregar dependencias de Maven
Ahora agregará el cliente Java de Kafka, así como otras dependencias para el registro, a su proyecto. También puede configurar Maven para incluir estas dependencias durante el empaquetado. Primero, agregará las dependencias del cliente de Kafka. Vaya a la página del repositorio de Maven para el cliente Java en su navegador y seleccione la última versión disponible. Luego, copie el fragmento XML proporcionado para Maven. Al momento de escribir este artículo, la última versión de la biblioteca del cliente Java es 3.7.0 Fue.
Dependencias a pom.xml Se añadirán a la raíz de tu proyecto. Ábrelo para editarlo:
nano pom.xmlSección <dependencies> Busque y agregue la definición de dependencia:
...
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.0</version>
</dependency>
...
</dependencies>Esto proporcionará la biblioteca cliente de Kafka a su proyecto. Sin embargo, la biblioteca requiere dos dependencias adicionales que debe agregar manualmente. Estas provienen de la biblioteca SLF4J, que se utiliza para registrar mensajes, ya que es compatible con muchas bibliotecas de registro y permite al desarrollador flexibilidad en el procesamiento de los mensajes de registro. Las dos dependencias que debe agregar son:
- slf4j-api que es la biblioteca en sí
- slf4j-simple, que procesa registros y envía salidas a la terminal
Una vez que haya definido las dependencias, debe ponerlas a disposición junto con el JAR final que se compila. Sección &<construir> pom.xml Busque y agregue las líneas resaltadas:
...
<build>
<pluginManagement>
<plugins>
...
</plugins>
</pluginManagement>
<plugins>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
...Aquí, el complemento complemento de dependencia de Maven Configuras la copia de todas las dependencias al empaquetar. Los archivos JAR de dependencia, en esta configuración del proyecto, se encuentran en objetivo/lib Tenga en cuenta que no debe incluir la parte &.<complementos> El existente en <pluginManagement> Cambiar. Cuando haya terminado, guarde y cierre el archivo.
Construya el proyecto para asegurarse de que todo esté configurado correctamente:
mvn packageEl final de la salida debería verse así:
Output...
[INFO] --- maven-jar-plugin:3.0.2:jar (default-jar) @ dokafka ---
[INFO] Building jar: /root/kafka-projects/dokafka/target/dokafka-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- maven-dependency-plugin:2.8:copy-dependencies (default) @ dokafka ---
[INFO] Copying junit-4.11.jar to /root/kafka-projects/dokafka/target/lib/junit-4.11.jar
[INFO] Copying slf4j-simple-2.0.12.jar to /root/kafka-projects/dokafka/target/lib/slf4j-simple-2.0.12.jar
[INFO] Copying snappy-java-1.1.10.5.jar to /root/kafka-projects/dokafka/target/lib/snappy-java-1.1.10.5.jar
[INFO] Copying zstd-jni-1.5.5-6.jar to /root/kafka-projects/dokafka/target/lib/zstd-jni-1.5.5-6.jar
[INFO] Copying hamcrest-core-1.3.jar to /root/kafka-projects/dokafka/target/lib/hamcrest-core-1.3.jar
[INFO] Copying lz4-java-1.8.0.jar to /root/kafka-projects/dokafka/target/lib/lz4-java-1.8.0.jar
[INFO] Copying slf4j-api-2.0.12.jar to /root/kafka-projects/dokafka/target/lib/slf4j-api-2.0.12.jar
[INFO] Copying kafka-clients-3.7.0.jar to /root/kafka-projects/dokafka/target/lib/kafka-clients-3.7.0.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 5.205 s
[INFO] Finished at: 2024-03-12T06:36:34Z
[INFO] ------------------------------------------------------------------------Puedes descargar los archivos a continuación. objetivo/lib Lista para asegurarse de que las dependencias realmente se copien:
Outputhamcrest-core-1.3.jar kafka-clients-3.7.0.jar slf4j-api-2.0.12.jar snappy-java-1.1.10.5.jar
junit-4.11.jar lz4-java-1.8.0.jar slf4j-simple-2.0.12.jar zstd-jni-1.5.5-6.jarHas añadido las dependencias necesarias a tu proyecto Maven. Ahora quieres conectarte a Kafka y generar mensajes de forma programada.
Paso 3: Crear un productor de Kafka en Java
En este paso, configura un productor de Kafka en Java y escribe mensajes en un tema.
Según la estructura del proyecto, el código fuente se muestra a continuación. src/main/java/com/dokafka Se guarda. Dado que no es necesario para el resto del entrenamiento. Aplicación.java Si no lo tienes, elimínalo ejecutando:
rm src/main/java/com/dokafka/App.javaAlmacenarás el código del productor en una clase llamada ProducerDemo. Crea y abre el archivo adjunto para editarlo:
nano src/main/java/com/dokafka/ProducerDemo.javaAñade las siguientes líneas:
package com.dokafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ProducerDemo {
private static final Logger log = LoggerFactory.getLogger(ProducerDemo.class);
public static void main(String[] args) {
String bootstrapServers = "localhost:9092";
String topicName = "java_demo";
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> producerRecord =
new ProducerRecord<>(topicName, "Hello World!");
producer.send(producerRecord);
producer.flush();
producer.close();
}
}Clase primera Demostración del productor Define, importa las clases utilizadas y crea una Registrador En el método principal, primero declara la dirección del clúster de Kafka (bootstrapServers) y el nombre del tema para generar el mensaje (topicName).
Entonces, un objeto Propiedades Se crea una instancia de un , que es similar a un diccionario clave-valor y contiene la configuración de la función de producción de Kafka. Se especifica la propiedad CONFIGURACIÓN DE SERVIDORES BOOTSTRAP Completa la dirección del clúster de Kafka. Además, las entradas CONFIGURACIÓN DE CLASE DEL SERIALIZADOR DE CLAVE y CONFIGURACIÓN DE CLASE DEL SERIALIZADOR DE VALOR en StringSerializer.class.getName() Colocar.
Estas propiedades especifican qué serializadores deben usarse para procesar las claves y los valores de los mensajes generados. Los serializadores son clases que aceptan datos de entrada y devuelven una matriz de bytes como salida, lista para ser transmitida por la red. Deserializadores Hacen lo contrario y reconstruyen el objeto original a partir del flujo de bytes. Aquí se utilizan tanto la clave como el valor. Serializador de cadenas Los elementos internos se serializan como cadenas.
A continuación, tendrás una Productor de Kafka Declaras e instancias:
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);Constructor de claves y valores de tipo Cadena Acepta lo siguiente con las propiedades asociadas para la configuración. Para enviar un mensaje a un tema, Productor de Kafka Acepta un ProducerRecord que lleva el nombre del asunto y del mensaje en sí. ¡Hola Mundo! Estás probando. Ten en cuenta que el creador no está vinculado a ningún tema específico.
Después de enviar el mensaje, flashea y cierra el productor. Llama productor.send() Es asíncrono, lo que significa que el flujo de control regresa al método principal mientras el mensaje se envía en otro hilo. Dado que este programa de ejemplo desea salir después de eso, se fuerza al productor a enviar lo que quede mediante el vaciado. Luego, se ejecuta close(), lo que indica a Kafka que el productor se está destruyendo.
A continuación, creará un script que gestionará la compilación y ejecución de ProducerDemo. Lo guardará en un archivo llamado run-producer.sh. Créelo y ábralo para editarlo:
nano run-producer.shAñade las siguientes líneas:
#!/bin/bash
mvn clean
mvn package
java -cp target/dokafka-1.0-SNAPSHOT.jar:target/lib/* com.dokafka.ProducerDemoCuando termines, guarda y cierra el archivo. El área resaltada indica dónde se encuentran las dependencias.
Luego márquelo como ejecutable:
chmod +x run-producer.shPor último, intenta generar un mensaje ¡Hola mundo! ejecutándolo:
./run-producer.shEl resultado será largo y debería quedar así:
Output...
[main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector - initializing Kafka metrics collector
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Instantiated an idempotent producer.
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 2ae524ed625438c5
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1710176327832
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {java_demo=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: Z-4Gf_p6T2ygtb6461nKRA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 12 with epoch 0
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
[main] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.producer for producer-1 unregisteredSe registró KafkaProducer, que se creó correctamente y posteriormente no se registró. Ahora, el mensaje se escribe en el tema java_demo y puede recuperarlo mediante el script kafka-console-consumer.sh.
En un shell separado, vaya al directorio de instalación de Kafka y ejecute el siguiente comando para leer el tema:
bin/kafka-console-consumer.sh --topic java_demo --from-beginning --bootstrap-server localhost:9092La salida será:
OutputHello World!Puedes salir. CTRL+C Prensa .
En esta etapa, programáticamente, envíe un mensaje en el asunto. demostración de java Lo has generado y leído usando el script bash proporcionado por Kafka. Ahora aprenderás a usar la información que Kafka devuelve tras el envío exitoso de un mensaje.
Paso 4: Recuperar metadatos mediante devoluciones de llamada
Método enviar() KafkaProducer Acepta devoluciones de llamada, lo que permite actuar sobre eventos que ocurren, como la recepción de un registro. Esto resulta útil para recuperar información sobre cómo el clúster gestiona el registro.
Para ampliar el contacto enviar() Con una devolución de llamada, primero Demostración del productor Abierto para edición:
nano src/main/java/com/dokafka/ProducerDemo.javaCambie el código por lo siguiente:
...
producer.send(producerRecord, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
log.error("An error occurred!", e);
return;
}
log.info(String.format("Timestamp: %s, partition: %s; offset: %s",
recordMetadata.timestamp(),
recordMetadata.partition(),
recordMetadata.offset()));
}
});
...Ahora agregue una implementación de la interfaz Callback al método. enviar() Usted transfiere y al completarse() Estás implementando eso Metadatos de registro y opcionalmente un Excepción Recibe. Si se produce un error, se registra. De lo contrario, se registra la marca de tiempo, el número de partición y el desplazamiento del registro actual en el clúster. Dado que el envío del mensaje de esta manera es asíncrono, se llama al código cuando el clúster acepta el registro, sin necesidad de esperar explícitamente a que ocurra.
Cuando haya terminado, guarde y cierre el archivo, luego ejecute el generador:
./run-producer.shObserve un nuevo mensaje al final de la salida:
Output...
[kafka-producer-network-thread | producer-1] INFO com.dokafka.ProducerDemo - Timestamp: 1710181715303, partition: 0; offset: 2
...El mensaje recién generado fue aceptado por el clúster y almacenado en la partición 0.
Si lo ejecuta nuevamente, notará que el desplazamiento es uno más grande, lo que indica la ubicación del mensaje en la partición:
Output[kafka-producer-network-thread | producer-1] INFO com.dokafka.ProducerDemo - Timestamp: 1710181831814, partition: 0; offset: 3Resultado
En este artículo, creaste un proyecto Java con Maven y lo equipaste con dependencias para comunicarse con Kafka. Luego, creaste una clase que genera mensajes para tu clúster de Kafka y la extendiste para recuperar la información de los registros enviados.









